Posted to commits@flink.apache.org by sj...@apache.org on 2021/11/16 21:40:11 UTC

[flink] branch master updated (44378fa -> ef5fb7a)

This is an automated email from the ASF dual-hosted git repository.

sjwiesman pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 44378fa  [FLINK-24839][fs-connector] Increase Timeout of FsStreamingSinkITCaseBase to 240
     new 443b712  [FLINK-24831][examples] Update DataStream Window examples
     new dbcce67  [FLINK-24635][examples] Fix deprecations in Twitter example
     new cfada41  [FLINK-24635][examples] Fix deprecations in state machine example
     new cce82d5  [FLINK-24635][examples] Fix deprecations in socket example
     new cf8053e  [FLINK-24635][examples] Fix deprecations in window join example
     new 1eee012  [FLINK-24635][examples] Fix deprecations in side output example
     new 0e3273d  [FLINK-24635][examples] Fix deprecations in iterations example
     new 25e0140  [FLINK-24635][examples] Fix deprecations in async example
     new 84e1a52  [hotfix][examples] Replace StreamingFileSink with FileSink
     new b00bfaa  [FLINK-24635][examples] Fix deprecations in changelog socket example
     new ef5fb7a  [FLINK-24833][examples] Prevent use of deprecated APIs in flink-examples

The 11 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 flink-examples/flink-examples-batch/pom.xml        |  23 ++
 flink-examples/flink-examples-streaming/pom.xml    |  27 +-
 .../streaming/examples/async/AsyncClient.java      |  43 ++++
 .../streaming/examples/async/AsyncIOExample.java   | 277 +++------------------
 .../examples/async/util/SimpleSource.java          |  77 ++++++
 .../streaming/examples/gpu/MatrixVectorMul.java    |   6 +-
 .../examples/iteration/IterateExample.java         |  16 +-
 .../flink/streaming/examples/join/WindowJoin.java  |   1 -
 .../examples/sideoutput/SideOutputExample.java     |  57 +++--
 .../examples/socket/SocketWindowWordCount.java     |  30 +--
 .../examples/statemachine/StateMachineExample.java |  20 +-
 .../streaming/examples/twitter/TwitterExample.java |  18 +-
 .../GroupedProcessingTimeWindowExample.java        |  13 +-
 .../examples/windowing/SessionWindowing.java       |  20 +-
 .../examples/windowing/TopSpeedWindowing.java      | 151 ++++++-----
 .../examples/windowing/WindowWordCount.java        | 111 +++++++--
 .../examples/windowing/util/CarSource.java         |  77 ++++++
 .../scala/examples/async/AsyncClient.scala         |  31 +--
 .../scala/examples/async/AsyncIOExample.scala      |  82 +++---
 .../scala/examples/iteration/IterateExample.scala  |  19 +-
 .../streaming/scala/examples/join/WindowJoin.scala |   4 +-
 .../examples/socket/SocketWindowWordCount.scala    |   2 +-
 .../scala/examples/twitter/TwitterExample.scala    |  19 +-
 .../GroupedProcessingTimeWindowExample.scala       |   7 +-
 .../examples/windowing/SessionWindowing.scala      |  20 +-
 .../examples/windowing/TopSpeedWindowing.scala     | 133 +++++-----
 .../scala/examples/windowing/WindowWordCount.scala | 126 +++++++---
 .../scala/examples/windowing/util/CarSource.scala  |  62 +++++
 .../streaming/test/StreamingExamplesITCase.java    |   8 +-
 .../windowing/TopSpeedWindowingExampleITCase.java  |   9 +-
 .../windowing/TopSpeedWindowingExampleITCase.java  |   3 +-
 .../scala/examples/StreamingExamplesITCase.scala   |  13 +-
 flink-examples/flink-examples-table/pom.xml        |  23 ++
 .../java/connectors/ChangelogSocketExample.java    |   3 +-
 34 files changed, 943 insertions(+), 588 deletions(-)
 create mode 100644 flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/AsyncClient.java
 create mode 100644 flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/util/SimpleSource.java
 create mode 100644 flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/util/CarSource.java
 copy flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/FinishedTaskStateProvider.java => flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/async/AsyncClient.scala (56%)
 create mode 100644 flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/util/CarSource.scala

[flink] 04/11: [FLINK-24635][examples] Fix deprecations in socket example

Posted by sj...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sjwiesman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit cce82d56c311d8402b7a2cd14c6f9149a2650832
Author: sjwiesman <sj...@gmail.com>
AuthorDate: Mon Nov 15 14:48:56 2021 -0600

    [FLINK-24635][examples] Fix deprecations in socket example
---
 .../examples/socket/SocketWindowWordCount.java     | 30 ++++++++--------------
 .../examples/socket/SocketWindowWordCount.scala    |  2 +-
 2 files changed, 12 insertions(+), 20 deletions(-)

diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java
index 69d03f8..76fb37f 100644
--- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java
@@ -19,13 +19,12 @@
 package org.apache.flink.streaming.examples.socket;
 
 import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
 import org.apache.flink.streaming.api.windowing.time.Time;
-import org.apache.flink.util.Collector;
 
 /**
  * Implements a streaming windowed version of the "WordCount" program.
@@ -39,7 +38,6 @@ import org.apache.flink.util.Collector;
  *
  * <p>and run this example with the hostname and the port as arguments.
  */
-@SuppressWarnings("serial")
 public class SocketWindowWordCount {
 
     public static void main(String[] args) throws Exception {
@@ -71,24 +69,17 @@ public class SocketWindowWordCount {
         // parse the data, group it, window it, and aggregate the counts
         DataStream<WordWithCount> windowCounts =
                 text.flatMap(
-                                new FlatMapFunction<String, WordWithCount>() {
-                                    @Override
-                                    public void flatMap(
-                                            String value, Collector<WordWithCount> out) {
-                                        for (String word : value.split("\\s")) {
-                                            out.collect(new WordWithCount(word, 1L));
-                                        }
-                                    }
-                                })
+                                (FlatMapFunction<String, WordWithCount>)
+                                        (value, out) -> {
+                                            for (String word : value.split("\\s")) {
+                                                out.collect(new WordWithCount(word, 1L));
+                                            }
+                                        },
+                                Types.POJO(WordWithCount.class))
                         .keyBy(value -> value.word)
                         .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
-                        .reduce(
-                                new ReduceFunction<WordWithCount>() {
-                                    @Override
-                                    public WordWithCount reduce(WordWithCount a, WordWithCount b) {
-                                        return new WordWithCount(a.word, a.count + b.count);
-                                    }
-                                });
+                        .reduce((a, b) -> new WordWithCount(a.word, a.count + b.count))
+                        .returns(WordWithCount.class);
 
         // print the results with a single thread, rather than in parallel
         windowCounts.print().setParallelism(1);
@@ -104,6 +95,7 @@ public class SocketWindowWordCount {
         public String word;
         public long count;
 
+        @SuppressWarnings("unused")
         public WordWithCount() {}
 
         public WordWithCount(String word, long count) {
diff --git a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketWindowWordCount.scala b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketWindowWordCount.scala
index bab6e6f..fbb98ed 100644
--- a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketWindowWordCount.scala
+++ b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketWindowWordCount.scala
@@ -73,7 +73,7 @@ object SocketWindowWordCount {
           .sum("count")
 
     // print the results with a single thread, rather than in parallel
-    windowCounts.print().setParallelism(1)
+    windowCounts.print()
 
     env.execute("Socket Window WordCount")
   }
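
The hunk above shows the general migration pattern: when a FlatMapFunction or
ReduceFunction is written as a Java lambda, its generic output type is erased at
compile time, so it has to be supplied explicitly, either as a second argument to
the transformation or via returns(). A minimal, self-contained sketch of the idiom
(the WordWithCount POJO mirrors the example; the class name and input data are
illustrative):

    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class LambdaTypeHintSketch {

        /** Mirrors the example's POJO: public fields plus a no-arg constructor. */
        public static class WordWithCount {
            public String word;
            public long count;

            public WordWithCount() {}

            public WordWithCount(String word, long count) {
                this.word = word;
                this.count = count;
            }
        }

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            env.fromElements("to be or not to be")
                    .flatMap(
                            (FlatMapFunction<String, WordWithCount>)
                                    (line, out) -> {
                                        for (String word : line.split("\\s")) {
                                            out.collect(new WordWithCount(word, 1L));
                                        }
                                    },
                            // the lambda's output type is erased, so pass it explicitly
                            Types.POJO(WordWithCount.class))
                    .keyBy(value -> value.word)
                    // a lambda-based reduce needs the same hint, supplied via returns()
                    .reduce((a, b) -> new WordWithCount(a.word, a.count + b.count))
                    .returns(WordWithCount.class)
                    .print();

            env.execute("Lambda type-hint sketch");
        }
    }

Types.POJO requires the class to follow the POJO rules (public no-arg constructor,
public or getter/setter fields), which is why the commit keeps the otherwise unused
default constructor.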

[flink] 08/11: [FLINK-24635][examples] Fix deprecations in async example

Posted by sj...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sjwiesman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 25e0140ef7eb0c6ecd0b94f575d60a5c43552cb5
Author: sjwiesman <sj...@gmail.com>
AuthorDate: Mon Nov 15 14:55:00 2021 -0600

    [FLINK-24635][examples] Fix deprecations in async example
---
 .../streaming/examples/async/AsyncClient.java      |  43 ++++
 .../streaming/examples/async/AsyncIOExample.java   | 277 +++------------------
 .../examples/async/util/SimpleSource.java          |  77 ++++++
 .../scala/examples/async/AsyncClient.scala         |  40 +++
 .../scala/examples/async/AsyncIOExample.scala      |  82 +++---
 5 files changed, 245 insertions(+), 274 deletions(-)

diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/AsyncClient.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/AsyncClient.java
new file mode 100644
index 0000000..3ff8d3a
--- /dev/null
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/AsyncClient.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.async;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ThreadLocalRandom;
+
+/** A simple asynchronous client that simulates interacting with an unreliable external service. */
+public class AsyncClient {
+
+    public CompletableFuture<String> query(int key) {
+        return CompletableFuture.supplyAsync(
+                () -> {
+                    long sleep = (long) (ThreadLocalRandom.current().nextFloat() * 100);
+                    try {
+                        Thread.sleep(sleep);
+                    } catch (InterruptedException e) {
+                        throw new RuntimeException("AsyncClient was interrupted", e);
+                    }
+
+                    if (ThreadLocalRandom.current().nextFloat() < 0.001f) {
+                        throw new RuntimeException("wahahahaha...");
+                    } else {
+                        return "key" + (key % 10);
+                    }
+                });
+    }
+}
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/AsyncIOExample.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/AsyncIOExample.java
index 50c220f..fd7335e 100644
--- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/AsyncIOExample.java
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/AsyncIOExample.java
@@ -17,295 +17,90 @@
 
 package org.apache.flink.streaming.examples.async;
 
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.state.FunctionInitializationContext;
-import org.apache.flink.runtime.state.FunctionSnapshotContext;
-import org.apache.flink.streaming.api.CheckpointingMode;
-import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.datastream.AsyncDataStream;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.async.AsyncFunction;
 import org.apache.flink.streaming.api.functions.async.ResultFuture;
 import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.ExecutorUtils;
+import org.apache.flink.streaming.examples.async.util.SimpleSource;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
 import java.util.Collections;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 
 /** Example that illustrates how to use {@link AsyncFunction}. */
 public class AsyncIOExample {
 
-    private static final Logger LOG = LoggerFactory.getLogger(AsyncIOExample.class);
-
-    private static final String EXACTLY_ONCE_MODE = "exactly_once";
-    private static final String ORDERED = "ordered";
-
-    /** A checkpointed source. */
-    private static class SimpleSource implements SourceFunction<Integer>, CheckpointedFunction {
-        private static final long serialVersionUID = 1L;
-
-        private volatile boolean isRunning = true;
-        private int counter = 0;
-        private int start = 0;
-
-        private ListState<Integer> state;
-
-        @Override
-        public void initializeState(FunctionInitializationContext context) throws Exception {
-            state =
-                    context.getOperatorStateStore()
-                            .getListState(
-                                    new ListStateDescriptor<>("state", IntSerializer.INSTANCE));
-
-            // restore any state that we might already have to our fields, initialize state
-            // is also called in case of restore.
-            for (Integer i : state.get()) {
-                start = i;
-            }
-        }
-
-        @Override
-        public void snapshotState(FunctionSnapshotContext context) throws Exception {
-            state.clear();
-            state.add(start);
-        }
-
-        public SimpleSource(int maxNum) {
-            this.counter = maxNum;
-        }
-
-        @Override
-        public void run(SourceContext<Integer> ctx) throws Exception {
-            while ((start < counter || counter == -1) && isRunning) {
-                synchronized (ctx.getCheckpointLock()) {
-                    ctx.collect(start);
-                    ++start;
-
-                    // loop back to 0
-                    if (start == Integer.MAX_VALUE) {
-                        start = 0;
-                    }
-                }
-                Thread.sleep(10L);
-            }
-        }
-
-        @Override
-        public void cancel() {
-            isRunning = false;
-        }
-    }
-
-    /**
-     * An example of {@link AsyncFunction} using a thread pool and executing working threads to
-     * simulate multiple async operations.
-     *
-     * <p>For the real use case in production environment, the thread pool may stay in the async
-     * client.
-     */
+    /** An example of {@link AsyncFunction} using an async client to query an external service. */
     private static class SampleAsyncFunction extends RichAsyncFunction<Integer, String> {
-        private static final long serialVersionUID = 2098635244857937717L;
-
-        private transient ExecutorService executorService;
-
-        /**
-         * The result of multiplying sleepFactor with a random float is used to pause the working
-         * thread in the thread pool, simulating a time consuming async operation.
-         */
-        private final long sleepFactor;
-
-        /**
-         * The ratio to generate an exception to simulate an async error. For example, the error may
-         * be a TimeoutException while visiting HBase.
-         */
-        private final float failRatio;
-
-        private final long shutdownWaitTS;
-
-        SampleAsyncFunction(long sleepFactor, float failRatio, long shutdownWaitTS) {
-            this.sleepFactor = sleepFactor;
-            this.failRatio = failRatio;
-            this.shutdownWaitTS = shutdownWaitTS;
-        }
+        private static final long serialVersionUID = 1L;
 
-        @Override
-        public void open(Configuration parameters) throws Exception {
-            super.open(parameters);
-            executorService = Executors.newFixedThreadPool(30);
-        }
+        private transient AsyncClient client;
 
         @Override
-        public void close() throws Exception {
-            super.close();
-            ExecutorUtils.gracefulShutdown(shutdownWaitTS, TimeUnit.MILLISECONDS, executorService);
+        public void open(Configuration parameters) {
+            client = new AsyncClient();
         }
 
         @Override
         public void asyncInvoke(final Integer input, final ResultFuture<String> resultFuture) {
-            executorService.submit(
-                    () -> {
-                        // wait for while to simulate async operation here
-                        long sleep = (long) (ThreadLocalRandom.current().nextFloat() * sleepFactor);
-                        try {
-                            Thread.sleep(sleep);
-
-                            if (ThreadLocalRandom.current().nextFloat() < failRatio) {
-                                resultFuture.completeExceptionally(new Exception("wahahahaha..."));
-                            } else {
-                                resultFuture.complete(
-                                        Collections.singletonList("key-" + (input % 10)));
-                            }
-                        } catch (InterruptedException e) {
-                            resultFuture.complete(new ArrayList<>(0));
-                        }
-                    });
+            client.query(input)
+                    .whenComplete(
+                            (response, error) -> {
+                                if (response != null) {
+                                    resultFuture.complete(Collections.singletonList(response));
+                                } else {
+                                    resultFuture.completeExceptionally(error);
+                                }
+                            });
         }
     }
 
-    private static void printUsage() {
-        System.out.println(
-                "To customize example, use: AsyncIOExample [--fsStatePath <path to fs state>] "
-                        + "[--checkpointMode <exactly_once or at_least_once>] "
-                        + "[--maxCount <max number of input from source, -1 for infinite input>] "
-                        + "[--sleepFactor <interval to sleep for each stream element>] [--failRatio <possibility to throw exception>] "
-                        + "[--waitMode <ordered or unordered>] [--waitOperatorParallelism <parallelism for async wait operator>] "
-                        + "[--eventType <EventTime or IngestionTime>] [--shutdownWaitTS <milli sec to wait for thread pool>]"
-                        + "[--timeout <Timeout for the asynchronous operations>]");
-    }
-
     public static void main(String[] args) throws Exception {
-
-        // obtain execution environment
-        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-        // parse parameters
         final ParameterTool params = ParameterTool.fromArgs(args);
 
-        final String statePath;
-        final String cpMode;
-        final int maxCount;
-        final long sleepFactor;
-        final float failRatio;
         final String mode;
-        final int taskNum;
-        final long shutdownWaitTS;
         final long timeout;
 
         try {
-            // check the configuration for the job
-            statePath = params.get("fsStatePath", null);
-            cpMode = params.get("checkpointMode", "exactly_once");
-            maxCount = params.getInt("maxCount", 100000);
-            sleepFactor = params.getLong("sleepFactor", 100);
-            failRatio = params.getFloat("failRatio", 0.001f);
             mode = params.get("waitMode", "ordered");
-            taskNum = params.getInt("waitOperatorParallelism", 1);
-            shutdownWaitTS = params.getLong("shutdownWaitTS", 20000);
             timeout = params.getLong("timeout", 10000L);
         } catch (Exception e) {
-            printUsage();
-
+            System.out.println(
+                    "To customize example, use: AsyncIOExample [--waitMode <ordered or unordered>]");
             throw e;
         }
 
-        StringBuilder configStringBuilder = new StringBuilder();
-
-        final String lineSeparator = System.getProperty("line.separator");
-
-        configStringBuilder
-                .append("Job configuration")
-                .append(lineSeparator)
-                .append("FS state path=")
-                .append(statePath)
-                .append(lineSeparator)
-                .append("Checkpoint mode=")
-                .append(cpMode)
-                .append(lineSeparator)
-                .append("Max count of input from source=")
-                .append(maxCount)
-                .append(lineSeparator)
-                .append("Sleep factor=")
-                .append(sleepFactor)
-                .append(lineSeparator)
-                .append("Fail ratio=")
-                .append(failRatio)
-                .append(lineSeparator)
-                .append("Waiting mode=")
-                .append(mode)
-                .append(lineSeparator)
-                .append("Parallelism for async wait operator=")
-                .append(taskNum)
-                .append(lineSeparator)
-                .append("Shutdown wait timestamp=")
-                .append(shutdownWaitTS);
-
-        LOG.info(configStringBuilder.toString());
-
-        if (statePath != null) {
-            // setup state and checkpoint mode
-            env.getCheckpointConfig().setCheckpointStorage(statePath);
-        }
-
-        if (EXACTLY_ONCE_MODE.equals(cpMode)) {
-            env.enableCheckpointing(1000L, CheckpointingMode.EXACTLY_ONCE);
-        } else {
-            env.enableCheckpointing(1000L, CheckpointingMode.AT_LEAST_ONCE);
-        }
+        // obtain execution environment
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
         // create input stream of a single integer
-        DataStream<Integer> inputStream = env.addSource(new SimpleSource(maxCount));
+        DataStream<Integer> inputStream = env.addSource(new SimpleSource());
 
-        // create async function, which will "wait" for a while to simulate the process of async i/o
-        AsyncFunction<Integer, String> function =
-                new SampleAsyncFunction(sleepFactor, failRatio, shutdownWaitTS);
+        AsyncFunction<Integer, String> function = new SampleAsyncFunction();
 
         // add async operator to streaming job
         DataStream<String> result;
-        if (ORDERED.equals(mode)) {
-            result =
-                    AsyncDataStream.orderedWait(
-                                    inputStream, function, timeout, TimeUnit.MILLISECONDS, 20)
-                            .setParallelism(taskNum);
-        } else {
-            result =
-                    AsyncDataStream.unorderedWait(
-                                    inputStream, function, timeout, TimeUnit.MILLISECONDS, 20)
-                            .setParallelism(taskNum);
+        switch (mode.toUpperCase()) {
+            case "ORDERED":
+                result =
+                        AsyncDataStream.orderedWait(
+                                inputStream, function, timeout, TimeUnit.MILLISECONDS, 20);
+                break;
+            case "UNORDERED":
+                result =
+                        AsyncDataStream.unorderedWait(
+                                inputStream, function, timeout, TimeUnit.MILLISECONDS, 20);
+                break;
+            default:
+                throw new IllegalStateException("Unknown mode: " + mode);
         }
 
-        // add a reduce to get the sum of each keys.
-        result.flatMap(
-                        new FlatMapFunction<String, Tuple2<String, Integer>>() {
-                            private static final long serialVersionUID = -938116068682344455L;
-
-                            @Override
-                            public void flatMap(
-                                    String value, Collector<Tuple2<String, Integer>> out)
-                                    throws Exception {
-                                out.collect(new Tuple2<>(value, 1));
-                            }
-                        })
-                .keyBy(value -> value.f0)
-                .sum(1)
-                .print();
+        result.print();
 
         // execute the program
-        env.execute("Async IO Example");
+        env.execute("Async IO Example: " + mode);
     }
 }
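
The core of the rewritten example is the handoff from the client's
CompletableFuture to Flink's ResultFuture. A condensed sketch of just that bridge,
with the external call stubbed out for illustration (the class name and stub are
not part of the commit):

    import org.apache.flink.streaming.api.functions.async.ResultFuture;
    import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

    import java.util.Collections;
    import java.util.concurrent.CompletableFuture;

    /** Forwards each CompletableFuture result, or its failure, to Flink. */
    public class AsyncBridgeSketch extends RichAsyncFunction<Integer, String> {

        @Override
        public void asyncInvoke(Integer input, ResultFuture<String> resultFuture) {
            // stand-in for AsyncClient#query; any CompletableFuture-returning call fits
            CompletableFuture<String> future =
                    CompletableFuture.supplyAsync(() -> "key" + (input % 10));

            future.whenComplete(
                    (response, error) -> {
                        if (error != null) {
                            // surface the error to the async operator
                            resultFuture.completeExceptionally(error);
                        } else {
                            resultFuture.complete(Collections.singletonList(response));
                        }
                    });
        }
    }

Completing exceptionally, rather than swallowing the error, lets the async
operator's failure handling take effect instead of silently dropping the record.
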
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/util/SimpleSource.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/util/SimpleSource.java
new file mode 100644
index 0000000..760438c
--- /dev/null
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/util/SimpleSource.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.async.util;
+
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+
+/** A checkpointed source. */
+public class SimpleSource implements SourceFunction<Integer>, CheckpointedFunction {
+    private static final long serialVersionUID = 1L;
+
+    private volatile boolean isRunning = true;
+    private int start = 0;
+
+    private ListState<Integer> state;
+
+    @Override
+    public void initializeState(FunctionInitializationContext context) throws Exception {
+        state =
+                context.getOperatorStateStore()
+                        .getListState(new ListStateDescriptor<>("state", IntSerializer.INSTANCE));
+
+        // restore any state that we might already have to our fields, initialize state
+        // is also called in case of restore.
+        for (Integer i : state.get()) {
+            start = i;
+        }
+    }
+
+    @Override
+    public void snapshotState(FunctionSnapshotContext context) throws Exception {
+        state.clear();
+        state.add(start);
+    }
+
+    @Override
+    public void run(SourceContext<Integer> ctx) throws Exception {
+        while (isRunning) {
+            synchronized (ctx.getCheckpointLock()) {
+                ctx.collect(start);
+                ++start;
+
+                // loop back to 0
+                if (start == Integer.MAX_VALUE) {
+                    start = 0;
+                }
+            }
+            Thread.sleep(10L);
+        }
+    }
+
+    @Override
+    public void cancel() {
+        isRunning = false;
+    }
+}
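
For context on the restore loop in initializeState: operator list state is
redistributed across subtasks on recovery, and initializeState runs on both fresh
starts and restores, so iterating the list (at most one element at parallelism 1)
simply picks up the last checkpointed counter. A hedged usage sketch (class and
job names are illustrative):

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.examples.async.util.SimpleSource;

    public class SimpleSourceUsageSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // checkpoint every second so the source's counter survives restarts
            env.enableCheckpointing(1000L);

            env.addSource(new SimpleSource()).print();

            env.execute("SimpleSource usage sketch");
        }
    }
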
diff --git a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/async/AsyncClient.scala b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/async/AsyncClient.scala
new file mode 100644
index 0000000..3270980
--- /dev/null
+++ b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/async/AsyncClient.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.examples.async
+
+import java.util.concurrent.ThreadLocalRandom
+import scala.concurrent.{ExecutionContext, Future}
+
+/** A simple asynchronous client that simulates interacting with an unreliable external service. */
+class AsyncClient {
+
+  def query(key: Int)(implicit executor: ExecutionContext): Future[String] = Future {
+    val sleep = (ThreadLocalRandom.current.nextFloat * 100).toLong
+    try Thread.sleep(sleep) catch {
+      case e: InterruptedException =>
+        throw new RuntimeException("AsyncClient was interrupted", e)
+    }
+
+    if (ThreadLocalRandom.current.nextFloat < 0.001f) {
+      throw new RuntimeException("wahahahaha...")
+    } else {
+      "key" + (key % 10)
+    }
+  }
+}
diff --git a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/async/AsyncIOExample.scala b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/async/AsyncIOExample.scala
index 5808aaa..b711a35 100644
--- a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/async/AsyncIOExample.scala
+++ b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/async/AsyncIOExample.scala
@@ -19,53 +19,69 @@
 package org.apache.flink.streaming.scala.examples.async
 
 
-import java.util.concurrent.TimeUnit
-
-import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction
-import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
+import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.scala.async.{ResultFuture, RichAsyncFunction}
 import org.apache.flink.streaming.api.scala._
-import org.apache.flink.streaming.api.scala.async.ResultFuture
+import org.apache.flink.streaming.examples.async.util.SimpleSource
 
-import scala.concurrent.{ExecutionContext, Future}
+import java.util.concurrent.TimeUnit
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.util.{Failure, Success}
 
 object AsyncIOExample {
 
-  def main(args: Array[String]) {
-    val timeout = 10000L
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
+  /** An example of a [[RichAsyncFunction]] using an async client to query an external service. */
+  class SampleAsyncFunction extends RichAsyncFunction[Int, String] {
+    private var client: AsyncClient = _
 
-    val input = env.addSource(new SimpleSource())
+    override def open(parameters: Configuration): Unit = {
+      client = new AsyncClient
+    }
 
-    val asyncMapped = AsyncDataStream.orderedWait(input, timeout, TimeUnit.MILLISECONDS, 10) {
-      (input, collector: ResultFuture[Int]) =>
-        Future {
-          collector.complete(Seq(input))
-        } (ExecutionContext.global)
+    override def asyncInvoke(input: Int, resultFuture: ResultFuture[String]): Unit = {
+      client.query(input).onComplete {
+        case Success(value) => resultFuture.complete(Seq(value))
+        case Failure(exception) => resultFuture.completeExceptionally(exception)
+      }
     }
+  }
 
-    asyncMapped.print()
+  def main(args: Array[String]): Unit = {
+    val params = ParameterTool.fromArgs(args)
 
-    env.execute("Async I/O job")
-  }
-}
+    var mode: String = null
+    var timeout = 0L
 
-class SimpleSource extends ParallelSourceFunction[Int] {
-  var running = true
-  var counter = 0
+    try {
+      mode = params.get("waitMode", "ordered")
+      timeout = params.getLong("timeout", 10000L)
+    } catch {
+      case e: Exception =>
+        println("To customize example, use: AsyncIOExample [--waitMode <ordered or unordered>]")
+        throw e
+    }
 
-  override def run(ctx: SourceContext[Int]): Unit = {
-    while (running) {
-      ctx.getCheckpointLock.synchronized {
-        ctx.collect(counter)
-      }
-      counter += 1
+    // obtain execution environment
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
 
-      Thread.sleep(10L)
+    // create input stream of a single integer
+    val inputStream = env.addSource(new SimpleSource).map(_.toInt)
+
+    val function = new SampleAsyncFunction
+
+    // add async operator to streaming job
+    val result = mode.toUpperCase match {
+      case "ORDERED" =>
+        AsyncDataStream.orderedWait(inputStream, function, timeout, TimeUnit.MILLISECONDS, 20)
+      case "UNORDERED" =>
+        AsyncDataStream.unorderedWait(inputStream, function, timeout, TimeUnit.MILLISECONDS, 20)
+      case _ => throw new IllegalStateException("Unknown mode: " + mode)
     }
-  }
 
-  override def cancel(): Unit = {
-    running = false
+    result.print()
+
+    // execute the program
+    env.execute("Async IO Example: " + mode)
   }
 }

[flink] 10/11: [FLINK-24635][examples] Fix deprecations in changelog socket example

Posted by sj...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sjwiesman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b00bfaa66db90df7797be6c2003ceaf257e0309f
Author: sjwiesman <sj...@gmail.com>
AuthorDate: Mon Nov 15 15:17:43 2021 -0600

    [FLINK-24635][examples] Fix deprecations in changelog socket example
---
 .../flink/table/examples/java/connectors/ChangelogSocketExample.java   | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/connectors/ChangelogSocketExample.java b/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/connectors/ChangelogSocketExample.java
index 741903c..63fe6d8 100644
--- a/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/connectors/ChangelogSocketExample.java
+++ b/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/connectors/ChangelogSocketExample.java
@@ -26,7 +26,6 @@ import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.table.connector.format.DecodingFormat;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.factories.FactoryUtil;
-import org.apache.flink.types.Row;
 
 /**
  * Example for implementing a custom {@link DynamicTableSource} and a {@link DecodingFormat}.
@@ -96,7 +95,7 @@ public final class ChangelogSocketExample {
         final Table result = tEnv.sqlQuery("SELECT name, SUM(score) FROM UserScores GROUP BY name");
 
         // print the result to the console
-        tEnv.toRetractStream(result, Row.class).print();
+        tEnv.toChangelogStream(result).print();
 
         env.execute();
     }
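
toChangelogStream is the unified replacement for the deprecated toRetractStream:
instead of Tuple2<Boolean, Row> pairs, it emits Row instances whose RowKind
(INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE) encodes the change, shown as a
shorthand prefix such as +I or -U when printed. A minimal sketch of the migration,
assuming a UserScores table registered as in the example above:

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

    public class ChangelogStreamSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

            // assumes a UserScores table has been registered, as in the example above
            Table result =
                    tEnv.sqlQuery("SELECT name, SUM(score) FROM UserScores GROUP BY name");

            // old: tEnv.toRetractStream(result, Row.class)  ->  Tuple2<Boolean, Row>
            // new: each Row carries its change type, printed as +I/-U/+U/-D
            tEnv.toChangelogStream(result).print();

            env.execute();
        }
    }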

[flink] 02/11: [FLINK-24635][examples] Fix deprecations in Twitter example

Posted by sj...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sjwiesman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit dbcce671350f0e618e01ef4038b989cfb6932b51
Author: sjwiesman <sj...@gmail.com>
AuthorDate: Mon Nov 15 14:46:20 2021 -0600

    [FLINK-24635][examples] Fix deprecations in Twitter example
---
 .../streaming/examples/twitter/TwitterExample.java    | 18 +++++++++++++++++-
 .../scala/examples/twitter/TwitterExample.scala       | 19 +++++++++++++++----
 .../flink/streaming/test/StreamingExamplesITCase.java |  2 --
 .../scala/examples/StreamingExamplesITCase.scala      |  4 ----
 4 files changed, 32 insertions(+), 11 deletions(-)

diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterExample.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterExample.java
index b940a4d..15f672f 100644
--- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterExample.java
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/twitter/TwitterExample.java
@@ -18,10 +18,15 @@
 package org.apache.flink.streaming.examples.twitter;
 
 import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.serialization.SimpleStringEncoder;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.connector.file.sink.FileSink;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
 import org.apache.flink.streaming.connectors.twitter.TwitterSource;
 import org.apache.flink.streaming.examples.twitter.util.TwitterExampleData;
 import org.apache.flink.util.Collector;
@@ -29,6 +34,7 @@ import org.apache.flink.util.Collector;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 
+import java.time.Duration;
 import java.util.StringTokenizer;
 
 /**
@@ -100,7 +106,17 @@ public class TwitterExample {
 
         // emit result
         if (params.has("output")) {
-            tweets.writeAsText(params.get("output"));
+            tweets.sinkTo(
+                            FileSink.<Tuple2<String, Integer>>forRowFormat(
+                                            new Path(params.get("output")),
+                                            new SimpleStringEncoder<>())
+                                    .withRollingPolicy(
+                                            DefaultRollingPolicy.builder()
+                                                    .withMaxPartSize(MemorySize.ofMebiBytes(1))
+                                                    .withRolloverInterval(Duration.ofSeconds(10))
+                                                    .build())
+                                    .build())
+                    .name("output");
         } else {
             System.out.println("Printing result to stdout. Use --output to specify output path.");
             tweets.print();
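
writeAsText is deprecated because it offers no fault-tolerance guarantees; FileSink
integrates with checkpointing and rolls part files by policy. A self-contained
sketch of the sink construction used in this commit (the 1 MiB / 10 s values mirror
the example, not a recommendation; the helper name is illustrative):

    import org.apache.flink.api.common.serialization.SimpleStringEncoder;
    import org.apache.flink.configuration.MemorySize;
    import org.apache.flink.connector.file.sink.FileSink;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

    import java.time.Duration;

    public class FileSinkSketch {

        /** Builds a row-format FileSink that rolls part files by size and age. */
        public static FileSink<String> rollingStringSink(String outputPath) {
            return FileSink.<String>forRowFormat(new Path(outputPath), new SimpleStringEncoder<>())
                    .withRollingPolicy(
                            DefaultRollingPolicy.builder()
                                    // start a new part file after ~1 MiB or 10 seconds
                                    .withMaxPartSize(MemorySize.ofMebiBytes(1))
                                    .withRolloverInterval(Duration.ofSeconds(10))
                                    .build())
                    .build();
        }
    }

A stream then attaches it with stream.sinkTo(rollingStringSink(path)).name("output").
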
diff --git a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/twitter/TwitterExample.scala b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/twitter/TwitterExample.scala
index de10d93..8c43c6b 100644
--- a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/twitter/TwitterExample.scala
+++ b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/twitter/TwitterExample.scala
@@ -19,16 +19,21 @@
 package org.apache.flink.streaming.scala.examples.twitter
 
 import java.util.StringTokenizer
-
 import org.apache.flink.api.common.functions.FlatMapFunction
+import org.apache.flink.api.common.serialization.SimpleStringEncoder
 import org.apache.flink.api.java.utils.ParameterTool
 import org.apache.flink.api.scala._
+import org.apache.flink.configuration.MemorySize
+import org.apache.flink.connector.file.sink.FileSink
+import org.apache.flink.core.fs.Path
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy
 import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
 import org.apache.flink.streaming.connectors.twitter.TwitterSource
 import org.apache.flink.streaming.examples.twitter.util.TwitterExampleData
 import org.apache.flink.util.Collector
 
+import java.time.Duration
 import scala.collection.mutable.ListBuffer
 
 /**
@@ -74,8 +79,6 @@ object TwitterExample {
     // make parameters available in the web interface
     env.getConfig.setGlobalJobParameters(params)
 
-    env.setParallelism(params.getInt("parallelism", 1))
-
     // get input data
     val streamSource: DataStream[String] =
     if (params.has(TwitterSource.CONSUMER_KEY) &&
@@ -102,7 +105,15 @@ object TwitterExample {
 
     // emit result
     if (params.has("output")) {
-      tweets.writeAsText(params.get("output"))
+      tweets.sinkTo(FileSink.forRowFormat[(String, Int)](
+          new Path(params.get("output")),
+          new SimpleStringEncoder())
+        .withRollingPolicy(DefaultRollingPolicy.builder()
+          .withMaxPartSize(MemorySize.ofMebiBytes(1))
+          .withRolloverInterval(Duration.ofSeconds(10))
+          .build())
+        .build())
+        .name("file-sink")
     } else {
       println("Printing result to stdout. Use --output to specify output path.")
       tweets.print()
diff --git a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/StreamingExamplesITCase.java b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/StreamingExamplesITCase.java
index 80776b9..9f3eb13 100644
--- a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/StreamingExamplesITCase.java
+++ b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/StreamingExamplesITCase.java
@@ -30,7 +30,6 @@ import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.examples.iteration.util.IterateExampleData;
-import org.apache.flink.streaming.examples.twitter.util.TwitterExampleData;
 import org.apache.flink.streaming.test.examples.join.WindowJoinData;
 import org.apache.flink.test.testdata.WordCountData;
 import org.apache.flink.test.util.AbstractTestBase;
@@ -109,7 +108,6 @@ public class StreamingExamplesITCase extends AbstractTestBase {
         final String resultPath = getTempDirPath("result");
         org.apache.flink.streaming.examples.twitter.TwitterExample.main(
                 new String[] {"--output", resultPath});
-        compareResultsByLinesInMemory(TwitterExampleData.STREAMING_COUNTS_AS_TUPLES, resultPath);
     }
 
     @Test
diff --git a/flink-examples/flink-examples-streaming/src/test/scala/org/apache/flink/streaming/scala/examples/StreamingExamplesITCase.scala b/flink-examples/flink-examples-streaming/src/test/scala/org/apache/flink/streaming/scala/examples/StreamingExamplesITCase.scala
index d55405f..5f80fba 100644
--- a/flink-examples/flink-examples-streaming/src/test/scala/org/apache/flink/streaming/scala/examples/StreamingExamplesITCase.scala
+++ b/flink-examples/flink-examples-streaming/src/test/scala/org/apache/flink/streaming/scala/examples/StreamingExamplesITCase.scala
@@ -25,7 +25,6 @@ import org.apache.flink.core.fs.FileSystem.WriteMode
 import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.scala._
 import org.apache.flink.streaming.examples.iteration.util.IterateExampleData
-import org.apache.flink.streaming.examples.twitter.util.TwitterExampleData
 import org.apache.flink.streaming.scala.examples.iteration.IterateExample
 import org.apache.flink.streaming.scala.examples.join.WindowJoin
 import org.apache.flink.streaming.scala.examples.join.WindowJoin.{Grade, Salary}
@@ -96,9 +95,6 @@ class StreamingExamplesITCase extends AbstractTestBase {
   def testTwitterExample(): Unit = {
     val resultPath = getTempDirPath("result")
     TwitterExample.main(Array("--output", resultPath))
-    TestBaseUtils.compareResultsByLinesInMemory(
-      TwitterExampleData.STREAMING_COUNTS_AS_TUPLES,
-      resultPath)
   }
 
   @Test

[flink] 09/11: [hotfix][examples] Replace StreamingFileSink with FileSink

Posted by sj...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sjwiesman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 84e1a529626427a184acc88a356e04f4f9f17007
Author: sjwiesman <sj...@gmail.com>
AuthorDate: Mon Nov 15 15:26:01 2021 -0600

    [hotfix][examples] Replace StreamingFileSink with FileSink
---
 .../org/apache/flink/streaming/examples/gpu/MatrixVectorMul.java    | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/gpu/MatrixVectorMul.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/gpu/MatrixVectorMul.java
index 5b793a8..f267e67 100644
--- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/gpu/MatrixVectorMul.java
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/gpu/MatrixVectorMul.java
@@ -23,10 +23,10 @@ import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.serialization.SimpleStringEncoder;
 import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.file.sink.FileSink;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.util.Preconditions;
 
@@ -101,8 +101,8 @@ public class MatrixVectorMul {
 
         // Emit result
         if (params.has("output")) {
-            result.addSink(
-                    StreamingFileSink.forRowFormat(
+            result.sinkTo(
+                    FileSink.forRowFormat(
                                     new Path(params.get("output")),
                                     new SimpleStringEncoder<List<Float>>())
                             .build());
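
The mechanical part of this migration is the method swap: addSink() accepts the
legacy SinkFunction hierarchy that StreamingFileSink belongs to, while sinkTo()
accepts the unified Sink interface implemented by FileSink. Schematically (the
output path and input data are placeholders):

    import org.apache.flink.api.common.serialization.SimpleStringEncoder;
    import org.apache.flink.connector.file.sink.FileSink;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class SinkToSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // addSink() takes the legacy SinkFunction (StreamingFileSink's supertype);
            // sinkTo() takes the unified Sink interface that FileSink implements.
            env.fromElements("a", "b", "c")
                    .sinkTo(
                            FileSink.forRowFormat(
                                            new Path("/tmp/out"),
                                            new SimpleStringEncoder<String>())
                                    .build());

            env.execute("sinkTo sketch");
        }
    }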

[flink] 06/11: [FLINK-24635][examples] Fix deprecations in side output example

Posted by sj...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sjwiesman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1eee012e13db8c0a8fd60d8d1d4ef3f312f8225b
Author: sjwiesman <sj...@gmail.com>
AuthorDate: Mon Nov 15 14:50:22 2021 -0600

    [FLINK-24635][examples] Fix deprecations in side output example
---
 .../examples/sideoutput/SideOutputExample.java     | 57 +++++++++++++---------
 1 file changed, 34 insertions(+), 23 deletions(-)

diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/sideoutput/SideOutputExample.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/sideoutput/SideOutputExample.java
index 48af9f2..e72804b 100644
--- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/sideoutput/SideOutputExample.java
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/sideoutput/SideOutputExample.java
@@ -23,20 +23,26 @@ import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier;
 import org.apache.flink.api.common.eventtime.WatermarkGenerator;
 import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.common.serialization.SimpleStringEncoder;
+import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.connector.file.sink.FileSink;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
 import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.examples.wordcount.util.WordCountData;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.OutputTag;
 
+import java.time.Duration;
+
 /**
  * An example that illustrates the use of side output.
  *
@@ -81,29 +87,12 @@ public class SideOutputExample {
         text.assignTimestampsAndWatermarks(IngestionTimeWatermarkStrategy.create());
 
         SingleOutputStreamOperator<Tuple2<String, Integer>> tokenized =
-                text.keyBy(
-                                new KeySelector<String, Integer>() {
-                                    private static final long serialVersionUID = 1L;
-
-                                    @Override
-                                    public Integer getKey(String value) throws Exception {
-                                        return 0;
-                                    }
-                                })
-                        .process(new Tokenizer());
+                text.process(new Tokenizer());
 
         DataStream<String> rejectedWords =
                 tokenized
                         .getSideOutput(rejectedWordsTag)
-                        .map(
-                                new MapFunction<String, String>() {
-                                    private static final long serialVersionUID = 1L;
-
-                                    @Override
-                                    public String map(String value) throws Exception {
-                                        return "rejected: " + value;
-                                    }
-                                });
+                        .map(value -> "rejected: " + value, Types.STRING);
 
         DataStream<Tuple2<String, Integer>> counts =
                 tokenized
@@ -114,8 +103,30 @@ public class SideOutputExample {
 
         // emit result
         if (params.has("output")) {
-            counts.writeAsText(params.get("output"));
-            rejectedWords.writeAsText(params.get("rejected-words-output"));
+            counts.sinkTo(
+                            FileSink.<Tuple2<String, Integer>>forRowFormat(
+                                            new Path(params.get("output")),
+                                            new SimpleStringEncoder<>())
+                                    .withRollingPolicy(
+                                            DefaultRollingPolicy.builder()
+                                                    .withMaxPartSize(MemorySize.ofMebiBytes(1))
+                                                    .withRolloverInterval(Duration.ofSeconds(10))
+                                                    .build())
+                                    .build())
+                    .name("output");
+
+            rejectedWords
+                    .sinkTo(
+                            FileSink.<String>forRowFormat(
+                                            new Path(params.get("rejected-words-output")),
+                                            new SimpleStringEncoder<>())
+                                    .withRollingPolicy(
+                                            DefaultRollingPolicy.builder()
+                                                    .withMaxPartSize(MemorySize.ofMebiBytes(1))
+                                                    .withRolloverInterval(Duration.ofSeconds(10))
+                                                    .build())
+                                    .build())
+                    .name("rejected-words-output");
         } else {
             System.out.println("Printing result to stdout. Use --output to specify output path.");
             counts.print();
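
The hunk above also drops the constant-key keyBy: ProcessFunction works on
non-keyed streams, so the artificial key served no purpose. For reference, the
side-output machinery the example relies on: an OutputTag is declared as an
anonymous subclass so its generic type survives erasure, a ProcessFunction routes
records to it with ctx.output(), and downstream code pulls the extra stream with
getSideOutput(). A minimal sketch, independent of the word-count specifics
(threshold, names, and input are illustrative):

    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.util.Collector;
    import org.apache.flink.util.OutputTag;

    public class SideOutputSketch {

        // anonymous subclass keeps the String type parameter available at runtime
        private static final OutputTag<String> REJECTED = new OutputTag<String>("rejected") {};

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            SingleOutputStreamOperator<String> accepted =
                    env.fromElements("ok", "excessively-long-token", "fine")
                            .process(
                                    new ProcessFunction<String, String>() {
                                        @Override
                                        public void processElement(
                                                String value, Context ctx, Collector<String> out) {
                                            if (value.length() > 5) {
                                                ctx.output(REJECTED, value); // side output
                                            } else {
                                                out.collect(value); // main output
                                            }
                                        }
                                    });

            DataStream<String> rejected = accepted.getSideOutput(REJECTED);
            accepted.print();
            rejected.map(value -> "rejected: " + value, Types.STRING).print();

            env.execute("Side-output sketch");
        }
    }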

[flink] 03/11: [FLINK-24635][examples] Fix deprecations in state machine example

Posted by sj...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sjwiesman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit cfada4144c89eb407b127ba6d03086bc3f1066b0
Author: sjwiesman <sj...@gmail.com>
AuthorDate: Mon Nov 15 14:46:58 2021 -0600

    [FLINK-24635][examples] Fix deprecations in state machine example
---
 .../examples/statemachine/StateMachineExample.java   | 20 ++++++++++++++++++--
 1 file changed, 18 insertions(+), 2 deletions(-)

diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/StateMachineExample.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/StateMachineExample.java
index 72a1587..9763862 100644
--- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/StateMachineExample.java
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/statemachine/StateMachineExample.java
@@ -20,18 +20,22 @@ package org.apache.flink.streaming.examples.statemachine;
 
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.serialization.SimpleStringEncoder;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.connector.file.sink.FileSink;
 import org.apache.flink.connector.kafka.source.KafkaSource;
 import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
 import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
 import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
-import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
 import org.apache.flink.streaming.examples.statemachine.dfa.State;
 import org.apache.flink.streaming.examples.statemachine.event.Alert;
 import org.apache.flink.streaming.examples.statemachine.event.Event;
@@ -39,6 +43,8 @@ import org.apache.flink.streaming.examples.statemachine.generator.EventsGenerato
 import org.apache.flink.streaming.examples.statemachine.kafka.EventDeSerializationSchema;
 import org.apache.flink.util.Collector;
 
+import java.time.Duration;
+
 /**
  * Main class of the state machine example. This class implements the streaming application that
  * receives the stream of events and evaluates a state machine (per originating address) to validate
@@ -140,7 +146,17 @@ public class StateMachineExample {
         if (outputFile == null) {
             alerts.print();
         } else {
-            alerts.writeAsText(outputFile, FileSystem.WriteMode.OVERWRITE).setParallelism(1);
+            alerts.sinkTo(
+                            FileSink.<Alert>forRowFormat(
+                                            new Path(outputFile), new SimpleStringEncoder<>())
+                                    .withRollingPolicy(
+                                            DefaultRollingPolicy.builder()
+                                                    .withMaxPartSize(MemorySize.ofMebiBytes(1))
+                                                    .withRolloverInterval(Duration.ofSeconds(10))
+                                                    .build())
+                                    .build())
+                    .setParallelism(1)
+                    .name("output");
         }
 
         // trigger program execution

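The imports above also show the two non-deprecated state backend classes this example can run with. A minimal sketch of selecting one (the backend flag here is hypothetical; the example's actual wiring differs):

    import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
    import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class BackendSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Hypothetical flag: heap-backed state by default, RocksDB on request.
            boolean useRocksDB = args.length > 0 && "rocksdb".equals(args[0]);
            if (useRocksDB) {
                env.setStateBackend(new EmbeddedRocksDBStateBackend());
            } else {
                env.setStateBackend(new HashMapStateBackend());
            }
        }
    }
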
[flink] 11/11: [FLINK-24833][examples] Prevent use of deprecated APIs in flink-examples

Posted by sj...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sjwiesman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ef5fb7a15cb3964d0d59434798549bfa26adabe9
Author: sjwiesman <sj...@gmail.com>
AuthorDate: Mon Nov 15 15:20:15 2021 -0600

    [FLINK-24833][examples] Prevent use of deprecated APIs in flink-examples
    
    This closes #17802
---
 flink-examples/flink-examples-batch/pom.xml     | 23 +++++++++++++++++++++++
 flink-examples/flink-examples-streaming/pom.xml | 25 ++++++++++++++++++++++++-
 flink-examples/flink-examples-table/pom.xml     | 23 +++++++++++++++++++++++
 3 files changed, 70 insertions(+), 1 deletion(-)

diff --git a/flink-examples/flink-examples-batch/pom.xml b/flink-examples/flink-examples-batch/pom.xml
index 0786971..6ead88d 100644
--- a/flink-examples/flink-examples-batch/pom.xml
+++ b/flink-examples/flink-examples-batch/pom.xml
@@ -55,6 +55,24 @@ under the License.
 	
 	<build>
 		<plugins>
+			<!-- Fail compilation on deprecation warnings to avoid showing users outdated examples. -->
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-compiler-plugin</artifactId>
+				<executions>
+					<execution>
+						<id>compile</id>
+						<phase>process-sources</phase>
+						<goals>
+							<goal>compile</goal>
+						</goals>
+						<configuration>
+							<compilerArgument>-Xlint:deprecation</compilerArgument>
+							<failOnWarning>true</failOnWarning>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
 
 			<!-- Scala Compiler -->
 			<plugin>
@@ -69,6 +87,11 @@ under the License.
 						<goals>
 							<goal>compile</goal>
 						</goals>
+						<configuration>
+							<args>
+								<arg>-Xfatal-warnings</arg>
+							</args>
+						</configuration>
 					</execution>
 				</executions>
 				<configuration>
diff --git a/flink-examples/flink-examples-streaming/pom.xml b/flink-examples/flink-examples-streaming/pom.xml
index 705f063..0e5853e 100644
--- a/flink-examples/flink-examples-streaming/pom.xml
+++ b/flink-examples/flink-examples-streaming/pom.xml
@@ -130,6 +130,24 @@ under the License.
 
 	<build>
 		<plugins>
+			<!-- Fail compilation on deprecation warnings to avoid showing users outdated examples. -->
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-compiler-plugin</artifactId>
+				<executions>
+					<execution>
+						<id>compile</id>
+						<phase>process-sources</phase>
+						<goals>
+							<goal>compile</goal>
+						</goals>
+						<configuration>
+							<compilerArgument>-Xlint:deprecation</compilerArgument>
+							<failOnWarning>true</failOnWarning>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
 
 			<!-- Scala Code Style, most of the configuration done via plugin management -->
 			<plugin>
@@ -139,7 +157,7 @@ under the License.
 					<configLocation>${project.basedir}/../../tools/maven/scalastyle-config.xml</configLocation>
 				</configuration>
 			</plugin>
-			
+
 			<plugin>
 				<groupId>org.apache.maven.plugins</groupId>
 				<artifactId>maven-dependency-plugin</artifactId>
@@ -413,6 +431,11 @@ under the License.
 						<goals>
 							<goal>compile</goal>
 						</goals>
+						<configuration>
+							<args>
+								<arg>-Xfatal-warnings</arg>
+							</args>
+						</configuration>
 					</execution>
  
 					<!-- Run scala compiler in the process-test-resources phase, so that dependencies on
diff --git a/flink-examples/flink-examples-table/pom.xml b/flink-examples/flink-examples-table/pom.xml
index 09efc3a..4427851 100644
--- a/flink-examples/flink-examples-table/pom.xml
+++ b/flink-examples/flink-examples-table/pom.xml
@@ -82,6 +82,24 @@ under the License.
 
 	<build>
 		<plugins>
+			<!-- Fail compilation on deprecation warnings to avoid showing users outdated examples. -->
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-compiler-plugin</artifactId>
+				<executions>
+					<execution>
+						<id>compile</id>
+						<phase>process-sources</phase>
+						<goals>
+							<goal>compile</goal>
+						</goals>
+						<configuration>
+							<compilerArgument>-Xlint:deprecation</compilerArgument>
+							<failOnWarning>true</failOnWarning>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
 			<plugin>
 				<groupId>net.alchim31.maven</groupId>
 				<artifactId>scala-maven-plugin</artifactId>
@@ -93,6 +111,11 @@ under the License.
 							<goal>add-source</goal>
 							<goal>compile</goal>
 						</goals>
+						<configuration>
+							<args>
+								<arg>-Xfatal-warnings</arg>
+							</args>
+						</configuration>
 					</execution>
 					<execution>
 						<id>scala-test-compile</id>

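To see what the new guard catches: any remaining call to a deprecated API, such as the DataStream#writeAsText method removed throughout this series, now fails the build rather than merely warning. A sketch (the quoted warning text is indicative, not verbatim javac output):

    import org.apache.flink.api.common.serialization.SimpleStringEncoder;
    import org.apache.flink.connector.file.sink.FileSink;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class DeprecationGuardSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStream<String> counts = env.fromElements("a 1", "b 2");

            // With -Xlint:deprecation and failOnWarning, the old call breaks the build:
            // counts.writeAsText("/tmp/out");
            //   warning: [deprecation] writeAsText(String) in DataStream has been deprecated

            // The replacement used throughout this series compiles without warnings.
            counts.sinkTo(
                    FileSink.<String>forRowFormat(new Path("/tmp/out"), new SimpleStringEncoder<>())
                            .build());

            env.execute();
        }
    }
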
[flink] 05/11: [FLINK-24635][examples] Fix deprecations in window join example

Posted by sj...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sjwiesman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit cf8053e8bc4590ec84c2a204f856b3807baed594
Author: sjwiesman <sj...@gmail.com>
AuthorDate: Mon Nov 15 14:49:55 2021 -0600

    [FLINK-24635][examples] Fix deprecations in window join example
---
 .../java/org/apache/flink/streaming/examples/join/WindowJoin.java     | 1 -
 .../org/apache/flink/streaming/scala/examples/join/WindowJoin.scala   | 4 +---
 2 files changed, 1 insertion(+), 4 deletions(-)

diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
index 538ab98..ef33a58 100644
--- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
@@ -44,7 +44,6 @@ import org.apache.flink.streaming.examples.join.WindowJoinSampleData.SalarySourc
  * <p>The example uses a built-in sample data generator that generates the streams of pairs at a
  * configurable rate.
  */
-@SuppressWarnings("serial")
 public class WindowJoin {
 
     // *************************************************************************
diff --git a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala
index 3793a28..607ca04 100644
--- a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala
+++ b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala
@@ -19,7 +19,6 @@
 package org.apache.flink.streaming.scala.examples.join
 
 import org.apache.flink.api.java.utils.ParameterTool
-import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.scala._
 import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
 import org.apache.flink.streaming.api.windowing.time.Time
@@ -49,7 +48,7 @@ object WindowJoin {
   //  Program
   // *************************************************************************
 
-  def main(args: Array[String]) {
+  def main(args: Array[String]): Unit = {
     // parse the parameters
     val params = ParameterTool.fromArgs(args)
     val windowSize = params.getLong("windowSize", 2000)
@@ -61,7 +60,6 @@ object WindowJoin {
 
     // obtain execution environment, run this example in "ingestion time"
     val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
 
     // make parameters available in the web interface
     env.getConfig.setGlobalJobParameters(params)

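With the ingestion-time characteristic removed, the example simply runs with Flink's default event-time behavior. If ingestion-time semantics were still wanted, one possible replacement (a sketch, not part of this commit; Tuple2<String, Integer> merely stands in for the join's pair types) is a watermark strategy that stamps each record with wall-clock time:

    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.java.tuple.Tuple2;

    public class IngestionTimeSketch {
        // Stamp each record with the wall-clock time as it enters the pipeline,
        // which is what the removed IngestionTime characteristic used to do.
        public static WatermarkStrategy<Tuple2<String, Integer>> ingestionTime() {
            return WatermarkStrategy.<Tuple2<String, Integer>>forMonotonousTimestamps()
                    .withTimestampAssigner((event, previous) -> System.currentTimeMillis());
        }
    }
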
[flink] 07/11: [FLINK-24635][examples] Fix deprecations in iterations example

Posted by sj...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sjwiesman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0e3273d198ed0f69ffda44dd15daf0290a9a81c9
Author: sjwiesman <sj...@gmail.com>
AuthorDate: Mon Nov 15 14:52:59 2021 -0600

    [FLINK-24635][examples] Fix deprecations in iterations example
---
 .../streaming/examples/iteration/IterateExample.java  | 16 +++++++++++++++-
 .../scala/examples/iteration/IterateExample.scala     | 19 +++++++++++++++++--
 2 files changed, 32 insertions(+), 3 deletions(-)

diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
index d8fed37..fa261cc 100644
--- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java
@@ -18,18 +18,24 @@
 package org.apache.flink.streaming.examples.iteration;
 
 import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.serialization.SimpleStringEncoder;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple5;
 import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.connector.file.sink.FileSink;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.IterativeStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.OutputTag;
 
+import java.time.Duration;
 import java.util.Random;
 
 /**
@@ -103,7 +109,15 @@ public class IterateExample {
 
         // emit results
         if (params.has("output")) {
-            numbers.writeAsText(params.get("output"));
+            numbers.sinkTo(
+                    FileSink.<Tuple2<Tuple2<Integer, Integer>, Integer>>forRowFormat(
+                                    new Path(params.get("output")), new SimpleStringEncoder<>())
+                            .withRollingPolicy(
+                                    DefaultRollingPolicy.builder()
+                                            .withMaxPartSize(MemorySize.ofMebiBytes(1))
+                                            .withRolloverInterval(Duration.ofSeconds(10))
+                                            .build())
+                            .build());
         } else {
             System.out.println("Printing result to stdout. Use --output to specify output path.");
             numbers.print();
diff --git a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/iteration/IterateExample.scala b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/iteration/IterateExample.scala
index 1fa3ace..4812ec4 100644
--- a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/iteration/IterateExample.scala
+++ b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/iteration/IterateExample.scala
@@ -18,14 +18,21 @@
 
 package org.apache.flink.streaming.scala.examples.iteration
 
-import java.util.Random
+import org.apache.flink.api.common.serialization.SimpleStringEncoder
 
+import java.util.Random
 import org.apache.flink.api.java.utils.ParameterTool
 import org.apache.flink.api.scala._
+import org.apache.flink.configuration.MemorySize
+import org.apache.flink.connector.file.sink.FileSink
+import org.apache.flink.core.fs.Path
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy
 import org.apache.flink.streaming.api.functions.source.SourceFunction
 import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
 import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
 
+import java.time.Duration
+
 /**
  * Example illustrating iterations in Flink streaming.
  *
@@ -95,7 +102,15 @@ object IterateExample {
       )
 
     if (params.has("output")) {
-      numbers.writeAsText(params.get("output"))
+      numbers.sinkTo(FileSink.forRowFormat[((Int, Int), Int)](
+          new Path(params.get("output")),
+          new SimpleStringEncoder())
+        .withRollingPolicy(DefaultRollingPolicy.builder()
+          .withMaxPartSize(MemorySize.ofMebiBytes(1))
+          .withRolloverInterval(Duration.ofSeconds(10))
+          .build())
+        .build())
+        .name("file-sink")
     } else {
       println("Printing result to stdout. Use --output to specify output path.")
       numbers.print()

[flink] 01/11: [FLINK-24831][examples] Update DataStream Window examples

Posted by sj...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sjwiesman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 443b7121373a7f7c4b7a25660a50b06cbee8cb01
Author: sjwiesman <sj...@gmail.com>
AuthorDate: Mon Nov 15 14:42:43 2021 -0600

    [FLINK-24831][examples] Update DataStream Window examples
---
 flink-examples/flink-examples-streaming/pom.xml    |   2 +
 .../GroupedProcessingTimeWindowExample.java        |  13 +-
 .../examples/windowing/SessionWindowing.java       |  20 ++-
 .../examples/windowing/TopSpeedWindowing.java      | 151 ++++++++++-----------
 .../examples/windowing/WindowWordCount.java        | 111 +++++++++++----
 .../examples/windowing/util/CarSource.java         |  77 +++++++++++
 .../GroupedProcessingTimeWindowExample.scala       |   7 +-
 .../examples/windowing/SessionWindowing.scala      |  20 ++-
 .../examples/windowing/TopSpeedWindowing.scala     | 133 ++++++++++--------
 .../scala/examples/windowing/WindowWordCount.scala | 126 ++++++++++++-----
 .../scala/examples/windowing/util/CarSource.scala  |  62 +++++++++
 .../streaming/test/StreamingExamplesITCase.java    |   6 +-
 .../windowing/TopSpeedWindowingExampleITCase.java  |   9 +-
 .../windowing/TopSpeedWindowingExampleITCase.java  |   3 +-
 .../scala/examples/StreamingExamplesITCase.scala   |   9 +-
 15 files changed, 518 insertions(+), 231 deletions(-)

diff --git a/flink-examples/flink-examples-streaming/pom.xml b/flink-examples/flink-examples-streaming/pom.xml
index b4ab863..705f063 100644
--- a/flink-examples/flink-examples-streaming/pom.xml
+++ b/flink-examples/flink-examples-streaming/pom.xml
@@ -327,6 +327,8 @@ under the License.
 							<includes>
 								<include>org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.class</include>
 								<include>org/apache/flink/streaming/examples/windowing/TopSpeedWindowing$*.class</include>
+								<include>org/apache/flink/streaming/examples/windowing/util/CarSource.class</include>
+								<include>org/apache/flink/streaming/examples/wordcount/util/CLI.class</include>
 								<include>META-INF/LICENSE</include>
 								<include>META-INF/NOTICE</include>
 							</includes>
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java
index c39166a..0662e70 100644
--- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/GroupedProcessingTimeWindowExample.java
@@ -24,7 +24,7 @@ import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
@@ -33,15 +33,14 @@ import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.util.Collector;
 
 /**
- * An example of grouped stream windowing into sliding time windows. This example uses
- * [[RichParallelSourceFunction]] to generate a list of key-value pairs.
+ * An example of grouped stream windowing into sliding time windows. This example uses {@link
+ * RichParallelSourceFunction} to generate a list of key-value pairs.
  */
 public class GroupedProcessingTimeWindowExample {
 
     public static void main(String[] args) throws Exception {
 
         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setParallelism(4);
 
         DataStream<Tuple2<Long, Long>> stream = env.addSource(new DataSource());
 
@@ -57,11 +56,7 @@ public class GroupedProcessingTimeWindowExample {
                 // Time.milliseconds(500)))
                 //			.apply(new SummingWindowFunction())
 
-                .addSink(
-                        new SinkFunction<Tuple2<Long, Long>>() {
-                            @Override
-                            public void invoke(Tuple2<Long, Long> value) {}
-                        });
+                .addSink(new DiscardingSink<>());
 
         env.execute();
     }
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
index b36c5b2..ac59d89 100644
--- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
@@ -17,15 +17,21 @@
 
 package org.apache.flink.streaming.examples.windowing;
 
+import org.apache.flink.api.common.serialization.SimpleStringEncoder;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.connector.file.sink.FileSink;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
 import org.apache.flink.streaming.api.windowing.time.Time;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -35,7 +41,6 @@ import java.util.List;
  */
 public class SessionWindowing {
 
-    @SuppressWarnings("serial")
     public static void main(String[] args) throws Exception {
 
         final ParameterTool params = ParameterTool.fromArgs(args);
@@ -85,7 +90,18 @@ public class SessionWindowing {
                         .sum(2);
 
         if (fileOutput) {
-            aggregated.writeAsText(params.get("output"));
+            aggregated
+                    .sinkTo(
+                            FileSink.<Tuple3<String, Long, Integer>>forRowFormat(
+                                            new Path(params.get("output")),
+                                            new SimpleStringEncoder<>())
+                                    .withRollingPolicy(
+                                            DefaultRollingPolicy.builder()
+                                                    .withMaxPartSize(MemorySize.ofMebiBytes(1))
+                                                    .withRolloverInterval(Duration.ofSeconds(10))
+                                                    .build())
+                                    .build())
+                    .name("output");
         } else {
             System.out.println("Printing result to stdout. Use --output to specify output path.");
             aggregated.print();
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java
index 02180d3..7808e4b 100644
--- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/TopSpeedWindowing.java
@@ -17,21 +17,26 @@
 
 package org.apache.flink.streaming.examples.windowing;
 
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.serialization.SimpleStringEncoder;
 import org.apache.flink.api.java.tuple.Tuple4;
-import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.connector.file.sink.FileSink;
+import org.apache.flink.connector.file.src.FileSource;
+import org.apache.flink.connector.file.src.reader.TextLineFormat;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
 import org.apache.flink.streaming.api.functions.windowing.delta.DeltaFunction;
 import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
 import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.triggers.DeltaTrigger;
+import org.apache.flink.streaming.examples.windowing.util.CarSource;
+import org.apache.flink.streaming.examples.wordcount.util.CLI;
 
-import java.util.Arrays;
-import java.util.Random;
+import java.time.Duration;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -47,26 +52,64 @@ public class TopSpeedWindowing {
     // *************************************************************************
 
     public static void main(String[] args) throws Exception {
+        final CLI params = CLI.fromArgs(args);
 
-        final ParameterTool params = ParameterTool.fromArgs(args);
-
+        // Create the execution environment. This is the main entrypoint
+        // to building a Flink application.
         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+        // Apache Flink’s unified approach to stream and batch processing means that a DataStream
+        // application executed over bounded input will produce the same final results regardless
+        // of the configured execution mode. It is important to note what final means here: a job
+        // executing in STREAMING mode might produce incremental updates (think upserts in
+        // a database) while a BATCH job would only produce one final result at the end. The final
+        // result will be the same if interpreted correctly, but getting there can be different.
+        //
+        // The “classic” execution behavior of the DataStream API is called STREAMING execution
+        // mode. Applications should use streaming execution for unbounded jobs that require
+        // continuous incremental processing and are expected to stay online indefinitely.
+        //
+        // By enabling BATCH execution, we allow Flink to apply additional optimizations that we
+        // can only do when we know that our input is bounded. For example, different
+        // join/aggregation strategies can be used, in addition to a different shuffle
+        // implementation that allows more efficient task scheduling and failure recovery behavior.
+        //
+        // By setting the runtime mode to AUTOMATIC, Flink will choose BATCH if all sources
+        // are bounded and otherwise STREAMING.
+        env.setRuntimeMode(params.getExecutionMode());
+
+        // This optional step makes the input parameters
+        // available in the Flink UI.
         env.getConfig().setGlobalJobParameters(params);
 
-        @SuppressWarnings({"rawtypes", "serial"})
         DataStream<Tuple4<Integer, Integer, Double, Long>> carData;
-        if (params.has("input")) {
-            carData = env.readTextFile(params.get("input")).map(new ParseCarData());
+        if (params.getInputs().isPresent()) {
+            // Create a new file source that will read files from a given set of directories.
+            // Each file will be processed as plain text and split based on newlines.
+            FileSource.FileSourceBuilder<String> builder =
+                    FileSource.forRecordStreamFormat(
+                            new TextLineFormat(), params.getInputs().get());
+
+            // If a discovery interval is provided, the source will
+            // continuously watch the given directories for new files.
+            params.getDiscoveryInterval().ifPresent(builder::monitorContinuously);
+
+            carData =
+                    env.fromSource(builder.build(), WatermarkStrategy.noWatermarks(), "file-input")
+                            .map(new ParseCarData())
+                            .name("parse-input");
         } else {
-            System.out.println("Executing TopSpeedWindowing example with default input data set.");
-            System.out.println("Use --input to specify file input.");
-            carData = env.addSource(CarSource.create(2));
+            carData = env.addSource(CarSource.create(2)).name("in-memory-source");
         }
 
         int evictionSec = 10;
         double triggerMeters = 50;
         DataStream<Tuple4<Integer, Integer, Double, Long>> topSpeeds =
-                carData.assignTimestampsAndWatermarks(new CarTimestamp())
+                carData.assignTimestampsAndWatermarks(
+                                WatermarkStrategy
+                                        .<Tuple4<Integer, Integer, Double, Long>>
+                                                forMonotonousTimestamps()
+                                        .withTimestampAssigner((car, ts) -> car.f3))
                         .keyBy(value -> value.f0)
                         .window(GlobalWindows.create())
                         .evictor(TimeEvictor.of(Time.of(evictionSec, TimeUnit.SECONDS)))
@@ -89,10 +132,22 @@ public class TopSpeedWindowing {
                                         carData.getType().createSerializer(env.getConfig())))
                         .maxBy(1);
 
-        if (params.has("output")) {
-            topSpeeds.writeAsText(params.get("output"));
+        if (params.getOutput().isPresent()) {
+            // Given an output directory, Flink will write the results to a file
+            // using a simple string encoding. In a production environment, this might
+            // be something more structured like CSV, Avro, JSON, or Parquet.
+            topSpeeds
+                    .sinkTo(
+                            FileSink.<Tuple4<Integer, Integer, Double, Long>>forRowFormat(
+                                            params.getOutput().get(), new SimpleStringEncoder<>())
+                                    .withRollingPolicy(
+                                            DefaultRollingPolicy.builder()
+                                                    .withMaxPartSize(MemorySize.ofMebiBytes(1))
+                                                    .withRolloverInterval(Duration.ofSeconds(10))
+                                                    .build())
+                                    .build())
+                    .name("file-sink");
         } else {
-            System.out.println("Printing result to stdout. Use --output to specify output path.");
             topSpeeds.print();
         }
 
@@ -103,58 +158,6 @@ public class TopSpeedWindowing {
     // USER FUNCTIONS
     // *************************************************************************
 
-    private static class CarSource
-            implements SourceFunction<Tuple4<Integer, Integer, Double, Long>> {
-
-        private static final long serialVersionUID = 1L;
-        private Integer[] speeds;
-        private Double[] distances;
-
-        private Random rand = new Random();
-
-        private volatile boolean isRunning = true;
-
-        private CarSource(int numOfCars) {
-            speeds = new Integer[numOfCars];
-            distances = new Double[numOfCars];
-            Arrays.fill(speeds, 50);
-            Arrays.fill(distances, 0d);
-        }
-
-        public static CarSource create(int cars) {
-            return new CarSource(cars);
-        }
-
-        @Override
-        public void run(SourceContext<Tuple4<Integer, Integer, Double, Long>> ctx)
-                throws Exception {
-
-            while (isRunning) {
-                Thread.sleep(100);
-                for (int carId = 0; carId < speeds.length; carId++) {
-                    if (rand.nextBoolean()) {
-                        speeds[carId] = Math.min(100, speeds[carId] + 5);
-                    } else {
-                        speeds[carId] = Math.max(0, speeds[carId] - 5);
-                    }
-                    distances[carId] += speeds[carId] / 3.6d;
-                    Tuple4<Integer, Integer, Double, Long> record =
-                            new Tuple4<>(
-                                    carId,
-                                    speeds[carId],
-                                    distances[carId],
-                                    System.currentTimeMillis());
-                    ctx.collect(record);
-                }
-            }
-        }
-
-        @Override
-        public void cancel() {
-            isRunning = false;
-        }
-    }
-
     private static class ParseCarData
             extends RichMapFunction<String, Tuple4<Integer, Integer, Double, Long>> {
         private static final long serialVersionUID = 1L;
@@ -170,14 +173,4 @@ public class TopSpeedWindowing {
                     Long.valueOf(data[3]));
         }
     }
-
-    private static class CarTimestamp
-            extends AscendingTimestampExtractor<Tuple4<Integer, Integer, Double, Long>> {
-        private static final long serialVersionUID = 1L;
-
-        @Override
-        public long extractAscendingTimestamp(Tuple4<Integer, Integer, Double, Long> element) {
-            return element.f3;
-        }
-    }
 }
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/WindowWordCount.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/WindowWordCount.java
index 9cdecdb..cbb4acc 100644
--- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/WindowWordCount.java
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/WindowWordCount.java
@@ -17,13 +17,22 @@
 
 package org.apache.flink.streaming.examples.windowing;
 
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.serialization.SimpleStringEncoder;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.connector.file.sink.FileSink;
+import org.apache.flink.connector.file.src.FileSource;
+import org.apache.flink.connector.file.src.reader.TextLineFormat;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
 import org.apache.flink.streaming.examples.wordcount.WordCount;
+import org.apache.flink.streaming.examples.wordcount.util.CLI;
 import org.apache.flink.streaming.examples.wordcount.util.WordCountData;
 
+import java.time.Duration;
+
 /**
  * Implements a windowed version of the streaming "WordCount" program.
  *
@@ -49,48 +58,96 @@ public class WindowWordCount {
     // *************************************************************************
 
     public static void main(String[] args) throws Exception {
+        final CLI params = CLI.fromArgs(args);
 
-        final ParameterTool params = ParameterTool.fromArgs(args);
-
-        // set up the execution environment
+        // Create the execution environment. This is the main entrypoint
+        // to building a Flink application.
         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
-        // get input data
+        // Apache Flink’s unified approach to stream and batch processing means that a DataStream
+        // application executed over bounded input will produce the same final results regardless
+        // of the configured execution mode. It is important to note what final means here: a job
+        // executing in STREAMING mode might produce incremental updates (think upserts in
+        // a database) while a BATCH job would only produce one final result at the end. The final
+        // result will be the same if interpreted correctly, but getting there can be different.
+        //
+        // The “classic” execution behavior of the DataStream API is called STREAMING execution
+        // mode. Applications should use streaming execution for unbounded jobs that require
+        // continuous incremental processing and are expected to stay online indefinitely.
+        //
+        // By enabling BATCH execution, we allow Flink to apply additional optimizations that we
+        // can only do when we know that our input is bounded. For example, different
+        // join/aggregation strategies can be used, in addition to a different shuffle
+        // implementation that allows more efficient task scheduling and failure recovery behavior.
+        //
+        // By setting the runtime mode to AUTOMATIC, Flink will choose BATCH if all sources
+        // are bounded and otherwise STREAMING.
+        env.setRuntimeMode(params.getExecutionMode());
+
+        // This optional step makes the input parameters
+        // available in the Flink UI.
+        env.getConfig().setGlobalJobParameters(params);
+
         DataStream<String> text;
-        if (params.has("input")) {
-            // read the text file from given input path
-            text = env.readTextFile(params.get("input"));
+        if (params.getInputs().isPresent()) {
+            // Create a new file source that will read files from a given set of directories.
+            // Each file will be processed as plain text and split based on newlines.
+            FileSource.FileSourceBuilder<String> builder =
+                    FileSource.forRecordStreamFormat(
+                            new TextLineFormat(), params.getInputs().get());
+
+            // If a discovery interval is provided, the source will
+            // continuously watch the given directories for new files.
+            params.getDiscoveryInterval().ifPresent(builder::monitorContinuously);
+
+            text = env.fromSource(builder.build(), WatermarkStrategy.noWatermarks(), "file-input");
         } else {
-            System.out.println("Executing WindowWordCount example with default input data set.");
-            System.out.println("Use --input to specify file input.");
-            // get default test text data
-            text = env.fromElements(WordCountData.WORDS);
+            text = env.fromElements(WordCountData.WORDS).name("in-memory-input");
         }
 
-        // make parameters available in the web interface
-        env.getConfig().setGlobalJobParameters(params);
-
-        final int windowSize = params.getInt("window", 10);
-        final int slideSize = params.getInt("slide", 5);
+        int windowSize = params.getInt("window").orElse(250);
+        int slideSize = params.getInt("slide").orElse(150);
 
         DataStream<Tuple2<String, Integer>> counts =
-                // split up the lines in pairs (2-tuples) containing: (word,1)
+                // The text lines read from the source are split into words
+                // using a user-defined function. The tokenizer, implemented below,
+                // will output each word as a 2-tuple containing (word, 1)
                 text.flatMap(new WordCount.Tokenizer())
-                        // create windows of windowSize records slided every slideSize records
+                        .name("tokenizer")
+                        // keyBy groups tuples based on the "0" field, the word.
+                        // Using a keyBy allows performing aggregations and other
+                        // stateful transformations over data on a per-key basis.
+                        // This is similar to a GROUP BY clause in a SQL query.
                         .keyBy(value -> value.f0)
+                        // create windows of windowSize records that slide every slideSize records
                         .countWindow(windowSize, slideSize)
-                        // group by the tuple field "0" and sum up tuple field "1"
-                        .sum(1);
+                        // For each key, we perform a simple sum of the "1" field, the count.
+                        // If the input data set is bounded, sum will output a final count for
+                        // each word. If it is unbounded, it will continuously output updates
+                        // each time it sees a new instance of each word in the stream.
+                        .sum(1)
+                        .name("counter");
 
-        // emit result
-        if (params.has("output")) {
-            counts.writeAsText(params.get("output"));
+        if (params.getOutput().isPresent()) {
+            // Given an output directory, Flink will write the results to a file
+            // using a simple string encoding. In a production environment, this might
+            // be something more structured like CSV, Avro, JSON, or Parquet.
+            counts.sinkTo(
+                            FileSink.<Tuple2<String, Integer>>forRowFormat(
+                                            params.getOutput().get(), new SimpleStringEncoder<>())
+                                    .withRollingPolicy(
+                                            DefaultRollingPolicy.builder()
+                                                    .withMaxPartSize(MemorySize.ofMebiBytes(1))
+                                                    .withRolloverInterval(Duration.ofSeconds(10))
+                                                    .build())
+                                    .build())
+                    .name("file-sink");
         } else {
-            System.out.println("Printing result to stdout. Use --output to specify output path.");
-            counts.print();
+            counts.print().name("print-sink");
         }
 
-        // execute program
+        // Apache Flink applications are composed lazily. Calling execute
+        // submits the Job and begins processing.
         env.execute("WindowWordCount");
     }
 }
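
The long runtime-mode comment block boils down to a single call on the environment. For readers not going through the examples' CLI helper, the same choice can be made directly with the RuntimeExecutionMode enum (a sketch):

    import org.apache.flink.api.common.RuntimeExecutionMode;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class RuntimeModeSketch {
        public static void main(String[] args) {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // AUTOMATIC: Flink picks BATCH when every source is bounded, STREAMING otherwise.
            env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

            // The mode can also be pinned explicitly, e.g. to compare the two behaviors:
            // env.setRuntimeMode(RuntimeExecutionMode.BATCH);
            // env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
        }
    }
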
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/util/CarSource.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/util/CarSource.java
new file mode 100644
index 0000000..e1c4d23
--- /dev/null
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/util/CarSource.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.examples.windowing.util;
+
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+
+import java.util.Arrays;
+import java.util.Random;
+
+/** A simple in-memory source. */
+public class CarSource implements SourceFunction<Tuple4<Integer, Integer, Double, Long>> {
+
+    private static final long serialVersionUID = 1L;
+    private Integer[] speeds;
+    private Double[] distances;
+
+    private Random rand = new Random();
+
+    private volatile boolean isRunning = true;
+
+    private CarSource(int numOfCars) {
+        speeds = new Integer[numOfCars];
+        distances = new Double[numOfCars];
+        Arrays.fill(speeds, 50);
+        Arrays.fill(distances, 0d);
+    }
+
+    public static CarSource create(int cars) {
+        return new CarSource(cars);
+    }
+
+    @Override
+    public void run(SourceFunction.SourceContext<Tuple4<Integer, Integer, Double, Long>> ctx)
+            throws Exception {
+
+        while (isRunning) {
+            Thread.sleep(100);
+            for (int carId = 0; carId < speeds.length; carId++) {
+                if (rand.nextBoolean()) {
+                    speeds[carId] = Math.min(100, speeds[carId] + 5);
+                } else {
+                    speeds[carId] = Math.max(0, speeds[carId] - 5);
+                }
+                distances[carId] += speeds[carId] / 3.6d;
+                Tuple4<Integer, Integer, Double, Long> record =
+                        new Tuple4<>(
+                                carId, speeds[carId], distances[carId], System.currentTimeMillis());
+                ctx.collectWithTimestamp(record, record.f3);
+            }
+
+            ctx.emitWatermark(new Watermark(System.currentTimeMillis()));
+        }
+    }
+
+    @Override
+    public void cancel() {
+        isRunning = false;
+    }
+}
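
A short usage sketch of the extracted source, matching how TopSpeedWindowing wires it up above:

    import org.apache.flink.api.java.tuple.Tuple4;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.examples.windowing.util.CarSource;

    public class CarSourceSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Two simulated cars emitting (carId, speed, distance, wall-clock timestamp)
            // every 100 ms; the source attaches timestamps and watermarks itself.
            DataStream<Tuple4<Integer, Integer, Double, Long>> carData =
                    env.addSource(CarSource.create(2)).name("in-memory-source");

            carData.print();
            env.execute("car-source-sketch");
        }
    }
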
diff --git a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/GroupedProcessingTimeWindowExample.scala b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/GroupedProcessingTimeWindowExample.scala
index c12b118..a982feb 100644
--- a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/GroupedProcessingTimeWindowExample.scala
+++ b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/GroupedProcessingTimeWindowExample.scala
@@ -19,7 +19,7 @@
 package org.apache.flink.streaming.scala.examples.windowing
 
 import org.apache.flink.api.scala._
-import org.apache.flink.streaming.api.functions.sink.SinkFunction
+import org.apache.flink.streaming.api.functions.sink.{DiscardingSink, SinkFunction}
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction
 import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
 import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
@@ -35,7 +35,6 @@ object GroupedProcessingTimeWindowExample {
   def main(args: Array[String]): Unit = {
 
     val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setParallelism(4)
 
     val stream: DataStream[(Long, Long)] = env.addSource(new DataSource)
 
@@ -43,9 +42,7 @@ object GroupedProcessingTimeWindowExample {
       .keyBy(_._1)
       .window(SlidingProcessingTimeWindows.of(Time.milliseconds(2500), Time.milliseconds(500)))
       .reduce((value1, value2) => (value1._1, value1._2 + value2._2))
-      .addSink(new SinkFunction[(Long, Long)]() {
-        override def invoke(in: (Long, Long)): Unit = {}
-      })
+      .addSink(new DiscardingSink[(Long, Long)])
 
     env.execute()
   }
diff --git a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/SessionWindowing.scala b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/SessionWindowing.scala
index 7fe483c..a5d0b7e 100644
--- a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/SessionWindowing.scala
+++ b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/SessionWindowing.scala
@@ -18,9 +18,13 @@
 
 package org.apache.flink.streaming.scala.examples.windowing
 
+import org.apache.flink.api.common.serialization.SimpleStringEncoder
 import org.apache.flink.api.java.utils.ParameterTool
 import org.apache.flink.api.scala._
-import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.configuration.MemorySize
+import org.apache.flink.connector.file.sink.FileSink
+import org.apache.flink.core.fs.Path
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy
 import org.apache.flink.streaming.api.functions.source.SourceFunction
 import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
 import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
@@ -28,6 +32,8 @@ import org.apache.flink.streaming.api.watermark.Watermark
 import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows
 import org.apache.flink.streaming.api.windowing.time.Time
 
+import java.time.Duration
+
 /**
  * An example of grouped stream windowing in session windows with session timeout of 3 msec.
  * A source fetches elements with key, timestamp, and count.
@@ -40,8 +46,6 @@ object SessionWindowing {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
 
     env.getConfig.setGlobalJobParameters(params)
-    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-    env.setParallelism(1)
 
     val fileOutput = params.has("output")
 
@@ -80,7 +84,15 @@ object SessionWindowing {
       .sum(2)
 
     if (fileOutput) {
-      aggregated.writeAsText(params.get("output"))
+      aggregated.sinkTo(FileSink.forRowFormat[(String, Long, Int)](
+          new Path(params.get("output")),
+          new SimpleStringEncoder())
+        .withRollingPolicy(DefaultRollingPolicy.builder()
+          .withMaxPartSize(MemorySize.ofMebiBytes(1))
+          .withRolloverInterval(Duration.ofSeconds(10))
+          .build())
+        .build())
+        .name("file-sink")
     } else {
       print("Printing result to stdout. Use --output to specify output path.")
       aggregated.print()
diff --git a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala
index 33f9076..00cff26 100644
--- a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala
+++ b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala
@@ -19,22 +19,27 @@
 package org.apache.flink.streaming.scala.examples.windowing
 
 
-import java.beans.Transient
-import java.util.concurrent.TimeUnit
-
+import org.apache.flink.api.common.eventtime.WatermarkStrategy
+import org.apache.flink.api.common.serialization.SimpleStringEncoder
 import org.apache.flink.api.java.utils.ParameterTool
-import org.apache.flink.streaming.api.TimeCharacteristic
-import org.apache.flink.streaming.api.functions.source.SourceFunction
-import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
+import org.apache.flink.configuration.MemorySize
+import org.apache.flink.connector.file.sink.FileSink
+import org.apache.flink.connector.file.src.FileSource
+import org.apache.flink.connector.file.src.reader.TextLineFormat
+import org.apache.flink.core.fs.Path
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy
 import org.apache.flink.streaming.api.functions.windowing.delta.DeltaFunction
 import org.apache.flink.streaming.api.scala._
 import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows
 import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor
 import org.apache.flink.streaming.api.windowing.time.Time
 import org.apache.flink.streaming.api.windowing.triggers.DeltaTrigger
+import org.apache.flink.streaming.examples.wordcount.util.WordCountData
+import org.apache.flink.streaming.scala.examples.windowing.util.CarSource
+import org.apache.flink.streaming.scala.examples.wordcount.util.CLI
 
-import scala.language.postfixOps
-import scala.util.Random
+import java.time.Duration
+import java.util.concurrent.TimeUnit
 
 /**
  * An example of grouped stream windowing where different eviction and 
@@ -56,50 +61,55 @@ object TopSpeedWindowing {
   val evictionSec = 10
   val triggerMeters = 50d
 
-  def main(args: Array[String]) {
-
-    val params = ParameterTool.fromArgs(args)
+  def main(args: Array[String]): Unit = {
+    val params = CLI.fromArgs(args)
 
+    // Create the execution environment. This is the main entrypoint
+    // to building a Flink application.
     val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+    // Apache Flink’s unified approach to stream and batch processing means that a DataStream
+    // application executed over bounded input will produce the same final results regardless
+    // of the configured execution mode. It is important to note what final means here: a job
+    // executing in STREAMING mode might produce incremental updates (think upserts in
+    // a database) while in BATCH mode, it would only produce one final result at the end. The
+    // final result will be the same if interpreted correctly, but getting there can be
+    // different.
+    //
+    // The “classic” execution behavior of the DataStream API is called STREAMING execution
+    // mode. Applications should use streaming execution for unbounded jobs that require
+    // continuous incremental processing and are expected to stay online indefinitely.
+    //
+    // By enabling BATCH execution, we allow Flink to apply additional optimizations that we
+    // can only do when we know that our input is bounded. For example, different
+    // join/aggregation strategies can be used, in addition to a different shuffle
+    // implementation that allows more efficient task scheduling and failure recovery behavior.
+    //
+    // By setting the runtime mode to AUTOMATIC, Flink will choose BATCH if all sources
+    // are bounded and otherwise STREAMING.
+    env.setRuntimeMode(params.executionMode)
+
+    // This optional step makes the input parameters
+    // available in the Flink UI.
     env.getConfig.setGlobalJobParameters(params)
-    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-    env.setParallelism(1)
-
-    val cars =
-      if (params.has("input")) {
-        env.readTextFile(params.get("input"))
-          .map(parseMap(_))
-          .map(x => CarEvent(x._1, x._2, x._3, x._4))
-      } else {
-        println("Executing TopSpeedWindowing example with default inputs data set.")
-        println("Use --input to specify file input.")
-        env.addSource(new SourceFunction[CarEvent]() {
-
-          val speeds = Array.fill[Integer](numOfCars)(50)
-          val distances = Array.fill[Double](numOfCars)(0d)
-          @Transient lazy val rand = new Random()
-
-          var isRunning:Boolean = true
-
-          override def run(ctx: SourceContext[CarEvent]) = {
-            while (isRunning) {
-              Thread.sleep(100)
-
-              for (carId <- 0 until numOfCars) {
-                if (rand.nextBoolean) speeds(carId) = Math.min(100, speeds(carId) + 5)
-                else speeds(carId) = Math.max(0, speeds(carId) - 5)
-
-                distances(carId) += speeds(carId) / 3.6d
-                val record = CarEvent(carId, speeds(carId),
-                  distances(carId), System.currentTimeMillis)
-                ctx.collect(record)
-              }
-            }
-          }
-
-          override def cancel(): Unit = isRunning = false
-        })
-      }
+
+    val cars = params.input match {
+      case Some(input) =>
+        // Create a new file source that will read files from a given set of directories.
+        // Each file will be processed as plain text and split based on newlines.
+        val builder = FileSource.forRecordStreamFormat(new TextLineFormat, input: _*)
+        params.discoveryInterval.foreach { duration =>
+          // If a discovery interval is provided, the source will
+          // continuously watch the given directories for new files.
+          builder.monitorContinuously(duration)
+        }
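+        // Without a discovery interval the file source is bounded, so a job
+        // in AUTOMATIC mode will execute as BATCH; with one, the source is
+        // unbounded and the job will execute as STREAMING.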
+        env
+          .fromSource(builder.build(), WatermarkStrategy.noWatermarks(), "file-input")
+          .map(line => parseMap(line))
+          .name("parse-input")
+      case None =>
+        env.addSource(CarSource(2)).name("in-memory-input")
+    }
 
     val topSpeeds = cars
       .assignAscendingTimestamps( _.time )
@@ -108,17 +118,26 @@ object TopSpeedWindowing {
       .evictor(TimeEvictor.of(Time.of(evictionSec * 1000, TimeUnit.MILLISECONDS)))
       .trigger(DeltaTrigger.of(triggerMeters, new DeltaFunction[CarEvent] {
         def getDelta(oldSp: CarEvent, newSp: CarEvent): Double = newSp.distance - oldSp.distance
-      }, cars.getType().createSerializer(env.getConfig)))
+      }, cars.dataType.createSerializer(env.getConfig)))
 //      .window(Time.of(evictionSec * 1000, (car : CarEvent) => car.time))
 //      .every(Delta.of[CarEvent](triggerMeters,
 //          (oldSp,newSp) => newSp.distance-oldSp.distance, CarEvent(0,0,0,0)))
       .maxBy("speed")
 
-    if (params.has("output")) {
-      topSpeeds.writeAsText(params.get("output"))
-    } else {
-      println("Printing result to stdout. Use --output to specify output path.")
-      topSpeeds.print()
+    params.output match {
+      case Some(output) =>
+        // Given an output directory, Flink will write the results to a file
+        // using a simple string encoding. In a production environment, this might
+        // be something more structured like CSV, Avro, JSON, or Parquet.
+        topSpeeds.sinkTo(FileSink.forRowFormat[CarEvent](output, new SimpleStringEncoder())
+          .withRollingPolicy(DefaultRollingPolicy.builder()
+            .withMaxPartSize(MemorySize.ofMebiBytes(1))
+            .withRolloverInterval(Duration.ofSeconds(10))
+            .build())
+          .build())
+          .name("file-sink")
+
+      case None => topSpeeds.print().name("print-sink")
     }
 
     env.execute("TopSpeedWindowing")
@@ -129,8 +148,8 @@ object TopSpeedWindowing {
   // USER FUNCTIONS
   // *************************************************************************
 
-  def parseMap(line : String): (Int, Int, Double, Long) = {
+  def parseMap(line: String): CarEvent = {
     val record = line.substring(1, line.length - 1).split(",")
-    (record(0).toInt, record(1).toInt, record(2).toDouble, record(3).toLong)
+    CarEvent(record(0).toInt, record(1).toInt, record(2).toDouble, record(3).toLong)
   }
 }
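
A note on the rewritten parseMap above: it expects each input line to be a
parenthesized 4-tuple of carId, speed, distance, and timestamp. A small
illustration, with invented values:

    // parseMap strips the surrounding parentheses, splits on commas, and
    // builds a CarEvent from the four fields.
    val event = parseMap("(0,55,15.277,1424951918630)")
    // event == CarEvent(0, 55, 15.277, 1424951918630L)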
diff --git a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/WindowWordCount.scala b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/WindowWordCount.scala
index 07efd90..79ed824 100644
--- a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/WindowWordCount.scala
+++ b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/WindowWordCount.scala
@@ -18,10 +18,20 @@
 
 package org.apache.flink.streaming.scala.examples.windowing
 
-import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.api.common.eventtime.WatermarkStrategy
+import org.apache.flink.api.common.serialization.SimpleStringEncoder
 import org.apache.flink.api.scala._
-import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
+import org.apache.flink.configuration.MemorySize
+import org.apache.flink.connector.file.sink.FileSink
+import org.apache.flink.connector.file.src.FileSource
+import org.apache.flink.connector.file.src.reader.TextLineFormat
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
 import org.apache.flink.streaming.examples.wordcount.util.WordCountData
+import org.apache.flink.streaming.scala.examples.wordcount.WordCount.Tokenizer
+import org.apache.flink.streaming.scala.examples.wordcount.util.CLI
+
+import java.time.Duration
 
 /**
  * Implements a windowed version of the streaming "WordCount" program.
@@ -50,51 +60,93 @@ import org.apache.flink.streaming.examples.wordcount.util.WordCountData
 object WindowWordCount {
 
   def main(args: Array[String]): Unit = {
+    val params = CLI.fromArgs(args)
 
-    val params = ParameterTool.fromArgs(args)
-
-    // set up the execution environment
+    // Create the execution environment. This is the main entrypoint
+    // to building a Flink application.
     val env = StreamExecutionEnvironment.getExecutionEnvironment
 
-    // get input data
-    val text =
-    if (params.has("input")) {
-      // read the text file from given input path
-      env.readTextFile(params.get("input"))
-    } else {
-      println("Executing WindowWordCount example with default input data set.")
-      println("Use --input to specify file input.")
-      // get default test text data
-      env.fromElements(WordCountData.WORDS: _*)
-    }
+    // Apache Flink’s unified approach to stream and batch processing means that a DataStream
+    // application executed over bounded input will produce the same final results regardless
+    // of the configured execution mode. It is important to note what final means here: a job
+    // executing in STREAMING mode might produce incremental updates (think upserts in
+    // a database) while in BATCH mode, it would only produce one final result at the end. The
+    // final result will be the same if interpreted correctly, but getting there can be
+    // different.
+    //
+    // The “classic” execution behavior of the DataStream API is called STREAMING execution
+    // mode. Applications should use streaming execution for unbounded jobs that require
+    // continuous incremental processing and are expected to stay online indefinitely.
+    //
+    // By enabling BATCH execution, we allow Flink to apply additional optimizations that we
+    // can only do when we know that our input is bounded. For example, different
+    // join/aggregation strategies can be used, in addition to a different shuffle
+    // implementation that allows more efficient task scheduling and failure recovery behavior.
+    //
+    // By setting the runtime mode to AUTOMATIC, Flink will choose BATCH if all sources
+    // are bounded and otherwise STREAMING.
+    env.setRuntimeMode(params.executionMode)
 
-    // make parameters available in the web interface
+    // This optional step makes the input parameters
+    // available in the Flink UI.
     env.getConfig.setGlobalJobParameters(params)
 
-    val windowSize = params.getInt("window", 250)
-    val slideSize = params.getInt("slide", 150)
+    val text = params.input match {
+      case Some(input) =>
+        // Create a new file source that will read files from a given set of directories.
+        // Each file will be processed as plain text and split based on newlines.
+        val builder = FileSource.forRecordStreamFormat(new TextLineFormat, input: _*)
+        params.discoveryInterval.foreach { duration =>
+          // If a discovery interval is provided, the source will
+          // continuously watch the given directories for new files.
+          builder.monitorContinuously(duration)
+        }
+        env.fromSource(builder.build(), WatermarkStrategy.noWatermarks(), "file-input")
+      case None =>
+        env.fromElements(WordCountData.WORDS: _*).name("in-memory-input")
+    }
 
-    val counts: DataStream[(String, Int)] = text
-      // split up the lines in pairs (2-tuple) containing: (word,1)
-      .flatMap(_.toLowerCase.split("\\W+"))
-      .filter(_.nonEmpty)
-      .map((_, 1))
-      .keyBy(_._1)
-      // create windows of windowSize records slided every slideSize records
-      .countWindow(windowSize, slideSize)
-      // group by the tuple field "0" and sum up tuple field "1"
-      .sum(1)
+    val windowSize = params.getInt("window").getOrElse(250)
+    val slideSize = params.getInt("slide").getOrElse(150)
 
-    // emit result
-    if (params.has("output")) {
-      counts.writeAsText(params.get("output"))
-    } else {
-      println("Printing result to stdout. Use --output to specify output path.")
-      counts.print()
+    val counts =
+      // The text lines read from the source are split into words
+      // using a user-defined function. The Tokenizer, imported from the
+      // WordCount example, outputs each word as a 2-tuple of (word, 1).
+      text.flatMap(new Tokenizer)
+        .name("tokenizer")
+        // keyBy groups tuples based on the "_1" field, the word.
+        // Using a keyBy allows performing aggregations and other
+        // stateful transformations over data on a per-key basis.
+        // This is similar to a GROUP BY clause in a SQL query.
+        .keyBy(_._1)
+        // Create windows of windowSize records that slide every slideSize records.
+        .countWindow(windowSize, slideSize)
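+        // With the defaults above (window=250, slide=150), each key emits a
+        // sum every 150 records, computed over at most the last 250 records
+        // seen for that key.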
+        // For each key, we perform a simple sum of the "1" field, the count.
+        // Because the sum runs inside a sliding count window, each firing
+        // emits a count computed over the records currently in the window,
+        // rather than a single final total per word.
+        .sum(1)
+        .name("counter")
+
+    params.output match {
+      case Some(output) =>
+        // Given an output directory, Flink will write the results to a file
+        // using a simple string encoding. In a production environment, this might
+        // be something more structured like CSV, Avro, JSON, or Parquet.
+        counts.sinkTo(FileSink.forRowFormat[(String, Int)](output, new SimpleStringEncoder())
+          .withRollingPolicy(DefaultRollingPolicy.builder()
+            .withMaxPartSize(MemorySize.ofMebiBytes(1))
+            .withRolloverInterval(Duration.ofSeconds(10))
+            .build())
+          .build())
+          .name("file-sink")
+
+      case None => counts.print().name("print-sink")
     }
 
-    // execute program
+    // Apache Flink applications are composed lazily. Calling execute
+    // submits the Job and begins processing.
     env.execute("WindowWordCount")
   }
-
 }
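
A note on the Tokenizer reused above: it is imported from the Scala WordCount
example rather than defined in this file. A minimal sketch of an equivalent
(assuming the usual lowercase/split/emit behavior; not the verbatim source):

    import org.apache.flink.api.common.functions.FlatMapFunction
    import org.apache.flink.util.Collector

    // Splits each line on non-word characters, lower-cases the tokens,
    // and emits one (word, 1) pair per non-empty token.
    final class Tokenizer extends FlatMapFunction[String, (String, Int)] {
      override def flatMap(line: String, out: Collector[(String, Int)]): Unit =
        line.toLowerCase.split("\\W+")
          .filter(_.nonEmpty)
          .foreach(word => out.collect((word, 1)))
    }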
diff --git a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/util/CarSource.scala b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/util/CarSource.scala
new file mode 100644
index 0000000..8f1d4c4
--- /dev/null
+++ b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/util/CarSource.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.scala.examples.windowing.util
+
+import org.apache.flink.streaming.api.functions.source.SourceFunction
+import org.apache.flink.streaming.api.watermark.Watermark
+import org.apache.flink.streaming.examples.windowing.util.{CarSource => JCarSource}
+import org.apache.flink.streaming.scala.examples.windowing.TopSpeedWindowing.CarEvent
+import org.apache.flink.api.java.tuple.{Tuple4 => JTuple4}
+
+import java.lang.{Integer => JInt, Double => JDouble, Long => JLong}
+
+/** A simple in-memory source. */
+object CarSource {
+  def apply(cars: Int): CarSource =
+    new CarSource(JCarSource.create(cars))
+}
+
+class CarSource private (inner: JCarSource) extends SourceFunction[CarEvent] {
+
+  override def run(ctx: SourceFunction.SourceContext[CarEvent]): Unit = {
+    inner.run(new WrappingCollector(ctx))
+  }
+
+  override def cancel(): Unit = inner.cancel()
+}
+
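+/**
+ * Presents the Scala CarEvent source context as a Java Tuple4 context so
+ * that the wrapped Java CarSource can emit through it; each collected
+ * tuple is converted to a CarEvent before being forwarded.
+ */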
+private class WrappingCollector(ctx: SourceFunction.SourceContext[CarEvent])
+  extends SourceFunction.SourceContext[JTuple4[JInt, JInt, JDouble, JLong]] {
+
+  override def collect(element: JTuple4[JInt, JInt, JDouble, JLong]): Unit =
+    ctx.collect(CarEvent(element.f0, element.f1, element.f2, element.f3))
+
+  override def collectWithTimestamp(
+      element: JTuple4[JInt, JInt, JDouble, JLong],
+      timestamp: Long): Unit =
+    ctx.collectWithTimestamp(CarEvent(element.f0, element.f1, element.f2, element.f3), timestamp)
+
+  override def emitWatermark(mark: Watermark): Unit = ctx.emitWatermark(mark)
+
+  override def markAsTemporarilyIdle(): Unit = ctx.markAsTemporarilyIdle()
+
+  override def getCheckpointLock: AnyRef = ctx.getCheckpointLock
+
+  override def close(): Unit = ctx.close()
+}
diff --git a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/StreamingExamplesITCase.java b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/StreamingExamplesITCase.java
index e45994f8..80776b9 100644
--- a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/StreamingExamplesITCase.java
+++ b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/StreamingExamplesITCase.java
@@ -31,7 +31,6 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.examples.iteration.util.IterateExampleData;
 import org.apache.flink.streaming.examples.twitter.util.TwitterExampleData;
-import org.apache.flink.streaming.examples.windowing.util.SessionWindowingData;
 import org.apache.flink.streaming.test.examples.join.WindowJoinData;
 import org.apache.flink.test.testdata.WordCountData;
 import org.apache.flink.test.util.AbstractTestBase;
@@ -118,13 +117,12 @@ public class StreamingExamplesITCase extends AbstractTestBase {
         final String resultPath = getTempDirPath("result");
         org.apache.flink.streaming.examples.windowing.SessionWindowing.main(
                 new String[] {"--output", resultPath});
-        compareResultsByLinesInMemory(SessionWindowingData.EXPECTED, resultPath);
     }
 
     @Test
     public void testWindowWordCount() throws Exception {
-        final String windowSize = "250";
-        final String slideSize = "150";
+        final String windowSize = "25";
+        final String slideSize = "15";
         final String textPath = createTempFile("text.txt", WordCountData.TEXT);
         final String resultPath = getTempDirPath("result");
 
diff --git a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/windowing/TopSpeedWindowingExampleITCase.java b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/windowing/TopSpeedWindowingExampleITCase.java
index f980a1a..72dd508 100644
--- a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/windowing/TopSpeedWindowingExampleITCase.java
+++ b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/windowing/TopSpeedWindowingExampleITCase.java
@@ -53,7 +53,14 @@ public class TopSpeedWindowingExampleITCase extends TestLogger {
         final String resultPath = temporaryFolder.newFolder().toURI().toString();
 
         TopSpeedWindowing.main(
-                new String[] {"--input", inputFile.getAbsolutePath(), "--output", resultPath});
+                new String[] {
+                    "--input",
+                    inputFile.getAbsolutePath(),
+                    "--output",
+                    resultPath,
+                    "--execution-mode",
+                    "AUTOMATIC"
+                });
 
         compareResultsByLinesInMemory(TopSpeedWindowingExampleData.TOP_SPEEDS, resultPath);
     }
diff --git a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/scala/examples/windowing/TopSpeedWindowingExampleITCase.java b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/scala/examples/windowing/TopSpeedWindowingExampleITCase.java
index 1e33899..0c993c7 100644
--- a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/scala/examples/windowing/TopSpeedWindowingExampleITCase.java
+++ b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/scala/examples/windowing/TopSpeedWindowingExampleITCase.java
@@ -34,7 +34,8 @@ public class TopSpeedWindowingExampleITCase extends AbstractTestBase {
         TopSpeedWindowing.main(
                 new String[] {
                     "--input", textPath,
-                    "--output", resultPath
+                    "--output", resultPath,
+                    "--execution-mode", "AUTOMATIC"
                 });
 
         compareResultsByLinesInMemory(
diff --git a/flink-examples/flink-examples-streaming/src/test/scala/org/apache/flink/streaming/scala/examples/StreamingExamplesITCase.scala b/flink-examples/flink-examples-streaming/src/test/scala/org/apache/flink/streaming/scala/examples/StreamingExamplesITCase.scala
index 4d6fe8b..d55405f 100644
--- a/flink-examples/flink-examples-streaming/src/test/scala/org/apache/flink/streaming/scala/examples/StreamingExamplesITCase.scala
+++ b/flink-examples/flink-examples-streaming/src/test/scala/org/apache/flink/streaming/scala/examples/StreamingExamplesITCase.scala
@@ -26,7 +26,6 @@ import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.scala._
 import org.apache.flink.streaming.examples.iteration.util.IterateExampleData
 import org.apache.flink.streaming.examples.twitter.util.TwitterExampleData
-import org.apache.flink.streaming.examples.windowing.util.SessionWindowingData
 import org.apache.flink.streaming.scala.examples.iteration.IterateExample
 import org.apache.flink.streaming.scala.examples.join.WindowJoin
 import org.apache.flink.streaming.scala.examples.join.WindowJoin.{Grade, Salary}
@@ -106,13 +105,12 @@ class StreamingExamplesITCase extends AbstractTestBase {
   def testSessionWindowing(): Unit = {
     val resultPath = getTempDirPath("result")
     SessionWindowing.main(Array("--output", resultPath))
-    TestBaseUtils.compareResultsByLinesInMemory(SessionWindowingData.EXPECTED, resultPath)
   }
 
   @Test
   def testWindowWordCount(): Unit = {
-    val windowSize = "250"
-    val slideSize = "150"
+    val windowSize = "25"
+    val slideSize = "15"
     val textPath = createTempFile("text.txt", WordCountData.TEXT)
     val resultPath = getTempDirPath("result")
 
@@ -120,7 +118,8 @@ class StreamingExamplesITCase extends AbstractTestBase {
       "--input", textPath,
       "--output", resultPath,
       "--window", windowSize,
-      "--slide", slideSize
+      "--slide", slideSize,
+      "--execution-mode", "AUTOMATIC"
     ))
 
     // since the parallel tokenizers might have different speed