Posted to commits@flink.apache.org by ma...@apache.org on 2022/11/08 13:33:23 UTC

[flink] branch master updated: [FLINK-29740][Scala] Mark Scala APIs as `@Deprecated`. This closes #21176

This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 5ac290a518a [FLINK-29740][Scala] Mark Scala APIs as `@Deprecated`. This closes #21176
5ac290a518a is described below

commit 5ac290a518a57030bf5b773ca69fbbdda4e0088f
Author: MartijnVisser <ma...@apache.org>
AuthorDate: Tue Nov 8 14:33:14 2022 +0100

    [FLINK-29740][Scala] Mark Scala APIs as `@Deprecated`. This closes #21176
    
    * [FLINK-29740][Scala] Remove Scala examples, quickstarts and walkthroughs
    
    * [FLINK-29740][Scala] Mark Scala package and Scala ExecutionEnvironment as @Deprecated
    
    * [FLINK-29740][Scala] Remove Scala E2E tests/walkthroughs
    
    - Remove Scala tests that relied on Scala examples
    - Remove Scala test from `SocketWindowWordCount`
    - Remove Scala example testing from CI
    
    * [FLINK-29740][Scala] Add deprecation hint to the documentation on the DataStream Overview Page, Table Concepts & Common API page and the DataStream Scala Extensions Page
    
    * [FLINK-29740][Scala] Remove Scala quickstart/walkthrough from documentation
    
    * [FLINK-29740][Scala] Add Scala deprecation warning to project configuration overview
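In Scala, a class-level deprecation such as the one described above is expressed with the standard `@deprecated` annotation, which makes the compiler emit a warning at every call site. The sketch below only illustrates that mechanism; the annotation text, the version string, and the class body are placeholders rather than the actual changes made to `ExecutionEnvironment.scala` or `package.scala` in this commit.

```scala
// Placeholder sketch of how a Scala API class is flagged as deprecated.
// Message, version, and class body are illustrative, not taken from the commit.
@deprecated(
  "All Flink Scala APIs are deprecated in favor of the Java DataStream and Table APIs.",
  "1.17")
class ExecutionEnvironment {
  // Placeholder method, not the real Flink API.
  def fromElements[T](elements: T*): Seq[T] = elements
}

object DeprecationDemo extends App {
  // Compiling this usage produces a deprecation warning at the call site.
  val env = new ExecutionEnvironment
  println(env.fromElements(1, 2, 3))
}
```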
---
 docs/content.zh/docs/dev/configuration/overview.md |   6 +
 docs/content.zh/docs/dev/datastream/overview.md    |   6 +
 .../docs/dev/datastream/scala_api_extensions.md    |   6 +
 docs/content.zh/docs/dev/table/common.md           |   6 +
 docs/content.zh/docs/learn-flink/datastream_api.md |   8 +-
 docs/content.zh/docs/try-flink/datastream.md       | 334 +--------------------
 docs/content.zh/docs/try-flink/table_api.md        |   2 +-
 docs/content/docs/dev/configuration/overview.md    |   6 +
 docs/content/docs/dev/datastream/overview.md       |   6 +
 .../docs/dev/datastream/scala_api_extensions.md    |   6 +
 docs/content/docs/dev/table/common.md              |   6 +
 docs/content/docs/learn-flink/datastream_api.md    |   8 +-
 docs/content/docs/try-flink/datastream.md          | 321 +-------------------
 docs/content/docs/try-flink/table_api.md           |   4 +-
 .../flink/quickstarts/test/QuickstartExample.scala |  58 ----
 flink-end-to-end-tests/run-nightly-tests.sh        |   2 -
 .../flink/examples/scala/clustering/KMeans.scala   | 222 --------------
 .../examples/scala/graph/ConnectedComponents.scala | 153 ----------
 .../flink/examples/scala/graph/DeltaPageRank.scala | 108 -------
 .../flink/examples/scala/graph/EnumTriangles.scala | 150 ---------
 .../flink/examples/scala/graph/PageRankBasic.scala | 187 ------------
 .../scala/graph/TransitiveClosureNaive.scala       |  96 ------
 .../flink/examples/scala/misc/PiEstimation.scala   |  53 ----
 .../examples/scala/relational/TPCHQuery10.scala    | 189 ------------
 .../examples/scala/relational/TPCHQuery3.scala     | 168 -----------
 .../examples/scala/relational/WebLogAnalysis.scala | 194 ------------
 .../flink/examples/scala/wordcount/WordCount.scala |  79 -----
 .../scala/examples/async/AsyncClient.scala         |  41 ---
 .../scala/examples/async/AsyncIOExample.scala      |  86 ------
 .../scala/examples/iteration/IterateExample.scala  | 152 ----------
 .../streaming/scala/examples/join/WindowJoin.scala |  94 ------
 .../scala/examples/join/WindowJoinSampleData.scala |  69 -----
 .../examples/socket/SocketWindowWordCount.scala    |  82 -----
 .../GroupedProcessingTimeWindowExample.scala       |  79 -----
 .../examples/windowing/SessionWindowing.scala      | 109 -------
 .../examples/windowing/TopSpeedWindowing.scala     | 161 ----------
 .../scala/examples/windowing/WindowWordCount.scala | 156 ----------
 .../scala/examples/windowing/util/CarSource.scala  |  61 ----
 .../scala/examples/wordcount/WordCount.scala       | 176 -----------
 .../scala/examples/wordcount/util/CLI.scala        |  64 ----
 .../windowing/TopSpeedWindowingExampleITCase.java  |  46 ---
 .../test/socket/SocketWindowWordCountITCase.java   |  42 ---
 .../scala/examples/StreamingExamplesITCase.scala   | 147 ---------
 flink-examples/flink-examples-table/pom.xml        |   5 -
 .../scala/basics/GettingStartedExample.scala       | 208 -------------
 .../examples/scala/basics/StreamSQLExample.scala   |  85 ------
 .../examples/scala/basics/StreamTableExample.scala |  70 -----
 .../scala/basics/WordCountSQLExample.scala         |  49 ---
 .../scala/basics/GettingStartedExampleITCase.java  |  38 ---
 .../scala/basics/StreamSQLExampleITCase.java       |  39 ---
 .../scala/basics/WordCountSQLExampleITCase.java    |  38 ---
 flink-quickstart/flink-quickstart-scala/pom.xml    |  54 ----
 .../META-INF/maven/archetype-metadata.xml          |  36 ---
 .../src/main/resources/archetype-resources/pom.xml | 254 ----------------
 .../src/main/resources/log4j2.properties           |  25 --
 .../src/main/scala/DataStreamJob.scala             |  64 ----
 .../projects/testArtifact/archetype.properties     |  21 --
 .../test/resources/projects/testArtifact/goal.txt  |   1 -
 flink-quickstart/pom.xml                           |   1 -
 .../flink/api/scala/ExecutionEnvironment.scala     |  10 +
 .../scala/org/apache/flink/api/scala/package.scala |   9 +
 .../example/scala/ConnectedComponentsITCase.java   |  74 -----
 .../test/example/scala/EnumTriangleITCase.java     |  53 ----
 .../flink/test/example/scala/PageRankITCase.java   | 103 -------
 .../example/scala/TransitiveClosureITCase.java     |  69 -----
 .../test/example/scala/WebLogAnalysisITCase.java   |  59 ----
 .../flink/test/example/scala/WordCountITCase.java  |  53 ----
 .../flink-walkthrough-datastream-scala/pom.xml     |  37 ---
 .../META-INF/maven/archetype-metadata.xml          |  36 ---
 .../src/main/resources/archetype-resources/pom.xml | 249 ---------------
 .../src/main/resources/log4j2.properties           |  28 --
 .../src/main/scala/FraudDetectionJob.scala         |  51 ----
 .../src/main/scala/FraudDetector.scala             |  49 ---
 flink-walkthroughs/pom.xml                         |   1 -
 74 files changed, 89 insertions(+), 5729 deletions(-)
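
The deprecation hint added throughout the documentation below tells users that applications can still be written in Scala while targeting the Java APIs directly. As a rough, hand-written illustration (not part of this commit; the object name and job name are made up), such an application could look like the following:

```scala
// Illustrative only: a Scala application built against Flink's Java DataStream API,
// as the deprecation hint recommends, instead of the deprecated
// org.apache.flink.streaming.api.scala wrappers.
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment

object JavaDataStreamFromScala {
  def main(args: Array[String]): Unit = {
    // The Java execution environment is usable directly from Scala.
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Element types are inferred from the values; user functions would be passed
    // as Java functional interfaces (MapFunction, FilterFunction, ...).
    val words: DataStream[String] = env.fromElements("to", "be", "or", "not", "to", "be")
    words.print()

    env.execute("java-datastream-from-scala")
  }
}
```

The same pattern applies to the Table API: the Java `TableEnvironment` can be used from Scala code without the Scala-specific implicits.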

diff --git a/docs/content.zh/docs/dev/configuration/overview.md b/docs/content.zh/docs/dev/configuration/overview.md
index c7c58d1edf1..410c3adc73f 100644
--- a/docs/content.zh/docs/dev/configuration/overview.md
+++ b/docs/content.zh/docs/dev/configuration/overview.md
@@ -57,6 +57,12 @@ under the License.
 
 <a name="maven-command"></a>
 
+{{< hint warning >}}
+All Flink Scala APIs are deprecated and will be removed in a future Flink version. You can still build your application in Scala, but you should move to the Java version of the DataStream and/or Table API.
+
+See <a href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-265+Deprecate+and+remove+Scala+API+support">FLIP-265 Deprecate and remove Scala API support</a>
+{{< /hint >}}
+
 ### Maven 命令
 ```bash
 $ mvn archetype:generate                \
diff --git a/docs/content.zh/docs/dev/datastream/overview.md b/docs/content.zh/docs/dev/datastream/overview.md
index 2bf572edb45..4ca56dc0584 100644
--- a/docs/content.zh/docs/dev/datastream/overview.md
+++ b/docs/content.zh/docs/dev/datastream/overview.md
@@ -61,6 +61,12 @@ Flink 程序看起来像一个转换 `DataStream` 的常规程序。每个程序
 4. 指定计算结果的存储位置;
 5. 触发程序执行。
 
+{{< hint warning >}}
+All Flink Scala APIs are deprecated and will be removed in a future Flink version. You can still build your application in Scala, but you should move to the Java version of the DataStream and/or Table API.
+
+See <a href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-265+Deprecate+and+remove+Scala+API+support">FLIP-265 Deprecate and remove Scala API support</a>
+{{< /hint >}}
+
 {{< tabs "fa68701c-59e8-4509-858e-3e8a123eeacf" >}}
 {{< tab "Java" >}}
 
diff --git a/docs/content.zh/docs/dev/datastream/scala_api_extensions.md b/docs/content.zh/docs/dev/datastream/scala_api_extensions.md
index 50ffefd0833..46d9b2f66de 100644
--- a/docs/content.zh/docs/dev/datastream/scala_api_extensions.md
+++ b/docs/content.zh/docs/dev/datastream/scala_api_extensions.md
@@ -25,6 +25,12 @@ specific language governing permissions and limitations
 under the License.
 -->
 
+{{< hint warning >}}
+All Flink Scala APIs are deprecated and will be removed in a future Flink version. You can still build your application in Scala, but you should move to the Java version of the DataStream and/or Table API.
+
+See <a href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-265+Deprecate+and+remove+Scala+API+support">FLIP-265 Deprecate and remove Scala API support</a>
+{{< /hint >}}
+
 # Scala API 扩展
 
 为了在 Scala 和 Java API 之间保持大致相同的使用体验,在批处理和流处理的标准 API 中省略了一些允许 Scala 高级表达的特性。
diff --git a/docs/content.zh/docs/dev/table/common.md b/docs/content.zh/docs/dev/table/common.md
index ef9538ceaef..f45b243ee39 100644
--- a/docs/content.zh/docs/dev/table/common.md
+++ b/docs/content.zh/docs/dev/table/common.md
@@ -37,6 +37,12 @@ Table API 和 SQL 程序的结构
 
 所有用于批处理和流处理的 Table API 和 SQL 程序都遵循相同的模式。下面的代码示例展示了 Table API 和 SQL 程序的通用结构。
 
+{{< hint warning >}}
+All Flink Scala APIs are deprecated and will be removed in a future Flink version. You can still build your application in Scala, but you should move to the Java version of the DataStream and/or Table API.
+
+See <a href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-265+Deprecate+and+remove+Scala+API+support">FLIP-265 Deprecate and remove Scala API support</a>
+{{< /hint >}}
+
 {{< tabs "62be8916-7ab4-4831-bd19-05c591181835" >}}
 {{< tab "Java" >}}
 ```java
diff --git a/docs/content.zh/docs/learn-flink/datastream_api.md b/docs/content.zh/docs/learn-flink/datastream_api.md
index 0a2ee3f8ece..4b9b29456d3 100644
--- a/docs/content.zh/docs/learn-flink/datastream_api.md
+++ b/docs/content.zh/docs/learn-flink/datastream_api.md
@@ -28,7 +28,7 @@ under the License.
 
 ## 什么能被转化成流?
 
-Flink 的 Java 和 Scala DataStream API 可以将任何可序列化的对象转化为流。Flink  自带的序列化器有
+Flink 的 Java DataStream API 可以将任何可序列化的对象转化为流。Flink  自带的序列化器有
 
 - 基本类型,即 String、Long、Integer、Boolean、Array
 - 复合类型:Tuples、POJOs 和 Scala case classes
@@ -80,6 +80,12 @@ Flink 的序列化器[支持的 POJO 类型数据结构升级]({{< ref "docs/dev
 
 如果你了解 Scala,那一定知道 tuple 和 case class。
 
+{{< hint warning >}}
+All Flink Scala APIs are deprecated and will be removed in a future Flink version. You can still build your application in Scala, but you should move to the Java version of the DataStream and/or Table API.
+
+See <a href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-265+Deprecate+and+remove+Scala+API+support">FLIP-265 Deprecate and remove Scala API support</a>
+{{< /hint >}}
+
 {{< top >}}
 
 ## 一个完整的示例
diff --git a/docs/content.zh/docs/try-flink/datastream.md b/docs/content.zh/docs/try-flink/datastream.md
index 09eddc5d027..8d0a0fa3612 100644
--- a/docs/content.zh/docs/try-flink/datastream.md
+++ b/docs/content.zh/docs/try-flink/datastream.md
@@ -49,7 +49,7 @@ Flink 支持对状态和时间的细粒度控制,以此来实现复杂的事
 
 ## 准备条件
 
-这个代码练习假定你对 Java 或 Scala 有一定的了解,当然,如果你之前使用的是其他开发语言,你也应该能够跟随本教程进行学习。
+这个代码练习假定你对 Java 有一定的了解,当然,如果你之前使用的是其他开发语言,你也应该能够跟随本教程进行学习。
 
 ### 在 IDE 中运行
 
@@ -94,19 +94,6 @@ $ mvn archetype:generate \
     -DinteractiveMode=false
 ```
 {{< /tab >}}
-{{< tab "Scala" >}}
-```bash
-$ mvn archetype:generate \
-    -DarchetypeGroupId=org.apache.flink \
-    -DarchetypeArtifactId=flink-walkthrough-datastream-scala \
-    -DarchetypeVersion={{< version >}} \
-    -DgroupId=frauddetection \
-    -DartifactId=frauddetection \
-    -Dversion=0.1 \
-    -Dpackage=spendreport \
-    -DinteractiveMode=false
-```
-{{< /tab >}}
 {{< /tabs >}}
 
 {{< unstable >}}
@@ -137,7 +124,7 @@ Maven 3.0 及更高版本,不再支持通过命令行指定仓库(-Darchetyp
 
 你可以根据自己的情况修改 `groupId`、 `artifactId` 和 `package`。通过这三个参数,
 Maven 将会创建一个名为 `frauddetection` 的文件夹,包含了所有依赖的整个工程项目将会位于该文件夹下。
-将工程目录导入到你的开发环境之后,你可以找到 `FraudDetectionJob.java` (或 `FraudDetectionJob.scala`) 代码文件,文件中的代码如下所示。你可以在 IDE 中直接运行这个文件。
+将工程目录导入到你的开发环境之后,你可以找到 `FraudDetectionJob.java` 代码文件,文件中的代码如下所示。你可以在 IDE 中直接运行这个文件。
 同时,你可以试着在数据流中设置一些断点或者以 DEBUG 模式来运行程序,体验 Flink 是如何运行的。
 
 {{< tabs "812722d9-c880-464a-a39d-749e538a8612" >}}
@@ -211,81 +198,6 @@ public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert
         collector.collect(alert);
     }
 }
-```
-{{< /tab >}}
-{{< tab "Scala" >}}
-
-<a name="frauddetectionjobscala"></a>
-
-#### FraudDetectionJob.scala
-
-```scala
-package spendreport
-
-import org.apache.flink.streaming.api.scala._
-import org.apache.flink.walkthrough.common.sink.AlertSink
-import org.apache.flink.walkthrough.common.entity.Alert
-import org.apache.flink.walkthrough.common.entity.Transaction
-import org.apache.flink.walkthrough.common.source.TransactionSource
-
-object FraudDetectionJob {
-
-  @throws[Exception]
-  def main(args: Array[String]): Unit = {
-    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
-
-    val transactions: DataStream[Transaction] = env
-      .addSource(new TransactionSource)
-      .name("transactions")
-
-    val alerts: DataStream[Alert] = transactions
-      .keyBy(transaction => transaction.getAccountId)
-      .process(new FraudDetector)
-      .name("fraud-detector")
-
-    alerts
-      .addSink(new AlertSink)
-      .name("send-alerts")
-
-    env.execute("Fraud Detection")
-  }
-}
-```
-
-<a name="frauddetectorscala"></a>
-
-#### FraudDetector.scala
-
-```scala
-package spendreport
-
-import org.apache.flink.streaming.api.functions.KeyedProcessFunction
-import org.apache.flink.util.Collector
-import org.apache.flink.walkthrough.common.entity.Alert
-import org.apache.flink.walkthrough.common.entity.Transaction
-
-object FraudDetector {
-  val SMALL_AMOUNT: Double = 1.00
-  val LARGE_AMOUNT: Double = 500.00
-  val ONE_MINUTE: Long     = 60 * 1000L
-}
-
-@SerialVersionUID(1L)
-class FraudDetector extends KeyedProcessFunction[Long, Transaction, Alert] {
-
-  @throws[Exception]
-  def processElement(
-      transaction: Transaction,
-      context: KeyedProcessFunction[Long, Transaction, Alert]#Context,
-      collector: Collector[Alert]): Unit = {
-
-    val alert = new Alert
-    alert.setId(transaction.getAccountId)
-
-    collector.collect(alert)
-  }
-}
-
 ```
 {{< /tab >}}
 {{< /tabs >}}
@@ -311,11 +223,6 @@ class FraudDetector extends KeyedProcessFunction[Long, Transaction, Alert] {
 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 ```
 {{< /tab >}}
-{{< tab "Scala" >}}
-```scala
-val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
-```
-{{< /tab >}}
 {{< /tabs >}}
 
 <a name="creating-a-source"></a>
@@ -335,13 +242,6 @@ DataStream<Transaction> transactions = env
     .name("transactions");
 ```
 {{< /tab >}}
-{{< tab "Scala" >}}
-```scala
-val transactions: DataStream[Transaction] = env
-  .addSource(new TransactionSource)
-  .name("transactions")
-```
-{{< /tab >}}
 {{< /tabs >}}
 
 <a name="partitioning-events--detecting-fraud"></a>
@@ -364,14 +264,6 @@ DataStream<Alert> alerts = transactions
     .name("fraud-detector");
 ```
 {{< /tab >}}
-{{< tab "Scala" >}}
-```scala
-val alerts: DataStream[Alert] = transactions
-  .keyBy(transaction => transaction.getAccountId)
-  .process(new FraudDetector)
-  .name("fraud-detector")
-```
-{{< /tab >}}
 {{< /tabs >}}
 
 <a name="outputting-results"></a>
@@ -387,11 +279,6 @@ sink 会将 `DataStream` 写出到外部系统,例如 Apache Kafka、Cassandra
 alerts.addSink(new AlertSink());
 ```
 {{< /tab >}}
-{{< tab "Scala" >}}
-```scala
-alerts.addSink(new AlertSink)
-```
-{{< /tab >}}
 {{< /tabs >}}
 
 <a name="executing-the-job"></a>
@@ -407,11 +294,6 @@ Flink 程序是懒加载的,并且只有在完全搭建好之后,才能够
 env.execute("Fraud Detection");
 ```
 {{< /tab >}}
-{{< tab "Scala" >}}
-```scala
-env.execute("Fraud Detection")
-```
-{{< /tab >}}
 {{< /tabs >}}
 
 <a name="the-fraud-detector"></a>
@@ -447,31 +329,6 @@ public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert
 }
 ```
 {{< /tab >}}
-{{< tab "Scala" >}}
-```scala
-object FraudDetector {
-  val SMALL_AMOUNT: Double = 1.00
-  val LARGE_AMOUNT: Double = 500.00
-  val ONE_MINUTE: Long     = 60 * 1000L
-}
-
-@SerialVersionUID(1L)
-class FraudDetector extends KeyedProcessFunction[Long, Transaction, Alert] {
-
-  @throws[Exception]
-  def processElement(
-      transaction: Transaction,
-      context: KeyedProcessFunction[Long, Transaction, Alert]#Context,
-      collector: Collector[Alert]): Unit = {
-
-    val alert = new Alert
-    alert.setId(transaction.getAccountId)
-
-    collector.collect(alert)
-  }
-}
-```
-{{< /tab >}}
 {{< /tabs >}}
 
 <a name="writing-a-real-application-v1"></a>
@@ -528,20 +385,6 @@ public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert
     }
 ```
 {{< /tab >}}
-{{< tab "Scala" >}}
-```scala
-@SerialVersionUID(1L)
-class FraudDetector extends KeyedProcessFunction[Long, Transaction, Alert] {
-
-  @transient private var flagState: ValueState[java.lang.Boolean] = _
-
-  @throws[Exception]
-  override def open(parameters: Configuration): Unit = {
-    val flagDescriptor = new ValueStateDescriptor("flag", Types.BOOLEAN)
-    flagState = getRuntimeContext.getState(flagDescriptor)
-  }
-```
-{{< /tab >}}
 {{< /tabs >}}
 
 `ValueState` 是一个包装类,类似于 Java 标准库里边的 `AtomicReference` 和 `AtomicLong`。
@@ -585,36 +428,6 @@ public void processElement(
 }
 ```
 {{< /tab >}}
-{{< tab "Scala" >}}
-```scala
-override def processElement(
-    transaction: Transaction,
-    context: KeyedProcessFunction[Long, Transaction, Alert]#Context,
-    collector: Collector[Alert]): Unit = {
-
-  // Get the current state for the current key
-  val lastTransactionWasSmall = flagState.value
-
-  // Check if the flag is set
-  if (lastTransactionWasSmall != null) {
-    if (transaction.getAmount > FraudDetector.LARGE_AMOUNT) {
-      // Output an alert downstream
-      val alert = new Alert
-      alert.setId(transaction.getAccountId)
-
-      collector.collect(alert)
-    }
-    // Clean up our state
-    flagState.clear()
-  }
-
-  if (transaction.getAmount < FraudDetector.SMALL_AMOUNT) {
-    // set the flag to true
-    flagState.update(true)
-  }
-}
-```
-{{< /tab >}}
 {{< /tabs >}}
 
 对于每笔交易,欺诈检测器都会检查该帐户的标记状态。
@@ -663,24 +476,6 @@ public void open(Configuration parameters) {
 }
 ```
 {{< /tab >}}
-{{< tab "Scala" >}}
-```scala
-@SerialVersionUID(1L)
-class FraudDetector extends KeyedProcessFunction[Long, Transaction, Alert] {
-
-  @transient private var flagState: ValueState[java.lang.Boolean] = _
-  @transient private var timerState: ValueState[java.lang.Long] = _
-
-  @throws[Exception]
-  override def open(parameters: Configuration): Unit = {
-    val flagDescriptor = new ValueStateDescriptor("flag", Types.BOOLEAN)
-    flagState = getRuntimeContext.getState(flagDescriptor)
-
-    val timerDescriptor = new ValueStateDescriptor("timer-state", Types.LONG)
-    timerState = getRuntimeContext.getState(timerDescriptor)
-  }
-```
-{{< /tab >}}
 {{< /tabs >}}
 
 `KeyedProcessFunction#processElement` 需要使用提供了定时器服务的 `Context` 来调用。
@@ -701,19 +496,6 @@ if (transaction.getAmount() < SMALL_AMOUNT) {
 }
 ```
 {{< /tab >}}
-{{< tab "Scala" >}}
-```scala
-if (transaction.getAmount < FraudDetector.SMALL_AMOUNT) {
-  // set the flag to true
-  flagState.update(true)
-
-  // set the timer and timer state
-  val timer = context.timerService.currentProcessingTime + FraudDetector.ONE_MINUTE
-  context.timerService.registerProcessingTimeTimer(timer)
-  timerState.update(timer)
-}
-```
-{{< /tab >}}
 {{< /tabs >}}
 
 处理时间是本地时钟时间,这是由运行任务的服务器的系统时间来决定的。
@@ -732,18 +514,6 @@ public void onTimer(long timestamp, OnTimerContext ctx, Collector<Alert> out) {
 }
 ```
 {{< /tab >}}
-{{< tab "Scala" >}}
-```scala
-override def onTimer(
-    timestamp: Long,
-    ctx: KeyedProcessFunction[Long, Transaction, Alert]#OnTimerContext,
-    out: Collector[Alert]): Unit = {
-  // remove flag after 1 minute
-  timerState.clear()
-  flagState.clear()
-}
-```
-{{< /tab >}}
 {{< /tabs >}}
 
 最后,如果要取消定时器,你需要删除已经注册的定时器,并同时清空保存定时器的状态。
@@ -763,20 +533,6 @@ private void cleanUp(Context ctx) throws Exception {
 }
 ```
 {{< /tab >}}
-{{< tab "Scala" >}}
-```scala
-@throws[Exception]
-private def cleanUp(ctx: KeyedProcessFunction[Long, Transaction, Alert]#Context): Unit = {
-  // delete timer
-  val timer = timerState.value
-  ctx.timerService.deleteProcessingTimeTimer(timer)
-
-  // clean up all states
-  timerState.clear()
-  flagState.clear()
-}
-```
-{{< /tab >}}
 {{< /tabs >}}
 
 这就是一个功能完备的,有状态的分布式流处理程序了。
@@ -875,92 +631,6 @@ public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert
 }
 ```
 {{< /tab >}}
-{{< tab "Scala" >}}
-```scala
-package spendreport
-
-import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
-import org.apache.flink.api.scala.typeutils.Types
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.streaming.api.functions.KeyedProcessFunction
-import org.apache.flink.util.Collector
-import org.apache.flink.walkthrough.common.entity.Alert
-import org.apache.flink.walkthrough.common.entity.Transaction
-
-object FraudDetector {
-  val SMALL_AMOUNT: Double = 1.00
-  val LARGE_AMOUNT: Double = 500.00
-  val ONE_MINUTE: Long     = 60 * 1000L
-}
-
-@SerialVersionUID(1L)
-class FraudDetector extends KeyedProcessFunction[Long, Transaction, Alert] {
-
-  @transient private var flagState: ValueState[java.lang.Boolean] = _
-  @transient private var timerState: ValueState[java.lang.Long] = _
-
-  @throws[Exception]
-  override def open(parameters: Configuration): Unit = {
-    val flagDescriptor = new ValueStateDescriptor("flag", Types.BOOLEAN)
-    flagState = getRuntimeContext.getState(flagDescriptor)
-
-    val timerDescriptor = new ValueStateDescriptor("timer-state", Types.LONG)
-    timerState = getRuntimeContext.getState(timerDescriptor)
-  }
-
-  override def processElement(
-      transaction: Transaction,
-      context: KeyedProcessFunction[Long, Transaction, Alert]#Context,
-      collector: Collector[Alert]): Unit = {
-
-    // Get the current state for the current key
-    val lastTransactionWasSmall = flagState.value
-
-    // Check if the flag is set
-    if (lastTransactionWasSmall != null) {
-      if (transaction.getAmount > FraudDetector.LARGE_AMOUNT) {
-        // Output an alert downstream
-        val alert = new Alert
-        alert.setId(transaction.getAccountId)
-
-        collector.collect(alert)
-      }
-      // Clean up our state
-      cleanUp(context)
-    }
-
-    if (transaction.getAmount < FraudDetector.SMALL_AMOUNT) {
-      // set the flag to true
-      flagState.update(true)
-      val timer = context.timerService.currentProcessingTime + FraudDetector.ONE_MINUTE
-
-      context.timerService.registerProcessingTimeTimer(timer)
-      timerState.update(timer)
-    }
-  }
-
-  override def onTimer(
-      timestamp: Long,
-      ctx: KeyedProcessFunction[Long, Transaction, Alert]#OnTimerContext,
-      out: Collector[Alert]): Unit = {
-    // remove flag after 1 minute
-    timerState.clear()
-    flagState.clear()
-  }
-
-  @throws[Exception]
-  private def cleanUp(ctx: KeyedProcessFunction[Long, Transaction, Alert]#Context): Unit = {
-    // delete timer
-    val timer = timerState.value
-    ctx.timerService.deleteProcessingTimeTimer(timer)
-
-    // clean up all states
-    timerState.clear()
-    flagState.clear()
-  }
-}
-```
-{{< /tab >}}
 {{< /tabs >}}
 
 <a name="expected-output"></a>
diff --git a/docs/content.zh/docs/try-flink/table_api.md b/docs/content.zh/docs/try-flink/table_api.md
index 9456de8c4db..152625f3c6f 100644
--- a/docs/content.zh/docs/try-flink/table_api.md
+++ b/docs/content.zh/docs/try-flink/table_api.md
@@ -38,7 +38,7 @@ Flink 的 Table API 可以简化数据分析、构建数据流水线以及 ETL 
 
 ## 准备条件
 
-我们默认你对 Java 或者 Scala 有一定了解,当然如果你使用的是其他编程语言,也可以继续学习。 
+我们默认你对 Java 有一定了解,当然如果你使用的是其他编程语言,也可以继续学习。 
 同时也默认你了解基本的关系型概念,例如 SELECT 、GROUP BY 等语句。
 
 ## 困难求助
diff --git a/docs/content/docs/dev/configuration/overview.md b/docs/content/docs/dev/configuration/overview.md
index 0ac9dbadf3c..4a0f431aec7 100644
--- a/docs/content/docs/dev/configuration/overview.md
+++ b/docs/content/docs/dev/configuration/overview.md
@@ -59,6 +59,12 @@ to create a Flink project.
 You can create a project based on an [Archetype](https://maven.apache.org/guides/introduction/introduction-to-archetypes.html)
 with the Maven command below or use the provided quickstart bash script.
 
+{{< hint warning >}}
+All Flink Scala APIs are deprecated and will be removed in a future Flink version. You can still build your application in Scala, but you should move to the Java version of the DataStream and/or Table API.
+
+See <a href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-265+Deprecate+and+remove+Scala+API+support">FLIP-265 Deprecate and remove Scala API support</a>
+{{< /hint >}}
+
 ### Maven command
 ```bash
 $ mvn archetype:generate                \
diff --git a/docs/content/docs/dev/datastream/overview.md b/docs/content/docs/dev/datastream/overview.md
index 6b9d8f80713..d89d701f826 100644
--- a/docs/content/docs/dev/datastream/overview.md
+++ b/docs/content/docs/dev/datastream/overview.md
@@ -72,6 +72,12 @@ program consists of the same basic parts:
 4. Specify where to put the results of your computations,
 5. Trigger the program execution
 
+{{< hint warning >}}
+All Flink Scala APIs are deprecated and will be removed in a future Flink version. You can still build your application in Scala, but you should move to the Java version of the DataStream and/or Table API.
+
+See <a href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-265+Deprecate+and+remove+Scala+API+support">FLIP-265 Deprecate and remove Scala API support</a>
+{{< /hint >}}
+
 {{< tabs "fa68701c-59e8-4509-858e-3e8a123eeacf" >}}
 {{< tab "Java" >}}
 
diff --git a/docs/content/docs/dev/datastream/scala_api_extensions.md b/docs/content/docs/dev/datastream/scala_api_extensions.md
index d13d676f040..e0656a39011 100644
--- a/docs/content/docs/dev/datastream/scala_api_extensions.md
+++ b/docs/content/docs/dev/datastream/scala_api_extensions.md
@@ -25,6 +25,12 @@ specific language governing permissions and limitations
 under the License.
 -->
 
+{{< hint warning >}}
+All Flink Scala APIs are deprecated and will be removed in a future Flink version. You can still build your application in Scala, but you should move to the Java version of the DataStream and/or Table API.
+
+See <a href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-265+Deprecate+and+remove+Scala+API+support">FLIP-265 Deprecate and remove Scala API support</a>
+{{< /hint >}}
+
 # Scala API Extensions
 
 In order to keep a fair amount of consistency between the Scala and Java APIs, some
diff --git a/docs/content/docs/dev/table/common.md b/docs/content/docs/dev/table/common.md
index 1c9b5441afe..c594c29bdfd 100644
--- a/docs/content/docs/dev/table/common.md
+++ b/docs/content/docs/dev/table/common.md
@@ -35,6 +35,12 @@ Structure of Table API and SQL Programs
 
 The following code example shows the common structure of Table API and SQL programs.
 
+{{< hint warning >}}
+All Flink Scala APIs are deprecated and will be removed in a future Flink version. You can still build your application in Scala, but you should move to the Java version of the DataStream and/or Table API.
+
+See <a href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-265+Deprecate+and+remove+Scala+API+support">FLIP-265 Deprecate and remove Scala API support</a>
+{{< /hint >}}
+
 {{< tabs "0727d1e7-3f22-4eba-a25f-6a554b6a1359" >}}
 {{< tab "Java" >}}
 ```java
diff --git a/docs/content/docs/learn-flink/datastream_api.md b/docs/content/docs/learn-flink/datastream_api.md
index e1091c3f3d0..909cbe7a8f3 100644
--- a/docs/content/docs/learn-flink/datastream_api.md
+++ b/docs/content/docs/learn-flink/datastream_api.md
@@ -29,7 +29,7 @@ to get started writing streaming applications.
 
 ## What can be Streamed?
 
-Flink's DataStream APIs for Java and Scala will let you stream anything they can serialize. Flink's
+Flink's DataStream APIs will let you stream anything they can serialize. Flink's
 own serializer is used for
 
 - basic types, i.e., String, Long, Integer, Boolean, Array
@@ -85,6 +85,12 @@ Flink's serializer [supports schema evolution for POJO types]({{< ref "docs/dev/
 
 These work just as you'd expect.
 
+{{< hint warning >}}
+All Flink Scala APIs are deprecated and will be removed in a future Flink version. You can still build your application in Scala, but you should move to the Java version of the DataStream and/or Table API.
+
+See <a href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-265+Deprecate+and+remove+Scala+API+support">FLIP-265 Deprecate and remove Scala API support</a>
+{{< /hint >}}
+
 {{< top >}}
 
 ## A Complete Example
diff --git a/docs/content/docs/try-flink/datastream.md b/docs/content/docs/try-flink/datastream.md
index 16e66dc8427..3f41750011d 100644
--- a/docs/content/docs/try-flink/datastream.md
+++ b/docs/content/docs/try-flink/datastream.md
@@ -44,7 +44,7 @@ Using a simple set of rules, you will see how Flink allows us to implement advan
 
 ## Prerequisites
 
-This walkthrough assumes that you have some familiarity with Java or Scala, but you should be able to follow along even if you are coming from a different programming language.
+This walkthrough assumes that you have some familiarity with Java, but you should be able to follow along even if you are coming from a different programming language.
 
 ### Running in an IDE
 
@@ -81,19 +81,6 @@ $ mvn archetype:generate \
     -DinteractiveMode=false
 ```
 {{< /tab >}}
-{{< tab "Scala" >}}
-```bash
-$ mvn archetype:generate \
-    -DarchetypeGroupId=org.apache.flink \
-    -DarchetypeArtifactId=flink-walkthrough-datastream-scala \
-    -DarchetypeVersion={{< version >}} \
-    -DgroupId=frauddetection \
-    -DartifactId=frauddetection \
-    -Dversion=0.1 \
-    -Dpackage=spendreport \
-    -DinteractiveMode=false
-```
-{{< /tab >}}
 {{< /tabs >}}
 
 {{< unstable >}}
@@ -124,7 +111,7 @@ If you wish to use the snapshot repository, you need to add a repository entry t
 
 You can edit the `groupId`, `artifactId` and `package` if you like. With the above parameters,
 Maven will create a folder named `frauddetection` that contains a project with all the dependencies to complete this tutorial.
-After importing the project into your editor, you can find a file `FraudDetectionJob.java` (or `FraudDetectionJob.scala`) with the following code which you can run directly inside your IDE.
+After importing the project into your editor, you can find a file `FraudDetectionJob.java` with the following code which you can run directly inside your IDE.
 Try setting breakpoints throughout the data stream and run the code in DEBUG mode to get a feeling for how everything works.
 
 {{< tabs "start" >}}
@@ -194,73 +181,6 @@ public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert
 }
 ```
 {{< /tab >}}
-{{< tab "Scala" >}}
-#### FraudDetectionJob.scala
-```scala
-package spendreport
-
-import org.apache.flink.streaming.api.scala._
-import org.apache.flink.walkthrough.common.sink.AlertSink
-import org.apache.flink.walkthrough.common.entity.Alert
-import org.apache.flink.walkthrough.common.entity.Transaction
-import org.apache.flink.walkthrough.common.source.TransactionSource
-
-object FraudDetectionJob {
-
-  @throws[Exception]
-  def main(args: Array[String]): Unit = {
-    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
-
-    val transactions: DataStream[Transaction] = env
-      .addSource(new TransactionSource)
-      .name("transactions")
-
-    val alerts: DataStream[Alert] = transactions
-      .keyBy(transaction => transaction.getAccountId)
-      .process(new FraudDetector)
-      .name("fraud-detector")
-
-    alerts
-      .addSink(new AlertSink)
-      .name("send-alerts")
-
-    env.execute("Fraud Detection")
-  }
-}
-```
-
-#### FraudDetector.scala
-```scala
-package spendreport
-
-import org.apache.flink.streaming.api.functions.KeyedProcessFunction
-import org.apache.flink.util.Collector
-import org.apache.flink.walkthrough.common.entity.Alert
-import org.apache.flink.walkthrough.common.entity.Transaction
-
-object FraudDetector {
-  val SMALL_AMOUNT: Double = 1.00
-  val LARGE_AMOUNT: Double = 500.00
-  val ONE_MINUTE: Long     = 60 * 1000L
-}
-
-@SerialVersionUID(1L)
-class FraudDetector extends KeyedProcessFunction[Long, Transaction, Alert] {
-
-  @throws[Exception]
-  def processElement(
-      transaction: Transaction,
-      context: KeyedProcessFunction[Long, Transaction, Alert]#Context,
-      collector: Collector[Alert]): Unit = {
-
-    val alert = new Alert
-    alert.setId(transaction.getAccountId)
-
-    collector.collect(alert)
-  }
-}
-```
-{{< /tab >}}
 {{< /tabs >}}
 
 ## Breaking Down the Code
@@ -280,11 +200,6 @@ The execution environment is how you set properties for your Job, create your so
 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 ```
 {{< /tab >}}
-{{< tab "Scala" >}}
-```scala
-val env = StreamExecutionEnvironment.getExecutionEnvironment
-```
-{{< /tab >}}
 {{< /tabs >}}
 
 
@@ -303,13 +218,6 @@ DataStream<Transaction> transactions = env
     .name("transactions");
 ```
 {{< /tab >}}
-{{< tab "Scala" >}}
-```scala
-val transactions: DataStream[Transaction] = env
-  .addSource(new TransactionSource)
-  .name("transactions")
-```
-{{< /tab >}}
 {{< /tabs >}}
 
 #### Partitioning Events & Detecting Fraud
@@ -329,14 +237,6 @@ DataStream<Alert> alerts = transactions
     .name("fraud-detector");
 ```
 {{< /tab >}}
-{{< tab "Scala" >}}
-```scala
-val alerts: DataStream[Alert] = transactions
-  .keyBy(transaction => transaction.getAccountId)
-  .process(new FraudDetector)
-  .name("fraud-detector")
-```
-{{< /tab >}}
 {{< /tabs >}}
 
 #### Outputting Results
@@ -350,11 +250,6 @@ The `AlertSink` logs each `Alert` record with log level **INFO**, instead of wri
 alerts.addSink(new AlertSink());
 ```
 {{< /tab >}}
-{{< tab "Scala" >}}
-```scala
-alerts.addSink(new AlertSink)
-```
-{{< /tab >}}
 {{< /tabs >}}
 
 #### The Fraud Detector
@@ -388,31 +283,6 @@ public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert
 }
 ```
 {{< /tab >}}
-{{< tab "Scala" >}}
-```scala
-object FraudDetector {
-  val SMALL_AMOUNT: Double = 1.00
-  val LARGE_AMOUNT: Double = 500.00
-  val ONE_MINUTE: Long     = 60 * 1000L
-}
-
-@SerialVersionUID(1L)
-class FraudDetector extends KeyedProcessFunction[Long, Transaction, Alert] {
-
-  @throws[Exception]
-  def processElement(
-      transaction: Transaction,
-      context: KeyedProcessFunction[Long, Transaction, Alert]#Context,
-      collector: Collector[Alert]): Unit = {
-
-    val alert = new Alert
-    alert.setId(transaction.getAccountId)
-
-    collector.collect(alert)
-  }
-}
-```
-{{< /tab >}}
 {{< /tabs >}}
 
 ## Writing a Real Application (v1)
@@ -463,20 +333,6 @@ public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert
     }
 ```
 {{< /tab >}}
-{{< tab "Scala" >}}
-```scala
-@SerialVersionUID(1L)
-class FraudDetector extends KeyedProcessFunction[Long, Transaction, Alert] {
-
-  @transient private var flagState: ValueState[java.lang.Boolean] = _
-
-  @throws[Exception]
-  override def open(parameters: Configuration): Unit = {
-    val flagDescriptor = new ValueStateDescriptor("flag", Types.BOOLEAN)
-    flagState = getRuntimeContext.getState(flagDescriptor)
-  }
-```
-{{< /tab >}}
 {{< /tabs >}}
 
 `ValueState` is a wrapper class, similar to `AtomicReference` or `AtomicLong` in the Java standard library.
@@ -520,36 +376,6 @@ public void processElement(
 }
 ```
 {{< /tab >}}
-{{< tab "Scala" >}}
-```scala
-override def processElement(
-    transaction: Transaction,
-    context: KeyedProcessFunction[Long, Transaction, Alert]#Context,
-    collector: Collector[Alert]): Unit = {
-
-  // Get the current state for the current key
-  val lastTransactionWasSmall = flagState.value
-
-  // Check if the flag is set
-  if (lastTransactionWasSmall != null) {
-    if (transaction.getAmount > FraudDetector.LARGE_AMOUNT) {
-      // Output an alert downstream
-      val alert = new Alert
-      alert.setId(transaction.getAccountId)
-
-      collector.collect(alert)
-    }
-    // Clean up our state
-    flagState.clear()
-  }
-
-  if (transaction.getAmount < FraudDetector.SMALL_AMOUNT) {
-    // set the flag to true
-    flagState.update(true)
-  }
-}
-```
-{{< /tab >}}
 {{< /tabs >}}
 
 For every transaction, the fraud detector checks the state of the flag for that account.
@@ -598,24 +424,6 @@ public void open(Configuration parameters) {
 }
 ```
 {{< /tab >}}
-{{< tab "Scala" >}}
-```scala
-@SerialVersionUID(1L)
-class FraudDetector extends KeyedProcessFunction[Long, Transaction, Alert] {
-
-  @transient private var flagState: ValueState[java.lang.Boolean] = _
-  @transient private var timerState: ValueState[java.lang.Long] = _
-
-  @throws[Exception]
-  override def open(parameters: Configuration): Unit = {
-    val flagDescriptor = new ValueStateDescriptor("flag", Types.BOOLEAN)
-    flagState = getRuntimeContext.getState(flagDescriptor)
-
-    val timerDescriptor = new ValueStateDescriptor("timer-state", Types.LONG)
-    timerState = getRuntimeContext.getState(timerDescriptor)
-  }
-```
-{{< /tab >}}
 {{< /tabs >}}
 
 `KeyedProcessFunction#processElement` is called with a `Context` that contains a timer service.
@@ -636,19 +444,6 @@ if (transaction.getAmount() < SMALL_AMOUNT) {
 }
 ```
 {{< /tab >}}
-{{< tab "Scala" >}}
-```scala
-if (transaction.getAmount < FraudDetector.SMALL_AMOUNT) {
-  // set the flag to true
-  flagState.update(true)
-
-  // set the timer and timer state
-  val timer = context.timerService.currentProcessingTime + FraudDetector.ONE_MINUTE
-  context.timerService.registerProcessingTimeTimer(timer)
-  timerState.update(timer)
-}
-```
-{{< /tab >}}
 {{< /tabs >}}
 
 Processing time is wall clock time, and is determined by the system clock of the machine running the operator. 
@@ -666,18 +461,6 @@ public void onTimer(long timestamp, OnTimerContext ctx, Collector<Alert> out) {
 }
 ```
 {{< /tab >}}
-{{< tab "Scala" >}}
-```scala
-override def onTimer(
-    timestamp: Long,
-    ctx: KeyedProcessFunction[Long, Transaction, Alert]#OnTimerContext,
-    out: Collector[Alert]): Unit = {
-  // remove flag after 1 minute
-  timerState.clear()
-  flagState.clear()
-}
-```
-{{< /tab >}}
 {{< /tabs >}}
 
 Finally, to cancel the timer, you need to delete the registered timer and delete the timer state.
@@ -697,20 +480,6 @@ private void cleanUp(Context ctx) throws Exception {
 }
 ```
 {{< /tab >}}
-{{< tab "Scala" >}}
-```scala
-@throws[Exception]
-private def cleanUp(ctx: KeyedProcessFunction[Long, Transaction, Alert]#Context): Unit = {
-  // delete timer
-  val timer = timerState.value
-  ctx.timerService.deleteProcessingTimeTimer(timer)
-
-  // clean up all states
-  timerState.clear()
-  flagState.clear()
-}
-```
-{{< /tab >}}
 {{< /tabs >}}
 
 And that's it, a fully functional, stateful, distributed streaming application!
@@ -805,90 +574,6 @@ public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert
 }
 ```
 {{< /tab >}}
-{{< tab "Scala" >}}
-```scala
-import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
-import org.apache.flink.api.scala.typeutils.Types
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.streaming.api.functions.KeyedProcessFunction
-import org.apache.flink.util.Collector
-import org.apache.flink.walkthrough.common.entity.Alert
-import org.apache.flink.walkthrough.common.entity.Transaction
-
-object FraudDetector {
-  val SMALL_AMOUNT: Double = 1.00
-  val LARGE_AMOUNT: Double = 500.00
-  val ONE_MINUTE: Long     = 60 * 1000L
-}
-
-@SerialVersionUID(1L)
-class FraudDetector extends KeyedProcessFunction[Long, Transaction, Alert] {
-
-  @transient private var flagState: ValueState[java.lang.Boolean] = _
-  @transient private var timerState: ValueState[java.lang.Long] = _
-
-  @throws[Exception]
-  override def open(parameters: Configuration): Unit = {
-    val flagDescriptor = new ValueStateDescriptor("flag", Types.BOOLEAN)
-    flagState = getRuntimeContext.getState(flagDescriptor)
-
-    val timerDescriptor = new ValueStateDescriptor("timer-state", Types.LONG)
-    timerState = getRuntimeContext.getState(timerDescriptor)
-  }
-
-  override def processElement(
-      transaction: Transaction,
-      context: KeyedProcessFunction[Long, Transaction, Alert]#Context,
-      collector: Collector[Alert]): Unit = {
-
-    // Get the current state for the current key
-    val lastTransactionWasSmall = flagState.value
-
-    // Check if the flag is set
-    if (lastTransactionWasSmall != null) {
-      if (transaction.getAmount > FraudDetector.LARGE_AMOUNT) {
-        // Output an alert downstream
-        val alert = new Alert
-        alert.setId(transaction.getAccountId)
-
-        collector.collect(alert)
-      }
-      // Clean up our state
-      cleanUp(context)
-    }
-
-    if (transaction.getAmount < FraudDetector.SMALL_AMOUNT) {
-      // set the flag to true
-      flagState.update(true)
-      val timer = context.timerService.currentProcessingTime + FraudDetector.ONE_MINUTE
-
-      context.timerService.registerProcessingTimeTimer(timer)
-      timerState.update(timer)
-    }
-  }
-
-  override def onTimer(
-      timestamp: Long,
-      ctx: KeyedProcessFunction[Long, Transaction, Alert]#OnTimerContext,
-      out: Collector[Alert]): Unit = {
-    // remove flag after 1 minute
-    timerState.clear()
-    flagState.clear()
-  }
-
-  @throws[Exception]
-  private def cleanUp(ctx: KeyedProcessFunction[Long, Transaction, Alert]#Context): Unit = {
-    // delete timer
-    val timer = timerState.value
-    ctx.timerService.deleteProcessingTimeTimer(timer)
-
-    // clean up all states
-    timerState.clear()
-    flagState.clear()
-  }
-}
-```
-{{< /tab >}}
 {{< /tabs >}}
 
 ### Expected Output
@@ -902,4 +587,4 @@ You should see the following output in your task manager logs:
 2019-08-19 14:22:16,551 INFO  org.apache.flink.walkthrough.common.sink.AlertSink - Alert{id=3}
 2019-08-19 14:22:21,723 INFO  org.apache.flink.walkthrough.common.sink.AlertSink - Alert{id=3}
 2019-08-19 14:22:26,896 INFO  org.apache.flink.walkthrough.common.sink.AlertSink - Alert{id=3}
-```
\ No newline at end of file
+```
diff --git a/docs/content/docs/try-flink/table_api.md b/docs/content/docs/try-flink/table_api.md
index 45bddc3720c..0dd698262e8 100644
--- a/docs/content/docs/try-flink/table_api.md
+++ b/docs/content/docs/try-flink/table_api.md
@@ -38,7 +38,7 @@ The pipeline will read data from Kafka and write the results to MySQL visualized
 
 ## Prerequisites
 
-This walkthrough assumes that you have some familiarity with Java or Scala, but you should be able to follow along even if you come from a different programming language.
+This walkthrough assumes that you have some familiarity with Java, but you should be able to follow along even if you come from a different programming language.
 It also assumes that you are familiar with basic relational concepts such as `SELECT` and `GROUP BY` clauses.
 
 ## Help, I’m Stuck! 
@@ -198,7 +198,7 @@ The goal is to build a report that shows the total spend for each account across
 This means the timestamp column needs to be rounded down from millisecond to hour granularity. 
 
 Flink supports developing relational applications in pure [SQL]({{< ref "docs/dev/table/sql/overview" >}}) or using the [Table API]({{< ref "docs/dev/table/tableApi" >}}).
-The Table API is a fluent DSL inspired by SQL, that can be written in Python, Java, or Scala and supports strong IDE integration.
+The Table API is a fluent DSL inspired by SQL that can be written in Java or Python and supports strong IDE integration.
 Just like a SQL query, Table programs can select the required fields and group by your keys.
 These features, along with [built-in functions]({{< ref "docs/dev/table/functions/systemFunctions" >}}) like `floor` and `sum`, enable you to write this report.
 
diff --git a/flink-end-to-end-tests/flink-quickstart-test/src/main/scala/org/apache/flink/quickstarts/test/QuickstartExample.scala b/flink-end-to-end-tests/flink-quickstart-test/src/main/scala/org/apache/flink/quickstarts/test/QuickstartExample.scala
deleted file mode 100644
index 00c91a45dea..00000000000
--- a/flink-end-to-end-tests/flink-quickstart-test/src/main/scala/org/apache/flink/quickstarts/test/QuickstartExample.scala
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.quickstarts.test
-
-import org.apache.flink.api.java.utils.ParameterTool
-import org.apache.flink.quickstarts.test.utils.Utils
-import org.apache.flink.streaming.api.datastream.DataStream
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
-
-object QuickstartExample {
-  def main(args: Array[String]) {
-
-    val parameterTool = ParameterTool.fromArgs(args)
-
-    if (parameterTool.getNumberOfParameters < 1) {
-      println("Missing parameters!\nUsage: --numRecords <numRecords>")
-      return
-    }
-
-    val numRecordsToEmit = parameterTool.getInt("numRecords")
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.enableCheckpointing(5000)
-
-    val source: DataStream[(String)] = env
-      .fromSequence(0, numRecordsToEmit - 1)
-      .map(v => Utils.prefix(v))
-
-    val data = source.collectAsync()
-
-    env.execute("Quickstart example")
-
-    var count = 0
-    while (data.hasNext) {
-      data.next()
-      count += 1
-    }
-    if (count != numRecordsToEmit) {
-      throw new RuntimeException(
-        s"Unexpected number of records; expected :$numRecordsToEmit actual: $count")
-    }
-  }
-}
diff --git a/flink-end-to-end-tests/run-nightly-tests.sh b/flink-end-to-end-tests/run-nightly-tests.sh
index a634ee34b87..fb0b943f70b 100755
--- a/flink-end-to-end-tests/run-nightly-tests.sh
+++ b/flink-end-to-end-tests/run-nightly-tests.sh
@@ -202,10 +202,8 @@ function run_group_2 {
     run_test "Netty shuffle direct memory consumption end-to-end test" "$END_TO_END_DIR/test-scripts/test_netty_shuffle_memory_control.sh"
 
     run_test "Quickstarts Java nightly end-to-end test" "$END_TO_END_DIR/test-scripts/test_quickstarts.sh java"
-    run_test "Quickstarts Scala nightly end-to-end test" "$END_TO_END_DIR/test-scripts/test_quickstarts.sh scala"
 
     run_test "Walkthrough DataStream Java nightly end-to-end test" "$END_TO_END_DIR/test-scripts/test_datastream_walkthroughs.sh java"
-    run_test "Walkthrough DataStream Scala nightly end-to-end test" "$END_TO_END_DIR/test-scripts/test_datastream_walkthroughs.sh scala"
 
     run_test "Avro Confluent Schema Registry nightly end-to-end test" "$END_TO_END_DIR/test-scripts/test_confluent_schema_registry.sh"
 
diff --git a/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/clustering/KMeans.scala b/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/clustering/KMeans.scala
deleted file mode 100644
index 2d4623bc473..00000000000
--- a/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/clustering/KMeans.scala
+++ /dev/null
@@ -1,222 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.examples.scala.clustering
-
-import org.apache.flink.api.common.functions._
-import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields
-import org.apache.flink.api.java.utils.ParameterTool
-import org.apache.flink.api.scala._
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.examples.java.clustering.util.KMeansData
-
-import scala.collection.JavaConverters._
-
-/**
- * This example implements a basic K-Means clustering algorithm.
- *
- * K-Means is an iterative clustering algorithm and works as follows: K-Means is given a set of data
- * points to be clustered and an initial set of ''K'' cluster centers. In each iteration, the
- * algorithm computes the distance of each data point to each cluster center. Each point is assigned
- * to the cluster center which is closest to it. Subsequently, each cluster center is moved to the
- * center (''mean'') of all points that have been assigned to it. The moved cluster centers are fed
- * into the next iteration. The algorithm terminates after a fixed number of iterations (as in this
- * implementation) or if cluster centers do not (significantly) move in an iteration. This is the
- * Wikipedia entry for the
- * [[http://en.wikipedia .org/wiki/K-means_clustering K-Means Clustering algorithm]].
- *
- * This implementation works on two-dimensional data points. It computes an assignment of data
- * points to cluster centers, i.e., each data point is annotated with the id of the final cluster
- * (center) it belongs to.
- *
- * Input files are plain text files and must be formatted as follows:
- *
- *   - Data points are represented as two double values separated by a blank character. Data points
- *     are separated by newline characters. For example `"1.2 2.3\n5.3 7.2\n"` gives two data points
- *     (x=1.2, y=2.3) and (x=5.3, y=7.2).
- *   - Cluster centers are represented by an integer id and a point value. For example `"1 6.2
- *     3.2\n2 2.9 5.7\n"` gives two centers (id=1, x=6.2, y=3.2) and (id=2, x=2.9, y=5.7).
- *
- * Usage:
- * {{{
- *   KMeans --points <path> --centroids <path> --output <path> --iterations <n>
- * }}}
- * If no parameters are provided, the program is run with default data from
- * [[org.apache.flink.examples.java.clustering.util.KMeansData]] and 10 iterations.
- *
- * This example shows how to use:
- *
- *   - Bulk iterations
- *   - Broadcast variables in bulk iterations
- *   - Scala case classes
- */
-object KMeans {
-
-  def main(args: Array[String]) {
-
-    // checking input parameters
-    val params: ParameterTool = ParameterTool.fromArgs(args)
-
-    // set up execution environment
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-
-    // get input data:
-    // read the points and centroids from the provided paths or fall back to default data
-    val points: DataSet[Point] = getPointDataSet(params, env)
-    val centroids: DataSet[Centroid] = getCentroidDataSet(params, env)
-
-    val finalCentroids = centroids.iterate(params.getInt("iterations", 10)) {
-      currentCentroids =>
-        val newCentroids = points
-          .map(new SelectNearestCenter)
-          .withBroadcastSet(currentCentroids, "centroids")
-          .map(x => (x._1, x._2, 1L))
-          .withForwardedFields("_1; _2")
-          .groupBy(0)
-          .reduce((p1, p2) => (p1._1, p1._2.add(p2._2), p1._3 + p2._3))
-          .withForwardedFields("_1")
-          .map(x => new Centroid(x._1, x._2.div(x._3)))
-          .withForwardedFields("_1->id")
-        newCentroids
-    }
-
-    val clusteredPoints: DataSet[(Int, Point)] =
-      points.map(new SelectNearestCenter).withBroadcastSet(finalCentroids, "centroids")
-
-    if (params.has("output")) {
-      clusteredPoints.writeAsCsv(params.get("output"), "\n", " ")
-      env.execute("Scala KMeans Example")
-    } else {
-      println("Printing result to stdout. Use --output to specify output path.")
-      clusteredPoints.print()
-    }
-
-  }
-
-  // *************************************************************************
-  //     UTIL FUNCTIONS
-  // *************************************************************************
-
-  def getCentroidDataSet(params: ParameterTool, env: ExecutionEnvironment): DataSet[Centroid] = {
-    if (params.has("centroids")) {
-      env.readCsvFile[Centroid](
-        params.get("centroids"),
-        fieldDelimiter = " ",
-        includedFields = Array(0, 1, 2))
-    } else {
-      println("Executing K-Means example with default centroid data set.")
-      println("Use --centroids to specify file input.")
-      env.fromCollection(KMeansData.CENTROIDS.map {
-        case Array(id, x, y) =>
-          new Centroid(id.asInstanceOf[Int], x.asInstanceOf[Double], y.asInstanceOf[Double])
-      })
-    }
-  }
-
-  def getPointDataSet(params: ParameterTool, env: ExecutionEnvironment): DataSet[Point] = {
-    if (params.has("points")) {
-      env.readCsvFile[Point](
-        params.get("points"),
-        fieldDelimiter = " ",
-        includedFields = Array(0, 1))
-    } else {
-      println("Executing K-Means example with default points data set.")
-      println("Use --points to specify file input.")
-      env.fromCollection(KMeansData.POINTS.map {
-        case Array(x, y) => new Point(x.asInstanceOf[Double], y.asInstanceOf[Double])
-      })
-    }
-  }
-
-  // *************************************************************************
-  //     DATA TYPES
-  // *************************************************************************
-
-  /**
-   * Common trait for operations supported by both points and centroids Note: case class inheritance
-   * is not allowed in Scala
-   */
-  trait Coordinate extends Serializable {
-
-    var x: Double
-    var y: Double
-
-    def add(other: Coordinate): this.type = {
-      x += other.x
-      y += other.y
-      this
-    }
-
-    def div(other: Long): this.type = {
-      x /= other
-      y /= other
-      this
-    }
-
-    def euclideanDistance(other: Coordinate): Double =
-      Math.sqrt((x - other.x) * (x - other.x) + (y - other.y) * (y - other.y))
-
-    def clear(): Unit = {
-      x = 0
-      y = 0
-    }
-
-    override def toString: String =
-      s"$x $y"
-
-  }
-
-  /** A simple two-dimensional point. */
-  case class Point(var x: Double = 0, var y: Double = 0) extends Coordinate
-
-  /** A simple two-dimensional centroid, basically a point with an ID. */
-  case class Centroid(var id: Int = 0, var x: Double = 0, var y: Double = 0) extends Coordinate {
-
-    def this(id: Int, p: Point) {
-      this(id, p.x, p.y)
-    }
-
-    override def toString: String =
-      s"$id ${super.toString}"
-
-  }
-
-  /** Determines the closest cluster center for a data point. */
-  @ForwardedFields(Array("*->_2"))
-  final class SelectNearestCenter extends RichMapFunction[Point, (Int, Point)] {
-    private var centroids: Traversable[Centroid] = null
-
-    /** Reads the centroid values from a broadcast variable into a collection. */
-    override def open(parameters: Configuration) {
-      centroids = getRuntimeContext.getBroadcastVariable[Centroid]("centroids").asScala
-    }
-
-    def map(p: Point): (Int, Point) = {
-      var minDistance: Double = Double.MaxValue
-      var closestCentroidId: Int = -1
-      for (centroid <- centroids) {
-        val distance = p.euclideanDistance(centroid)
-        if (distance < minDistance) {
-          minDistance = distance
-          closestCentroidId = centroid.id
-        }
-      }
-      (closestCentroidId, p)
-    }
-
-  }
-}
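
The nearest-centroid step that `SelectNearestCenter` implements boils down to taking a minimum over distances. A minimal plain-Scala sketch of that step, with no Flink dependencies and purely invented names (`NearestCenterSketch`) and data:

{{{
object NearestCenterSketch {
  case class Point(x: Double, y: Double)
  case class Centroid(id: Int, x: Double, y: Double)

  // squared Euclidean distance is sufficient for picking the minimum
  private def dist2(p: Point, c: Centroid): Double =
    (p.x - c.x) * (p.x - c.x) + (p.y - c.y) * (p.y - c.y)

  /** Returns the id of the closest centroid for the given point. */
  def nearest(p: Point, centroids: Seq[Centroid]): Int =
    centroids.minBy(c => dist2(p, c)).id

  def main(args: Array[String]): Unit = {
    val centroids = Seq(Centroid(1, 0.0, 0.0), Centroid(2, 10.0, 10.0))
    println(nearest(Point(1.0, 2.0), centroids)) // prints 1
  }
}
}}}
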
diff --git a/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/ConnectedComponents.scala b/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/ConnectedComponents.scala
deleted file mode 100644
index 8f58c7af8d3..00000000000
--- a/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/ConnectedComponents.scala
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.examples.scala.graph
-
-import org.apache.flink.api.java.utils.ParameterTool
-import org.apache.flink.api.scala._
-import org.apache.flink.examples.java.graph.util.ConnectedComponentsData
-import org.apache.flink.util.Collector
-
-/**
- * An implementation of the connected components algorithm, using a delta iteration.
- *
- * Initially, the algorithm assigns each vertex a unique ID. In each step, a vertex picks the
- * minimum of its own ID and its neighbors' IDs as its new ID and tells its neighbors about its new
- * ID. After the algorithm has completed, all vertices in the same component will have the same ID.
- *
- * A vertex whose component ID did not change need not propagate its information in the next step.
- * Because of that, the algorithm is easily expressible via a delta iteration. We here model the
- * solution set as the vertices with their current component ids, and the workset as the changed
- * vertices. Because we see all vertices initially as changed, the initial workset and the initial
- * solution set are identical. Also, the delta to the solution set is consequently also the next
- * workset.
- *
- * Input files are plain text files and must be formatted as follows:
- *
- *   - Vertices represented as IDs and separated by new-line characters. For example,
- *     `"1\n2\n12\n42\n63"` gives five vertices (1), (2), (12), (42), and (63).
- *   - Edges are represented as pairs of vertex IDs which are separated by space characters. Edges
- *     are separated by new-line characters. For example `"1 2\n2 12\n1 12\n42 63"` gives four
- *     (undirected) edges (1)-(2), (2)-(12), (1)-(12), and (42)-(63).
- *
- * Usage:
- * {{{
- *   ConnectedComponents --vertices <path> --edges <path> --result <path> --iterations <n>
- * }}}
- *
- * If no parameters are provided, the program is run with default data from
- * [[org.apache.flink.examples.java.graph.util.ConnectedComponentsData]] and 10 iterations.
- *
- * This example shows how to use:
- *
- *   - Delta Iterations
- *   - Generic-typed Functions
- */
-object ConnectedComponents {
-
-  def main(args: Array[String]) {
-
-    val params: ParameterTool = ParameterTool.fromArgs(args)
-
-    // set up execution environment
-    val env = ExecutionEnvironment.getExecutionEnvironment
-
-    val maxIterations: Int = params.getInt("iterations", 10)
-
-    // make parameters available in the web interface
-    env.getConfig.setGlobalJobParameters(params)
-
-    // read vertex and edge data
-    // assign the initial components (equal to the vertex id)
-    val vertices =
-      getVertexDataSet(env, params).map(id => (id, id)).withForwardedFields("*->_1;*->_2")
-
-    // undirected edges by emitting for each input edge the input
-    // edge itself and an inverted version
-    val edges =
-      getEdgeDataSet(env, params).flatMap(edge => Seq(edge, (edge._2, edge._1)))
-
-    // open a delta iteration
-    val verticesWithComponents = vertices.iterateDelta(vertices, maxIterations, Array("_1")) {
-      (s, ws) =>
-        // apply the step logic: join with the edges
-        val allNeighbors = ws
-          .join(edges)
-          .where(0)
-          .equalTo(0)((vertex, edge) => (edge._2, vertex._2))
-          .withForwardedFieldsFirst("_2->_2")
-          .withForwardedFieldsSecond("_2->_1")
-
-        // select the minimum neighbor
-        val minNeighbors = allNeighbors.groupBy(0).min(1)
-
-        // update if the component of the candidate is smaller
-        val updatedComponents = minNeighbors
-          .join(s)
-          .where(0)
-          .equalTo(0) {
-            (newVertex, oldVertex, out: Collector[(Long, Long)]) =>
-              if (newVertex._2 < oldVertex._2) out.collect(newVertex)
-          }
-          .withForwardedFieldsFirst("*")
-
-        // delta and new workset are identical
-        (updatedComponents, updatedComponents)
-    }
-
-    if (params.has("output")) {
-      verticesWithComponents.writeAsCsv(params.get("output"), "\n", " ")
-      env.execute("Scala Connected Components Example")
-    } else {
-      println("Printing result to stdout. Use --output to specify output path.")
-      verticesWithComponents.print()
-    }
-
-  }
-
-  private def getVertexDataSet(env: ExecutionEnvironment, params: ParameterTool): DataSet[Long] = {
-    if (params.has("vertices")) {
-      env
-        .readCsvFile[Tuple1[Long]](params.get("vertices"), includedFields = Array(0))
-        .map(x => x._1)
-    } else {
-      println("Executing ConnectedComponents example with default vertices data set.")
-      println("Use --vertices to specify file input.")
-      env.fromCollection(ConnectedComponentsData.VERTICES)
-    }
-  }
-
-  private def getEdgeDataSet(
-      env: ExecutionEnvironment,
-      params: ParameterTool): DataSet[(Long, Long)] = {
-    if (params.has("edges")) {
-      env
-        .readCsvFile[(Long, Long)](
-          params.get("edges"),
-          fieldDelimiter = " ",
-          includedFields = Array(0, 1))
-        .map(x => (x._1, x._2))
-    } else {
-      println("Executing ConnectedComponents example with default edges data set.")
-      println("Use --edges to specify file input.")
-      val edgeData = ConnectedComponentsData.EDGES.map {
-        case Array(x, y) => (x.asInstanceOf[Long], y.asInstanceOf[Long])
-      }
-      env.fromCollection(edgeData)
-    }
-  }
-}
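
The delta iteration above can be read as plain label propagation: each vertex repeatedly takes the minimum ID among itself and its neighbors until nothing changes. A hedged plain-Scala sketch of that idea (invented object name and toy data, not the Flink operators):

{{{
object ConnectedComponentsSketch {
  def main(args: Array[String]): Unit = {
    val vertices = Seq(1L, 2L, 12L, 42L, 63L)
    val edges = Seq((1L, 2L), (2L, 12L), (42L, 63L))
    // undirected: also emit the inverted edge
    val undirected = edges ++ edges.map { case (a, b) => (b, a) }
    val neighbors = undirected.groupBy(_._1).map { case (v, es) => v -> es.map(_._2) }

    // start with component id == vertex id, iterate until a fixpoint is reached
    var components = vertices.map(v => v -> v).toMap
    var changed = true
    while (changed) {
      val updated = components.map { case (v, c) =>
        val candidates = c +: neighbors.getOrElse(v, Seq.empty).map(components)
        v -> candidates.min
      }
      changed = updated != components
      components = updated
    }
    println(components) // vertices 1, 2, 12 end up with id 1; 42 and 63 with id 42
  }
}
}}}
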
diff --git a/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/DeltaPageRank.scala b/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/DeltaPageRank.scala
deleted file mode 100644
index 2649890a2c4..00000000000
--- a/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/DeltaPageRank.scala
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.examples.scala.graph
-
-import org.apache.flink.api.scala._
-import org.apache.flink.util.Collector
-
-object DeltaPageRank {
-
-  final private val DAMPENING_FACTOR: Double = 0.85
-  final private val NUM_VERTICES = 5
-  final private val INITIAL_RANK = 1.0 / NUM_VERTICES
-  final private val RANDOM_JUMP = (1 - DAMPENING_FACTOR) / NUM_VERTICES
-  final private val THRESHOLD = 0.0001 / NUM_VERTICES
-
-  type Page = (Long, Double)
-  type Adjacency = (Long, Array[Long])
-
-  def main(args: Array[String]) {
-
-    val maxIterations = 100
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-
-    val rawLines: DataSet[String] = env.fromElements("1 2 3 4", "2 1", "3 5", "4 2 3", "5 2 4")
-    val adjacency: DataSet[Adjacency] = rawLines
-      .map(
-        str => {
-          val elements = str.split(' ')
-          val id = elements(0).toLong
-          val neighbors = elements.slice(1, elements.length).map(_.toLong)
-          (id, neighbors)
-        })
-
-    val initialRanks: DataSet[Page] = adjacency
-      .flatMap {
-        (adj, out: Collector[Page]) =>
-          {
-            val targets = adj._2
-            val rankPerTarget = INITIAL_RANK * DAMPENING_FACTOR / targets.length
-
-            // dampen fraction to targets
-            for (target <- targets) {
-              out.collect((target, rankPerTarget))
-            }
-
-            // random jump to self
-            out.collect((adj._1, RANDOM_JUMP))
-          }
-      }
-      .groupBy(0)
-      .sum(1)
-
-    val initialDeltas = initialRanks
-      .map((page) => (page._1, page._2 - INITIAL_RANK))
-      .withForwardedFields("_1")
-
-    val iteration = initialRanks.iterateDelta(initialDeltas, maxIterations, Array(0)) {
-
-      (solutionSet, workset) =>
-        {
-          val deltas = workset
-            .join(adjacency)
-            .where(0)
-            .equalTo(0) {
-              (lastDeltas, adj, out: Collector[Page]) =>
-                {
-                  val targets = adj._2
-                  val deltaPerTarget = DAMPENING_FACTOR * lastDeltas._2 / targets.length
-
-                  for (target <- targets) {
-                    out.collect((target, deltaPerTarget))
-                  }
-                }
-            }
-            .groupBy(0)
-            .sum(1)
-            .filter(x => Math.abs(x._2) > THRESHOLD)
-
-          val rankUpdates = solutionSet
-            .join(deltas)
-            .where(0)
-            .equalTo(0)((current, delta) => (current._1, current._2 + delta._2))
-            .withForwardedFieldsFirst("_1")
-
-          (rankUpdates, deltas)
-        }
-    }
-
-    iteration.print()
-
-  }
-}
diff --git a/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/EnumTriangles.scala b/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/EnumTriangles.scala
deleted file mode 100644
index a91ad34087d..00000000000
--- a/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/EnumTriangles.scala
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.examples.scala.graph
-
-import org.apache.flink.api.common.functions.GroupReduceFunction
-import org.apache.flink.api.common.operators.Order
-import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields
-import org.apache.flink.api.java.utils.ParameterTool
-import org.apache.flink.api.scala.{ExecutionEnvironment, _}
-import org.apache.flink.examples.java.graph.util.EnumTrianglesData
-import org.apache.flink.util.Collector
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable
-
-/**
- * Triangle enumeration is a pre-processing step to find closely connected parts in graphs. A
- * triangle consists of three edges that connect three vertices with each other.
- *
- * The algorithm works as follows: It groups all edges that share a common vertex and builds triads,
- * i.e., triples of vertices that are connected by two edges. Finally, all triads are filtered for
- * which no third edge exists that closes the triangle.
- *
- * Input files are plain text files and must be formatted as follows:
- *
- *   - Edges are represented as pairs of vertex IDs which are separated by space characters. Edges
- *     are separated by new-line characters. For example `"1 2\n2 12\n1 12\n42 63"` gives four
- *     (undirected) edges (1)-(2), (2)-(12), (1)-(12), and (42)-(63) that include a triangle
- *
- * <pre> (1)-(2), (1)-(12), (2)-(12) </pre>
- *
- * Usage:
- * {{{
- * EnumTriangles --edges <path> --output <path>
- * }}}
- * <br> If no parameters are provided, the program is run with default data from
- * [[org.apache.flink.examples.java.graph.util.EnumTrianglesData]]
- *
- * This example shows how to use:
- *
- *   - Custom case classes as data types
- *   - Group Sorting
- */
-object EnumTriangles {
-
-  def main(args: Array[String]) {
-
-    val params: ParameterTool = ParameterTool.fromArgs(args)
-
-    // set up execution environment
-    val env = ExecutionEnvironment.getExecutionEnvironment
-
-    // make parameters available in the web interface
-    env.getConfig.setGlobalJobParameters(params)
-
-    // read input data
-    val edges =
-      if (params.has("edges")) {
-        env.readCsvFile[Edge](
-          filePath = params.get("edges"),
-          fieldDelimiter = " ",
-          includedFields = Array(0, 1))
-      } else {
-        println("Executing EnumTriangles example with default edges data set.")
-        println("Use --edges to specify file input.")
-        val edges = EnumTrianglesData.EDGES.map {
-          case Array(v1, v2) => new Edge(v1.asInstanceOf[Int], v2.asInstanceOf[Int])
-        }
-        env.fromCollection(edges)
-      }
-
-    // project edges by vertex id
-    val edgesById = edges.map(e => if (e.v1 < e.v2) e else Edge(e.v2, e.v1))
-
-    val triangles = edgesById
-      // build triads
-      .groupBy("v1")
-      .sortGroup("v2", Order.ASCENDING)
-      .reduceGroup(new TriadBuilder())
-      // filter triads
-      .join(edgesById)
-      .where("v2", "v3")
-      .equalTo("v1", "v2")((t, _) => t)
-      .withForwardedFieldsFirst("*")
-
-    // emit result
-    if (params.has("output")) {
-      triangles.writeAsCsv(params.get("output"), "\n", ",")
-      // execute program
-      env.execute("TriangleEnumeration Example")
-    } else {
-      println("Printing result to stdout. Use --output to specify output path.")
-      triangles.print()
-    }
-
-  }
-
-  // *************************************************************************
-  //     USER DATA TYPES
-  // *************************************************************************
-
-  case class Edge(v1: Int, v2: Int) extends Serializable
-  case class Triad(v1: Int, v2: Int, v3: Int) extends Serializable
-
-  // *************************************************************************
-  //     USER FUNCTIONS
-  // *************************************************************************
-
-  /**
-   * Builds triads (triples of vertices) from pairs of edges that share a vertex. The first vertex
-   * of a triad is the shared vertex; the second and third vertices are ordered by vertexId. Assumes
-   * that input edges share the first vertex and are in ascending order of the second vertex.
-   */
-  @ForwardedFields(Array("v1->v1"))
-  class TriadBuilder extends GroupReduceFunction[Edge, Triad] {
-
-    val vertices = mutable.MutableList[Integer]()
-
-    override def reduce(edges: java.lang.Iterable[Edge], out: Collector[Triad]) = {
-
-      // clear vertex list
-      vertices.clear()
-
-      // build and emit triads
-      for (e <- edges.asScala) {
-
-        // combine vertex with all previously read vertices
-        for (v <- vertices) {
-          out.collect(Triad(e.v1, v, e.v2))
-        }
-        vertices += e.v2
-      }
-    }
-  }
-}
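
The triad-building step described in the documentation above can also be sketched in plain Scala (illustrative only, with invented names and data, not the Flink operators): group edges by their first vertex, pair up the neighbors, and keep only triads whose closing edge exists:

{{{
object EnumTrianglesSketch {
  def main(args: Array[String]): Unit = {
    // edges with v1 < v2, as produced by the projection step
    val edges = Set((1, 2), (1, 12), (2, 12), (42, 63))

    // build triads: for each shared vertex, combine its neighbors pairwise
    val triads = for {
      (v1, neighbors) <- edges.groupBy(_._1).toSeq
      sorted = neighbors.map(_._2).toSeq.sorted
      (a, i) <- sorted.zipWithIndex
      b <- sorted.drop(i + 1)
    } yield (v1, a, b)

    // a triad is a triangle if the edge closing it exists
    val triangles = triads.filter { case (_, a, b) => edges.contains((a, b)) }
    triangles.foreach(println) // prints (1,2,12)
  }
}
}}}
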
diff --git a/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/PageRankBasic.scala b/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/PageRankBasic.scala
deleted file mode 100644
index afc1a44f1a1..00000000000
--- a/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/PageRankBasic.scala
+++ /dev/null
@@ -1,187 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.examples.scala.graph
-
-import org.apache.flink.api.common.functions.GroupReduceFunction
-import org.apache.flink.api.java.aggregation.Aggregations.SUM
-import org.apache.flink.api.java.utils.ParameterTool
-import org.apache.flink.api.scala._
-import org.apache.flink.examples.java.graph.util.PageRankData
-import org.apache.flink.util.Collector
-
-import java.lang.Iterable
-
-import scala.collection.JavaConverters._
-
-/**
- * A basic implementation of the Page Rank algorithm using a bulk iteration.
- *
- * This implementation requires a set of pages and a set of directed links as input and works as
- * follows.
- *
- * In each iteration, the rank of every page is evenly distributed to all pages it points to. Each
- * page collects the partial ranks of all pages that point to it, sums them up, and applies a
- * dampening factor to the sum. The result is the new rank of the page. A new iteration is started
- * with the new ranks of all pages. This implementation terminates after a fixed number of
- * iterations. See the Wikipedia entry for the
- * [[http://en.wikipedia.org/wiki/Page_rank Page Rank algorithm]]
- *
- * Input files are plain text files and must be formatted as follows:
- *
- *   - Pages represented as a (long) ID separated by new-line characters. For example
- *     `"1\n2\n12\n42\n63"` gives five pages with IDs 1, 2, 12, 42, and 63.
- *   - Links are represented as pairs of page IDs which are separated by space characters. Links are
- *     separated by new-line characters. For example `"1 2\n2 12\n1 12\n42 63"` gives four
- *     (directed) links (1)->(2), (2)->(12), (1)->(12), and (42)->(63). For this simple
- *     implementation it is required that each page has at least one incoming and one outgoing link
- *     (a page can point to itself).
- *
- * Usage:
- * {{{
- *   PageRankBasic --pages <path> --links <path> --output <path> --numPages <n> --iterations <n>
- * }}}
- *
- * If no parameters are provided, the program is run with default data from
- * [[org.apache.flink.examples.java.graph.util.PageRankData]] and 10 iterations.
- *
- * This example shows how to use:
- *
- *   - Bulk Iterations
- *   - Default Join
- *   - Configure user-defined functions using constructor parameters.
- */
-object PageRankBasic {
-
-  final private val DAMPENING_FACTOR: Double = 0.85
-  final private val EPSILON: Double = 0.0001
-
-  def main(args: Array[String]) {
-
-    val params: ParameterTool = ParameterTool.fromArgs(args)
-
-    // set up execution environment
-    val env = ExecutionEnvironment.getExecutionEnvironment
-
-    // make parameters available in the web interface
-    env.getConfig.setGlobalJobParameters(params)
-
-    // read input data
-    val (pages, numPages) = getPagesDataSet(env, params)
-    val links = getLinksDataSet(env, params)
-    val maxIterations = params.getInt("iterations", 10)
-
-    // assign initial ranks to pages
-    val pagesWithRanks = pages.map(p => Page(p, 1.0 / numPages)).withForwardedFields("*->pageId")
-
-    // build adjacency list from link input
-    val adjacencyLists = links
-      .groupBy("sourceId")
-      .reduceGroup(new GroupReduceFunction[Link, AdjacencyList] {
-        override def reduce(values: Iterable[Link], out: Collector[AdjacencyList]): Unit = {
-          var outputId = -1L
-          val outputList = values.asScala.map { t => outputId = t.sourceId; t.targetId }
-          out.collect(new AdjacencyList(outputId, outputList.toArray))
-        }
-      })
-
-    // start iteration
-    val finalRanks = pagesWithRanks.iterateWithTermination(maxIterations) {
-      currentRanks =>
-        val newRanks = currentRanks
-          // distribute ranks to target pages
-          .join(adjacencyLists)
-          .where("pageId")
-          .equalTo("sourceId") {
-            (page, adjacent, out: Collector[Page]) =>
-              val targets = adjacent.targetIds
-              val len = targets.length
-              adjacent.targetIds.foreach(t => out.collect(Page(t, page.rank / len)))
-          }
-          // collect ranks and sum them up
-          .groupBy("pageId")
-          .aggregate(SUM, "rank")
-          // apply dampening factor
-          .map {
-            p => Page(p.pageId, (p.rank * DAMPENING_FACTOR) + ((1 - DAMPENING_FACTOR) / numPages))
-          }
-          .withForwardedFields("pageId")
-
-        // terminate if no rank update was significant
-        val termination = currentRanks.join(newRanks).where("pageId").equalTo("pageId") {
-          (current, next, out: Collector[Int]) =>
-            // check for significant update
-            if (math.abs(current.rank - next.rank) > EPSILON) out.collect(1)
-        }
-        (newRanks, termination)
-    }
-
-    val result = finalRanks
-
-    // emit result
-    if (params.has("output")) {
-      result.writeAsCsv(params.get("output"), "\n", " ")
-      // execute program
-      env.execute("Basic PageRank Example")
-    } else {
-      println("Printing result to stdout. Use --output to specify output path.")
-      result.print()
-    }
-  }
-
-  // *************************************************************************
-  //     USER TYPES
-  // *************************************************************************
-
-  case class Link(sourceId: Long, targetId: Long)
-
-  case class Page(pageId: Long, rank: Double)
-
-  case class AdjacencyList(sourceId: Long, targetIds: Array[Long])
-
-  // *************************************************************************
-  //     UTIL METHODS
-  // *************************************************************************
-
-  private def getPagesDataSet(
-      env: ExecutionEnvironment,
-      params: ParameterTool): (DataSet[Long], Long) = {
-    if (params.has("pages") && params.has("numPages")) {
-      val pages = env
-        .readCsvFile[Tuple1[Long]](params.get("pages"), fieldDelimiter = " ", lineDelimiter = "\n")
-        .map(x => x._1)
-      (pages, params.getLong("numPages"))
-    } else {
-      println("Executing PageRank example with default pages data set.")
-      println("Use --pages and --numPages to specify file input.")
-      (env.generateSequence(1, 15), PageRankData.getNumberOfPages)
-    }
-  }
-
-  private def getLinksDataSet(env: ExecutionEnvironment, params: ParameterTool): DataSet[Link] = {
-    if (params.has("links")) {
-      env.readCsvFile[Link](params.get("links"), fieldDelimiter = " ", includedFields = Array(0, 1))
-    } else {
-      println("Executing PageRank example with default links data set.")
-      println("Use --links to specify file input.")
-      val edges = PageRankData.EDGES.map {
-        case Array(v1, v2) => Link(v1.asInstanceOf[Long], v2.asInstanceOf[Long])
-      }
-      env.fromCollection(edges)
-    }
-  }
-}
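
To make the bulk-iteration description above concrete, here is a hedged plain-Scala sketch of the rank redistribution and damping step over a tiny made-up graph (invented names and data, not the Flink operators):

{{{
object PageRankSketch {
  def main(args: Array[String]): Unit = {
    val damping = 0.85
    val adjacency = Map(1L -> Seq(2L, 3L), 2L -> Seq(1L), 3L -> Seq(1L, 2L))
    val numPages = adjacency.size

    var ranks = adjacency.keys.map(p => p -> 1.0 / numPages).toMap
    for (_ <- 1 to 10) {
      // each page distributes its rank evenly to the pages it points to
      val contributions = adjacency.toSeq.flatMap { case (page, targets) =>
        targets.map(t => t -> ranks(page) / targets.size)
      }
      val summed = contributions.groupBy(_._1).map { case (p, cs) => p -> cs.map(_._2).sum }
      // apply the damping factor to the collected partial ranks
      ranks = ranks.keys.map { p =>
        p -> ((1 - damping) / numPages + damping * summed.getOrElse(p, 0.0))
      }.toMap
    }
    ranks.foreach(println)
  }
}
}}}
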
diff --git a/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/TransitiveClosureNaive.scala b/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/TransitiveClosureNaive.scala
deleted file mode 100644
index c20abf15f86..00000000000
--- a/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/graph/TransitiveClosureNaive.scala
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.examples.scala.graph
-
-import org.apache.flink.api.java.utils.ParameterTool
-import org.apache.flink.api.scala._
-import org.apache.flink.examples.java.graph.util.ConnectedComponentsData
-import org.apache.flink.util.Collector
-
-object TransitiveClosureNaive {
-
-  def main(args: Array[String]): Unit = {
-
-    val params: ParameterTool = ParameterTool.fromArgs(args)
-
-    // set up execution environment
-    val env = ExecutionEnvironment.getExecutionEnvironment
-
-    // make parameters available in the web interface
-    env.getConfig.setGlobalJobParameters(params)
-
-    val edges =
-      if (params.has("edges")) {
-        env
-          .readCsvFile[(Long, Long)](
-            filePath = params.get("edges"),
-            fieldDelimiter = " ",
-            includedFields = Array(0, 1))
-          .map(x => (x._1, x._2))
-      } else {
-        println("Executing TransitiveClosure example with default edges data set.")
-        println("Use --edges to specify file input.")
-        val edgeData = ConnectedComponentsData.EDGES.map {
-          case Array(x, y) => (x.asInstanceOf[Long], y.asInstanceOf[Long])
-        }
-        env.fromCollection(edgeData)
-      }
-
-    val maxIterations = params.getInt("iterations", 10)
-
-    val paths = edges.iterateWithTermination(maxIterations) {
-      prevPaths: DataSet[(Long, Long)] =>
-        val nextPaths = prevPaths
-          .join(edges)
-          .where(1)
-          .equalTo(0)((left, right) => (left._1, right._2))
-          .withForwardedFieldsFirst("_1")
-          .withForwardedFieldsSecond("_2")
-          .union(prevPaths)
-          .groupBy(0, 1)
-          .reduce((l, r) => l)
-          .withForwardedFields("_1; _2")
-
-        val terminate = prevPaths
-          .coGroup(nextPaths)
-          .where(0)
-          .equalTo(0) {
-            (
-                prev: Iterator[(Long, Long)],
-                next: Iterator[(Long, Long)],
-                out: Collector[(Long, Long)]) =>
-              {
-                val prevPaths = prev.toSet
-                for (n <- next)
-                  if (!prevPaths.contains(n)) out.collect(n)
-              }
-          }
-          .withForwardedFieldsSecond("*")
-        (nextPaths, terminate)
-    }
-
-    if (params.has("output")) {
-      paths.writeAsCsv(params.get("output"), "\n", " ")
-      env.execute("Scala Transitive Closure Example")
-    } else {
-      println("Printing result to stdout. Use --output to specify output path.")
-      paths.print()
-    }
-
-  }
-}
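
The naive transitive-closure fixpoint used above (compose known paths with edges and union until nothing new is produced) can be hedged into a few lines of plain Scala; names and data are invented for illustration:

{{{
object TransitiveClosureSketch {
  def main(args: Array[String]): Unit = {
    val edges = Set((1L, 2L), (2L, 3L), (3L, 4L))

    var paths = edges
    var grown = true
    while (grown) {
      // join existing paths with edges: (a, b) + (b, c) => (a, c)
      val extended = for {
        (a, b) <- paths
        (b2, c) <- edges if b == b2
      } yield (a, c)
      val next = paths ++ extended
      grown = next.size > paths.size
      paths = next
    }
    paths.toSeq.sorted.foreach(println) // includes (1,3), (1,4), (2,4)
  }
}
}}}
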
diff --git a/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/misc/PiEstimation.scala b/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/misc/PiEstimation.scala
deleted file mode 100644
index ee5396f21bb..00000000000
--- a/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/misc/PiEstimation.scala
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.examples.scala.misc
-
-import org.apache.flink.api.scala._
-
-object PiEstimation {
-
-  def main(args: Array[String]) {
-
-    val numSamples: Long = if (args.length > 0) args(0).toLong else 1000000
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-
-    // count how many of the random samples fall inside the quarter of
-    // the unit circle that lies in the upper right quadrant
-    val count =
-      env
-        .generateSequence(1, numSamples)
-        .map {
-          sample =>
-            val x = Math.random()
-            val y = Math.random()
-            if (x * x + y * y < 1) 1L else 0L
-        }
-        .reduce(_ + _)
-
-    // the fraction of samples inside the quarter circle approximates its area (PI / 4),
-    // so multiplying by 4 gives an estimate of PI
-    val pi = count
-      .map(_ * 4.0 / numSamples)
-
-    println("We estimate Pi to be:")
-
-    pi.print()
-  }
-
-}
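
The Monte Carlo estimate used above reduces to a few lines of plain Scala; a minimal sketch with an arbitrary sample count and no Flink dependencies:

{{{
object PiEstimationSketch {
  def main(args: Array[String]): Unit = {
    val numSamples = 1000000
    val rnd = new scala.util.Random()
    // count samples that fall inside the quarter of the unit circle
    val inside = (1 to numSamples).count { _ =>
      val x = rnd.nextDouble()
      val y = rnd.nextDouble()
      x * x + y * y < 1
    }
    // the fraction inside approximates PI / 4, so scale back up by 4
    println(4.0 * inside / numSamples)
  }
}
}}}
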
diff --git a/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/relational/TPCHQuery10.scala b/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/relational/TPCHQuery10.scala
deleted file mode 100644
index c38e1876b7f..00000000000
--- a/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/relational/TPCHQuery10.scala
+++ /dev/null
@@ -1,189 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.examples.scala.relational
-
-import org.apache.flink.api.java.aggregation.Aggregations
-import org.apache.flink.api.java.utils.ParameterTool
-import org.apache.flink.api.scala._
-
-/**
- * This program implements a modified version of the TPC-H query 10.
- *
- * The original query can be found at
- * [http://www.tpc.org/tpch/spec/tpch2.16.0.pdf](http://www.tpc.org/tpch/spec/tpch2.16.0.pdf) (page
- * 45).
- *
- * This program implements the following SQL equivalent:
- *
- * {{{
- * SELECT
- *        c_custkey,
- *        c_name,
- *        c_address,
- *        n_name,
- *        c_acctbal,
- *        SUM(l_extendedprice * (1 - l_discount)) AS revenue
- * FROM
- *        customer,
- *        orders,
- *        lineitem,
- *        nation
- * WHERE
- *        c_custkey = o_custkey
- *        AND l_orderkey = o_orderkey
- *        AND YEAR(o_orderdate) > '1990'
- *        AND l_returnflag = 'R'
- *        AND c_nationkey = n_nationkey
- * GROUP BY
- *        c_custkey,
- *        c_name,
- *        c_acctbal,
- *        n_name,
- *        c_address
- * }}}
- *
- * Compared to the original TPC-H query this version does not print c_phone and c_comment, only
- * filters by years greater than 1990 instead of a period of 3 months, and does not sort the result
- * by revenue.
- *
- * Input files are plain text CSV files using the pipe character ('|') as field separator, as
- * generated by the TPC-H data generator which is available at
- * [http://www.tpc.org/tpch/](http://www.tpc.org/tpch/).
- *
- * Usage:
- * {{{
- * TPCHQuery10 --customer <path> --orders <path> --lineitem <path> --nation <path> --output <path>
- * }}}
- *
- * This example shows how to use:
- *   - tuple data types
- *   - built-in aggregation functions
- *   - join with size hints
- */
-object TPCHQuery10 {
-
-  def main(args: Array[String]) {
-
-    val params: ParameterTool = ParameterTool.fromArgs(args)
-    if (
-      !params.has("lineitem") && !params.has("customer") &&
-      !params.has("orders") && !params.has("nation")
-    ) {
-      println("  This program expects data from the TPC-H benchmark as input data.")
-      println("  Due to legal restrictions, we can not ship generated data.")
-      println("  You can find the TPC-H data generator at http://www.tpc.org/tpch/.")
-      println(
-        "  Usage: TPCHQuery10" +
-          "--customer <path> --orders <path> --lineitem <path> --nation <path> --output <path>")
-      return
-    }
-
-    // set up execution environment
-    val env = ExecutionEnvironment.getExecutionEnvironment
-
-    // make parameters available in the web interface
-    env.getConfig.setGlobalJobParameters(params)
-
-    // get customer data set: (custkey, name, address, nationkey, acctbal)
-    val customers = getCustomerDataSet(env, params.get("customer"))
-    // get orders data set: (orderkey, custkey, orderdate)
-    val orders = getOrdersDataSet(env, params.get("orders"))
-    // get lineitem data set: (orderkey, extendedprice, discount, returnflag)
-    val lineitems = getLineitemDataSet(env, params.get("lineitem"))
-    // get nation data set: (nationkey, name)
-    val nations = getNationDataSet(env, params.get("nation"))
-
-    // filter orders by years
-    val orders1990 = orders
-      .filter(o => o._3.substring(0, 4).toInt > 1990)
-      .map(o => (o._1, o._2))
-
-    // filter lineitems by return status
-    val lineitemsReturn = lineitems
-      .filter(l => l._4.equals("R"))
-      .map(l => (l._1, l._2 * (1 - l._3)))
-
-    // compute revenue by customer
-    val revenueByCustomer = orders1990
-      .joinWithHuge(lineitemsReturn)
-      .where(0)
-      .equalTo(0)
-      .apply((o, l) => (o._2, l._2))
-      .groupBy(0)
-      .aggregate(Aggregations.SUM, 1)
-
-    // compute final result by joining customer and nation information with revenue
-    val result = customers
-      .joinWithTiny(nations)
-      .where(3)
-      .equalTo(0)
-      .apply((c, n) => (c._1, c._2, c._3, n._2, c._5))
-      .join(revenueByCustomer)
-      .where(0)
-      .equalTo(0)
-      .apply((c, r) => (c._1, c._2, c._3, c._4, c._5, r._2))
-
-    if (params.has("output")) {
-      // emit result
-      result.writeAsCsv(params.get("output"), "\n", "|")
-      // execute program
-      env.execute("Scala TPCH Query 10 Example")
-    } else {
-      println("Printing result to stdout. Use --output to specify output path.")
-      result.print()
-    }
-
-  }
-
-  // *************************************************************************
-  //     UTIL METHODS
-  // *************************************************************************
-
-  private def getCustomerDataSet(
-      env: ExecutionEnvironment,
-      customerPath: String): DataSet[(Int, String, String, Int, Double)] = {
-    env.readCsvFile[(Int, String, String, Int, Double)](
-      customerPath,
-      fieldDelimiter = "|",
-      includedFields = Array(0, 1, 2, 3, 5))
-  }
-
-  private def getOrdersDataSet(
-      env: ExecutionEnvironment,
-      ordersPath: String): DataSet[(Int, Int, String)] = {
-    env.readCsvFile[(Int, Int, String)](
-      ordersPath,
-      fieldDelimiter = "|",
-      includedFields = Array(0, 1, 4))
-  }
-
-  private def getLineitemDataSet(
-      env: ExecutionEnvironment,
-      lineitemPath: String): DataSet[(Int, Double, Double, String)] = {
-    env.readCsvFile[(Int, Double, Double, String)](
-      lineitemPath,
-      fieldDelimiter = "|",
-      includedFields = Array(0, 5, 6, 8))
-  }
-
-  private def getNationDataSet(
-      env: ExecutionEnvironment,
-      nationPath: String): DataSet[(Int, String)] = {
-    env.readCsvFile[(Int, String)](nationPath, fieldDelimiter = "|", includedFields = Array(0, 1))
-  }
-}
diff --git a/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/relational/TPCHQuery3.scala b/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/relational/TPCHQuery3.scala
deleted file mode 100644
index 883ce048118..00000000000
--- a/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/relational/TPCHQuery3.scala
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.examples.scala.relational
-
-import org.apache.flink.api.java.aggregation.Aggregations
-import org.apache.flink.api.java.utils.ParameterTool
-import org.apache.flink.api.scala._
-
-/**
- * This program implements a modified version of the TPC-H query 3. The example demonstrates how to
- * assign names to fields by using case classes. The original query can be found at
- * [http://www.tpc.org/tpch/spec/tpch2.16.0.pdf](http://www.tpc.org/tpch/spec/tpch2.16.0.pdf) (page
- * 29).
- *
- * This program implements the following SQL equivalent:
- *
- * {{{
- * SELECT
- *      l_orderkey,
- *      SUM(l_extendedprice*(1-l_discount)) AS revenue,
- *      o_orderdate,
- *      o_shippriority
- * FROM customer,
- *      orders,
- *      lineitem
- * WHERE
- *      c_mktsegment = '[SEGMENT]'
- *      AND c_custkey = o_custkey
- *      AND l_orderkey = o_orderkey
- *      AND o_orderdate < date '[DATE]'
- *      AND l_shipdate > date '[DATE]'
- * GROUP BY
- *      l_orderkey,
- *      o_orderdate,
- *      o_shippriority;
- * }}}
- *
- * Compared to the original TPC-H query this version does not sort the result by revenue and
- * orderdate.
- *
- * Input files are plain text CSV files using the pipe character ('|') as field separator, as
- * generated by the TPC-H data generator which is available at
- * [http://www.tpc.org/tpch/](http://www.tpc.org/tpch/).
- *
- * Usage:
- * {{{
- * TPCHQuery3 --lineitem <path> --customer <path> --orders <path> --output <path>
- * }}}
- *
- * This example shows how to use:
- *   - case classes and case class field addressing
- *   - built-in aggregation functions
- */
-object TPCHQuery3 {
-
-  def main(args: Array[String]) {
-
-    val params: ParameterTool = ParameterTool.fromArgs(args)
-    if (!params.has("lineitem") && !params.has("customer") && !params.has("orders")) {
-      println("  This program expects data from the TPC-H benchmark as input data.")
-      println("  Due to legal restrictions, we can not ship generated data.")
-      println("  You can find the TPC-H data generator at http://www.tpc.org/tpch/.")
-      println(
-        "  Usage: TPCHQuery3 " +
-          "--lineitem <path> --customer <path> --orders <path> [--output <path>]")
-      return
-    }
-
-    // set up execution environment
-    val env = ExecutionEnvironment.getExecutionEnvironment
-
-    // make parameters available in the web interface
-    env.getConfig.setGlobalJobParameters(params)
-
-    // set filter date
-    val dateFormat = new java.text.SimpleDateFormat("yyyy-MM-dd")
-    val date = dateFormat.parse("1995-03-12")
-
-    // read and filter lineitems by shipDate
-    val lineitems =
-      getLineitemDataSet(env, params.get("lineitem")).filter(
-        l => dateFormat.parse(l.shipDate).after(date))
-    // read and filter customers by market segment
-    val customers =
-      getCustomerDataSet(env, params.get("customer")).filter(c => c.mktSegment.equals("AUTOMOBILE"))
-    // read orders
-    val orders = getOrdersDataSet(env, params.get("orders"))
-
-    // filter orders by order date
-    val items = orders
-      .filter(o => dateFormat.parse(o.orderDate).before(date))
-      // filter orders by joining with customers
-      .join(customers)
-      .where("custId")
-      .equalTo("custId")
-      .apply((o, c) => o)
-      // join with lineitems
-      .join(lineitems)
-      .where("orderId")
-      .equalTo("orderId")
-      .apply(
-        (o, l) =>
-          new ShippedItem(o.orderId, l.extdPrice * (1.0 - l.discount), o.orderDate, o.shipPrio))
-
-    // group by order and aggregate revenue
-    val result = items
-      .groupBy("orderId", "orderDate", "shipPrio")
-      .aggregate(Aggregations.SUM, "revenue")
-
-    if (params.has("output")) {
-      // emit result
-      result.writeAsCsv(params.get("output"), "\n", "|")
-      // execute program
-      env.execute("Scala TPCH Query 3 Example")
-    } else {
-      println("Printing result to stdout. Use --output to specify output path.")
-      result.print()
-    }
-
-  }
-
-  // *************************************************************************
-  //     USER DATA TYPES
-  // *************************************************************************
-
-  case class Lineitem(orderId: Long, extdPrice: Double, discount: Double, shipDate: String)
-  case class Order(orderId: Long, custId: Long, orderDate: String, shipPrio: Long)
-  case class Customer(custId: Long, mktSegment: String)
-  case class ShippedItem(orderId: Long, revenue: Double, orderDate: String, shipPrio: Long)
-
-  // *************************************************************************
-  //     UTIL METHODS
-  // *************************************************************************
-
-  private def getLineitemDataSet(
-      env: ExecutionEnvironment,
-      lineitemPath: String): DataSet[Lineitem] = {
-    env.readCsvFile[Lineitem](
-      lineitemPath,
-      fieldDelimiter = "|",
-      includedFields = Array(0, 5, 6, 10))
-  }
-
-  private def getCustomerDataSet(
-      env: ExecutionEnvironment,
-      customerPath: String): DataSet[Customer] = {
-    env.readCsvFile[Customer](customerPath, fieldDelimiter = "|", includedFields = Array(0, 6))
-  }
-
-  private def getOrdersDataSet(env: ExecutionEnvironment, ordersPath: String): DataSet[Order] = {
-    env.readCsvFile[Order](ordersPath, fieldDelimiter = "|", includedFields = Array(0, 1, 4, 7))
-  }
-}
diff --git a/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/relational/WebLogAnalysis.scala b/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/relational/WebLogAnalysis.scala
deleted file mode 100644
index f5e53c25293..00000000000
--- a/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/relational/WebLogAnalysis.scala
+++ /dev/null
@@ -1,194 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.examples.scala.relational
-
-import org.apache.flink.api.java.utils.ParameterTool
-import org.apache.flink.api.scala._
-import org.apache.flink.examples.java.relational.util.WebLogData
-import org.apache.flink.util.Collector
-
-/**
- * This program processes web logs and relational data. It implements the following relational
- * query:
- *
- * {{{
- * SELECT
- *       r.pageURL,
- *       r.pageRank,
- *       r.avgDuration
- * FROM documents d JOIN rankings r
- *                  ON d.url = r.url
- * WHERE CONTAINS(d.text, [keywords])
- *       AND r.rank > [rank]
- *       AND NOT EXISTS
- *           (
- *              SELECT * FROM Visits v
- *              WHERE v.destUrl = d.url
- *                    AND v.visitDate < [date]
- *           );
- * }}}
- *
- * Input files are plain text CSV files using the pipe character ('|') as field separator. The
- * tables referenced in the query can be generated using the
- * [[org.apache.flink.examples.java.relational.util.WebLogDataGenerator]] and have the following
- * schemas:
- *
- * {{{
- * CREATE TABLE Documents (
- *                url VARCHAR(100) PRIMARY KEY,
- *                contents TEXT );
- *
- * CREATE TABLE Rankings (
- *                pageRank INT,
- *                pageURL VARCHAR(100) PRIMARY KEY,
- *                avgDuration INT );
- *
- * CREATE TABLE Visits (
- *                sourceIP VARCHAR(16),
- *                destURL VARCHAR(100),
- *                visitDate DATE,
- *                adRevenue FLOAT,
- *                userAgent VARCHAR(64),
- *                countryCode VARCHAR(3),
- *                languageCode VARCHAR(6),
- *                searchWord VARCHAR(32),
- *                duration INT );
- * }}}
- *
- * Usage:
- * {{{
- *   WebLogAnalysis --documents <path> --ranks <path> --visits <path> --output <path>
- * }}}
- *
- * If no parameters are provided, the program is run with default data from
- * [[org.apache.flink.examples.java.relational.util.WebLogData]].
- *
- * This example shows how to use:
- *
- *   - tuple data types
- *   - projection and join projection
- *   - the CoGroup transformation for an anti-join
- */
-object WebLogAnalysis {
-
-  def main(args: Array[String]) {
-
-    val params: ParameterTool = ParameterTool.fromArgs(args)
-
-    // set up execution environment
-    val env = ExecutionEnvironment.getExecutionEnvironment
-
-    // make parameters available in the web interface
-    env.getConfig.setGlobalJobParameters(params)
-
-    val documents = getDocumentsDataSet(env, params)
-    val ranks = getRanksDataSet(env, params)
-    val visits = getVisitsDataSet(env, params)
-
-    val filteredDocs = documents
-      .filter(doc => doc._2.contains(" editors ") && doc._2.contains(" oscillations "))
-
-    val filteredRanks = ranks
-      .filter(rank => rank._1 > 40)
-
-    val filteredVisits = visits
-      .filter(visit => visit._2.substring(0, 4).toInt == 2007)
-
-    val joinDocsRanks = filteredDocs
-      .join(filteredRanks)
-      .where(0)
-      .equalTo(1)((doc, rank) => rank)
-      .withForwardedFieldsSecond("*")
-
-    val result = joinDocsRanks
-      .coGroup(filteredVisits)
-      .where(1)
-      .equalTo(0) {
-        (
-            ranks: Iterator[(Int, String, Int)],
-            visits: Iterator[(String, String)],
-            out: Collector[(Int, String, Int)]) =>
-          if (visits.isEmpty) for (rank <- ranks) out.collect(rank)
-      }
-      .withForwardedFieldsFirst("*")
-
-    // emit result
-    if (params.has("output")) {
-      result.writeAsCsv(params.get("output"), "\n", "|")
-      env.execute("Scala WebLogAnalysis Example")
-    } else {
-      println("Printing result to stdout. Use --output to specify output path.")
-      result.print()
-    }
-
-  }
-
-  private def getDocumentsDataSet(
-      env: ExecutionEnvironment,
-      params: ParameterTool): DataSet[(String, String)] = {
-    if (params.has("documents")) {
-      env.readCsvFile[(String, String)](
-        params.get("documents"),
-        fieldDelimiter = "|",
-        includedFields = Array(0, 1))
-    } else {
-      println("Executing WebLogAnalysis example with default documents data set.")
-      println("Use --documents to specify file input.")
-      val documents = WebLogData.DOCUMENTS.map {
-        case Array(x, y) => (x.asInstanceOf[String], y.asInstanceOf[String])
-      }
-      env.fromCollection(documents)
-    }
-  }
-
-  private def getRanksDataSet(
-      env: ExecutionEnvironment,
-      params: ParameterTool): DataSet[(Int, String, Int)] = {
-    if (params.has("ranks")) {
-      env.readCsvFile[(Int, String, Int)](
-        params.get("ranks"),
-        fieldDelimiter = "|",
-        includedFields = Array(0, 1, 2))
-    } else {
-      println("Executing WebLogAnalysis example with default ranks data set.")
-      println("Use --ranks to specify file input.")
-      val ranks = WebLogData.RANKS.map {
-        case Array(x, y, z) => (x.asInstanceOf[Int], y.asInstanceOf[String], z.asInstanceOf[Int])
-      }
-      env.fromCollection(ranks)
-    }
-  }
-
-  private def getVisitsDataSet(
-      env: ExecutionEnvironment,
-      params: ParameterTool): DataSet[(String, String)] = {
-    if (params.has("visits")) {
-      env.readCsvFile[(String, String)](
-        params.get("visits"),
-        fieldDelimiter = "|",
-        includedFields = Array(1, 2))
-    } else {
-      println("Executing WebLogAnalysis example with default visits data set.")
-      println("Use --visits to specify file input.")
-      val visits = WebLogData.VISITS.map {
-        case Array(x, y) => (x.asInstanceOf[String], y.asInstanceOf[String])
-      }
-      env.fromCollection(visits)
-    }
-  }
-}
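
The CoGroup-based anti-join described above (keep ranks only for documents that have no qualifying visit) can be sketched in plain Scala; the data and names below are invented purely for illustration:

{{{
object AntiJoinSketch {
  def main(args: Array[String]): Unit = {
    // (rank, url, avgDuration) tuples that already passed the filters
    val ranks = Seq((55, "url_1", 12), (73, "url_2", 34))
    // urls that were visited in the filtered period
    val visitedUrls = Set("url_2")

    // anti-join: keep only ranks whose url has no matching visit
    val result = ranks.filterNot { case (_, url, _) => visitedUrls.contains(url) }
    result.foreach(println) // prints (55,url_1,12)
  }
}
}}}
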
diff --git a/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/wordcount/WordCount.scala b/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/wordcount/WordCount.scala
deleted file mode 100644
index 2ad9e304a05..00000000000
--- a/flink-examples/flink-examples-batch/src/main/scala/org/apache/flink/examples/scala/wordcount/WordCount.scala
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.examples.scala.wordcount
-
-import org.apache.flink.api.java.utils.ParameterTool
-import org.apache.flink.api.scala._
-import org.apache.flink.examples.java.wordcount.util.WordCountData
-
-/**
- * Implements the "WordCount" program that computes a simple word occurrence histogram over text
- * files.
- *
- * The input is a plain text file with lines separated by newline characters.
- *
- * Usage:
- * {{{
- *   WordCount --input <path> --output <path>
- * }}}
- *
- * If no parameters are provided, the program is run with default data from
- * [[org.apache.flink.examples.java.wordcount.util.WordCountData]]
- *
- * This example shows how to:
- *
- *   - write a simple Flink program.
- *   - use Tuple data types.
- *   - write and use user-defined functions.
- */
-object WordCount {
-
-  def main(args: Array[String]) {
-
-    val params: ParameterTool = ParameterTool.fromArgs(args)
-
-    // set up execution environment
-    val env = ExecutionEnvironment.getExecutionEnvironment
-
-    // make parameters available in the web interface
-    env.getConfig.setGlobalJobParameters(params)
-    val text =
-      if (params.has("input")) {
-        env.readTextFile(params.get("input"))
-      } else {
-        println("Executing WordCount example with default input data set.")
-        println("Use --input to specify file input.")
-        env.fromCollection(WordCountData.WORDS)
-      }
-
-    val counts = text
-      .flatMap(_.toLowerCase.split("\\W+").filter(_.nonEmpty))
-      .map((_, 1))
-      .groupBy(0)
-      .sum(1)
-
-    if (params.has("output")) {
-      counts.writeAsCsv(params.get("output"), "\n", " ")
-      env.execute("Scala WordCount Example")
-    } else {
-      println("Printing result to stdout. Use --output to specify output path.")
-      counts.print()
-    }
-
-  }
-}
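
For comparison, the word-count logic itself also fits in a few lines over plain Scala collections; a minimal sketch with made-up input, not tied to any Flink API:

{{{
object WordCountSketch {
  def main(args: Array[String]): Unit = {
    val lines = Seq("To be, or not to be", "that is the question")
    val counts = lines
      .flatMap(_.toLowerCase.split("\\W+").filter(_.nonEmpty))
      .groupBy(identity)
      .map { case (word, occurrences) => word -> occurrences.size }
    counts.toSeq.sortBy(-_._2).foreach(println)
  }
}
}}}
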
diff --git a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/async/AsyncClient.scala b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/async/AsyncClient.scala
deleted file mode 100644
index 385f13b4ecd..00000000000
--- a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/async/AsyncClient.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.scala.examples.async
-
-import java.util.concurrent.ThreadLocalRandom
-
-import scala.concurrent.{ExecutionContext, Future}
-
-/** A simple asynchronous client that simulates interacting with an unreliable external service. */
-class AsyncClient {
-
-  def query(key: Int)(implicit executor: ExecutionContext): Future[String] = Future {
-    val sleep = (ThreadLocalRandom.current.nextFloat * 100).toLong
-    try Thread.sleep(sleep)
-    catch {
-      case e: InterruptedException =>
-        throw new RuntimeException("AsyncClient was interrupted", e)
-    }
-
-    if (ThreadLocalRandom.current.nextFloat < 0.001f) {
-      throw new RuntimeException("wahahahaha...")
-    } else {
-      "key" + (key % 10)
-    }
-  }
-}
diff --git a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/async/AsyncIOExample.scala b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/async/AsyncIOExample.scala
deleted file mode 100644
index b55cb60553c..00000000000
--- a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/async/AsyncIOExample.scala
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.scala.examples.async
-
-import org.apache.flink.api.java.utils.ParameterTool
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.streaming.api.scala._
-import org.apache.flink.streaming.api.scala.async.{ResultFuture, RichAsyncFunction}
-import org.apache.flink.streaming.examples.async.util.SimpleSource
-
-import java.util.concurrent.TimeUnit
-
-import scala.concurrent.ExecutionContext.Implicits.global
-import scala.util.{Failure, Success}
-
-object AsyncIOExample {
-
-  /** An example of a [[RichAsyncFunction]] using an async client to query an external service. */
-  class SampleAsyncFunction extends RichAsyncFunction[Int, String] {
-    private var client: AsyncClient = _
-
-    override def open(parameters: Configuration): Unit = {
-      client = new AsyncClient
-    }
-
-    override def asyncInvoke(input: Int, resultFuture: ResultFuture[String]): Unit = {
-      client.query(input).onComplete {
-        case Success(value) => resultFuture.complete(Seq(value))
-        case Failure(exception) => resultFuture.completeExceptionally(exception)
-      }
-    }
-  }
-
-  def main(args: Array[String]): Unit = {
-    val params = ParameterTool.fromArgs(args)
-
-    var mode: String = null
-    var timeout = 0L
-
-    try {
-      mode = params.get("waitMode", "ordered")
-      timeout = params.getLong("timeout", 10000L)
-    } catch {
-      case e: Exception =>
-        println("To customize example, use: AsyncIOExample [--waitMode <ordered or unordered>]")
-        throw e
-    }
-
-    // obtain execution environment
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-
-    // create input stream of a single integer
-    val inputStream = env.addSource(new SimpleSource).map(_.toInt)
-
-    val function = new SampleAsyncFunction
-
-    // add async operator to streaming job
-    val result = mode.toUpperCase match {
-      case "ORDERED" =>
-        AsyncDataStream.orderedWait(inputStream, function, timeout, TimeUnit.MILLISECONDS, 20)
-      case "UNORDERED" =>
-        AsyncDataStream.unorderedWait(inputStream, function, timeout, TimeUnit.MILLISECONDS, 20)
-      case _ => throw new IllegalStateException("Unknown mode: " + mode)
-    }
-
-    result.print()
-
-    // execute the program
-    env.execute("Async IO Example: " + mode)
-  }
-}
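
The Java DataStream API keeps an equivalent async facility. Below is a minimal sketch of a RichAsyncFunction plus the ordered-wait wiring; the CompletableFuture-based lookup is a stand-in assumption for the removed AsyncClient and is not part of this commit.

import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

import java.util.Collections;
import java.util.concurrent.CompletableFuture;

public class SampleAsyncFunctionSketch extends RichAsyncFunction<Integer, String> {

    @Override
    public void asyncInvoke(Integer input, ResultFuture<String> resultFuture) {
        // A real job would call an external service here; this stand-in completes
        // the result on a background thread.
        CompletableFuture
                .supplyAsync(() -> "key" + (input % 10))
                .whenComplete(
                        (result, error) -> {
                            if (error != null) {
                                resultFuture.completeExceptionally(error);
                            } else {
                                resultFuture.complete(Collections.singleton(result));
                            }
                        });
    }

    // Wiring it into a pipeline, given some DataStream<Integer> inputStream:
    //   AsyncDataStream.orderedWait(
    //       inputStream, new SampleAsyncFunctionSketch(), 10000L, TimeUnit.MILLISECONDS, 20);
}
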
diff --git a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/iteration/IterateExample.scala b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/iteration/IterateExample.scala
deleted file mode 100644
index e51e977d35c..00000000000
--- a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/iteration/IterateExample.scala
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.scala.examples.iteration
-
-import org.apache.flink.api.common.serialization.SimpleStringEncoder
-import org.apache.flink.api.java.utils.ParameterTool
-import org.apache.flink.api.scala._
-import org.apache.flink.configuration.MemorySize
-import org.apache.flink.connector.file.sink.FileSink
-import org.apache.flink.core.fs.Path
-import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy
-import org.apache.flink.streaming.api.functions.source.SourceFunction
-import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
-import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
-
-import java.time.Duration
-import java.util.Random
-
-/**
- * Example illustrating iterations in Flink streaming.
- *
- * The program sums up random numbers and counts the additions it performs to reach a specific threshold
- * in an iterative streaming fashion.
- *
- * This example shows how to use:
- *
- *   - streaming iterations,
- *   - buffer timeout to reduce latency,
- *   - directed outputs.
- */
-object IterateExample {
-
-  final private val Bound = 100
-
-  def main(args: Array[String]): Unit = {
-    // Checking input parameters
-    val params = ParameterTool.fromArgs(args)
-
-    // obtain execution environment and set setBufferTimeout to 1 to enable
-    // continuous flushing of the output buffers (lowest latency)
-    val env = StreamExecutionEnvironment.getExecutionEnvironment.setBufferTimeout(1)
-
-    // make parameters available in the web interface
-    env.getConfig.setGlobalJobParameters(params)
-
-    // create input stream of integer pairs
-    val inputStream: DataStream[(Int, Int)] =
-      if (params.has("input")) {
-        // map a list of strings to integer pairs
-        env.readTextFile(params.get("input")).map {
-          value: String =>
-            val record = value.substring(1, value.length - 1)
-            val splitted = record.split(",")
-            (Integer.parseInt(splitted(0)), Integer.parseInt(splitted(1)))
-        }
-      } else {
-        println("Executing Iterate example with default input data set.")
-        println("Use --input to specify file input.")
-        env.addSource(new RandomFibonacciSource)
-      }
-
-    def withinBound(value: (Int, Int)) = value._1 < Bound && value._2 < Bound
-
-    // create an iterative data stream from the input with a 5 second timeout
-    val numbers: DataStream[((Int, Int), Int)] = inputStream
-      // Map the inputs so that the next Fibonacci numbers can be calculated
-      // while preserving the original input tuple
-      // A counter is attached to the tuple and incremented in every iteration step
-      .map(value => (value._1, value._2, value._1, value._2, 0))
-      .iterate(
-        (iteration: DataStream[(Int, Int, Int, Int, Int)]) => {
-          // calculates the next Fibonacci number and increments the counter
-          val step = iteration.map(
-            value => (value._1, value._2, value._4, value._3 + value._4, value._5 + 1))
-          // testing which tuple needs to be iterated again
-          val feedback = step.filter(value => withinBound(value._3, value._4))
-          // giving back the input pair and the counter
-          val output: DataStream[((Int, Int), Int)] = step
-            .filter(value => !withinBound(value._3, value._4))
-            .map(value => ((value._1, value._2), value._5))
-          (feedback, output)
-        }
-        // timeout after 5 seconds
-        ,
-        5000L
-      )
-
-    if (params.has("output")) {
-      numbers
-        .sinkTo(
-          FileSink
-            .forRowFormat[((Int, Int), Int)](
-              new Path(params.get("output")),
-              new SimpleStringEncoder())
-            .withRollingPolicy(
-              DefaultRollingPolicy
-                .builder()
-                .withMaxPartSize(MemorySize.ofMebiBytes(1))
-                .withRolloverInterval(Duration.ofSeconds(10))
-                .build())
-            .build())
-        .name("file-sink")
-    } else {
-      println("Printing result to stdout. Use --output to specify output path.")
-      numbers.print()
-    }
-
-    env.execute("Streaming Iteration Example")
-  }
-
-  // *************************************************************************
-  // USER FUNCTIONS
-  // *************************************************************************
-
-  /** Generates BOUND random integer pairs from the range 0 to BOUND/2. */
-  private class RandomFibonacciSource extends SourceFunction[(Int, Int)] {
-
-    val rnd = new Random()
-    var counter = 0
-    @volatile var isRunning = true
-
-    override def run(ctx: SourceContext[(Int, Int)]): Unit = {
-
-      while (isRunning && counter < Bound) {
-        val first = rnd.nextInt(Bound / 2 - 1) + 1
-        val second = rnd.nextInt(Bound / 2 - 1) + 1
-
-        ctx.collect((first, second))
-        counter += 1
-        Thread.sleep(50L)
-      }
-    }
-
-    override def cancel(): Unit = isRunning = false
-  }
-
-}
diff --git a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala
deleted file mode 100644
index d46f705ac1e..00000000000
--- a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.scala.examples.join
-
-import org.apache.flink.api.java.utils.ParameterTool
-import org.apache.flink.streaming.api.scala._
-import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
-import org.apache.flink.streaming.api.windowing.time.Time
-
-/**
- * Example illustrating a windowed stream join between two data streams.
- *
- * The example works on two input streams with pairs (name, grade) and (name, salary) respectively.
- * It joins the streams based on "name" within a configurable window.
- *
- * The example uses a built-in sample data generator that generates the streams of pairs at a
- * configurable rate.
- */
-object WindowJoin {
-
-  // *************************************************************************
-  //  Program Data Types
-  // *************************************************************************
-
-  case class Grade(name: String, grade: Int)
-
-  case class Salary(name: String, salary: Int)
-
-  case class Person(name: String, grade: Int, salary: Int)
-
-  // *************************************************************************
-  //  Program
-  // *************************************************************************
-
-  def main(args: Array[String]): Unit = {
-    // parse the parameters
-    val params = ParameterTool.fromArgs(args)
-    val windowSize = params.getLong("windowSize", 2000)
-    val rate = params.getLong("rate", 3)
-
-    println("Using windowSize=" + windowSize + ", data rate=" + rate)
-    println(
-      "To customize example, use: WindowJoin " +
-        "[--windowSize <window-size-in-millis>] [--rate <elements-per-second>]")
-
-    // obtain the execution environment
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-
-    // make parameters available in the web interface
-    env.getConfig.setGlobalJobParameters(params)
-
-    // create the data sources for both grades and salaries
-    val grades = WindowJoinSampleData.getGradeSource(env, rate)
-    val salaries = WindowJoinSampleData.getSalarySource(env, rate)
-
-    // join the two input streams by name on a window.
-    // for testability, this functionality is in a separate method.
-    val joined = joinStreams(grades, salaries, windowSize)
-
-    // print the results with a single thread, rather than in parallel
-    joined.print().setParallelism(1)
-
-    // execute program
-    env.execute("Windowed Join Example")
-  }
-
-  def joinStreams(
-      grades: DataStream[Grade],
-      salaries: DataStream[Salary],
-      windowSize: Long): DataStream[Person] = {
-
-    grades
-      .join(salaries)
-      .where(_.name)
-      .equalTo(_.name)
-      .window(TumblingEventTimeWindows.of(Time.milliseconds(windowSize)))
-      .apply((g, s) => Person(g.name, g.grade, s.salary))
-  }
-}
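
The equivalent windowed join is available in the Java DataStream API. A minimal sketch follows; it is illustrative only, and the nested POJO classes are stand-ins for the Scala case classes above.

import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class WindowJoinSketch {

    // Minimal POJO stand-ins for the Grade, Salary and Person case classes.
    public static class Grade { public String name; public int grade; }
    public static class Salary { public String name; public int salary; }
    public static class Person {
        public String name; public int grade; public int salary;
        public Person() {}
        public Person(String name, int grade, int salary) {
            this.name = name; this.grade = grade; this.salary = salary;
        }
    }

    public static DataStream<Person> joinStreams(
            DataStream<Grade> grades, DataStream<Salary> salaries, long windowSize) {
        // join both streams on the name field within tumbling event-time windows
        return grades
                .join(salaries)
                .where(grade -> grade.name)
                .equalTo(salary -> salary.name)
                .window(TumblingEventTimeWindows.of(Time.milliseconds(windowSize)))
                .apply(
                        new JoinFunction<Grade, Salary, Person>() {
                            @Override
                            public Person join(Grade grade, Salary salary) {
                                return new Person(grade.name, grade.grade, salary.salary);
                            }
                        });
    }
}
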
diff --git a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoinSampleData.scala b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoinSampleData.scala
deleted file mode 100644
index 5a0e59ecd24..00000000000
--- a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoinSampleData.scala
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.scala.examples.join
-
-import org.apache.flink.streaming.api.scala._
-import org.apache.flink.streaming.examples.utils.ThrottledIterator
-import org.apache.flink.streaming.scala.examples.join.WindowJoin.{Grade, Salary}
-
-import java.io.Serializable
-import java.util.Random
-
-import scala.collection.JavaConverters._
-
-/** Sample data for the [[WindowJoin]] example. */
-object WindowJoinSampleData {
-
-  private[join] val NAMES = Array("tom", "jerry", "alice", "bob", "john", "grace")
-  private[join] val GRADE_COUNT = 5
-  private[join] val SALARY_MAX = 10000
-
-  /** Continuously generates (name, grade). */
-  def getGradeSource(env: StreamExecutionEnvironment, rate: Long): DataStream[Grade] = {
-    env.fromCollection(new ThrottledIterator(new GradeSource().asJava, rate).asScala)
-  }
-
-  /** Continuously generates (name, salary). */
-  def getSalarySource(env: StreamExecutionEnvironment, rate: Long): DataStream[Salary] = {
-    env.fromCollection(new ThrottledIterator(new SalarySource().asJava, rate).asScala)
-  }
-
-  // --------------------------------------------------------------------------
-
-  class GradeSource extends Iterator[Grade] with Serializable {
-
-    private[this] val rnd = new Random(hashCode())
-
-    def hasNext: Boolean = true
-
-    def next: Grade = {
-      Grade(NAMES(rnd.nextInt(NAMES.length)), rnd.nextInt(GRADE_COUNT) + 1)
-    }
-  }
-
-  class SalarySource extends Iterator[Salary] with Serializable {
-
-    private[this] val rnd = new Random(hashCode())
-
-    def hasNext: Boolean = true
-
-    def next: Salary = {
-      Salary(NAMES(rnd.nextInt(NAMES.length)), rnd.nextInt(SALARY_MAX) + 1)
-    }
-  }
-}
diff --git a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketWindowWordCount.scala b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketWindowWordCount.scala
deleted file mode 100644
index 63b7d2478ca..00000000000
--- a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketWindowWordCount.scala
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.scala.examples.socket
-
-import org.apache.flink.api.java.utils.ParameterTool
-import org.apache.flink.streaming.api.scala._
-import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
-import org.apache.flink.streaming.api.windowing.time.Time
-
-/**
- * Implements a streaming windowed version of the "WordCount" program.
- *
- * This program connects to a server socket and reads strings from the socket. The easiest way to
- * try this out is to open a text server (at port 12345) using the ''netcat'' tool via
- * {{{
- * nc -l 12345 on Linux or nc -l -p 12345 on Windows
- * }}}
- * and run this example with the hostname and the port as arguments.
- */
-object SocketWindowWordCount {
-
-  /** Main program method */
-  def main(args: Array[String]): Unit = {
-
-    // the host and the port to connect to
-    var hostname: String = "localhost"
-    var port: Int = 0
-
-    try {
-      val params = ParameterTool.fromArgs(args)
-      hostname = if (params.has("hostname")) params.get("hostname") else "localhost"
-      port = params.getInt("port")
-    } catch {
-      case e: Exception => {
-        System.err.println("No port specified. Please run 'SocketWindowWordCount " +
-          "--hostname <hostname> --port <port>', where hostname (localhost by default) and port " +
-          "is the address of the text server")
-        System.err.println(
-          "To start a simple text server, run 'netcat -l <port>' " +
-            "and type the input text into the command line")
-        return
-      }
-    }
-
-    // get the execution environment
-    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
-
-    // get input data by connecting to the socket
-    val text: DataStream[String] = env.socketTextStream(hostname, port, '\n')
-
-    // parse the data, group it, window it, and aggregate the counts
-    val windowCounts = text
-      .flatMap(w => w.split("\\s"))
-      .map(w => WordWithCount(w, 1))
-      .keyBy(_.word)
-      .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
-      .sum("count")
-
-    // print the results with a single thread, rather than in parallel
-    windowCounts.print()
-
-    env.execute("Socket Window WordCount")
-  }
-
-  /** Data type for words with count */
-  case class WordWithCount(word: String, count: Long)
-}
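
A minimal sketch of the same pipeline against the Java DataStream API is shown below; it is illustrative only, and the fixed localhost:12345 endpoint replaces the CLI parsing as a placeholder assumption.

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class SocketWindowWordCountSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // placeholder connection settings; the removed example read them from the arguments
        DataStream<String> text = env.socketTextStream("localhost", 12345, "\n");

        DataStream<Tuple2<String, Long>> windowCounts =
                text.flatMap(
                                (FlatMapFunction<String, Tuple2<String, Long>>)
                                        (line, out) -> {
                                            for (String word : line.split("\\s")) {
                                                out.collect(Tuple2.of(word, 1L));
                                            }
                                        },
                                Types.TUPLE(Types.STRING, Types.LONG))
                        .keyBy(tuple -> tuple.f0)
                        .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                        .sum(1);

        // print the results with a single thread, rather than in parallel
        windowCounts.print().setParallelism(1);

        env.execute("Socket Window WordCount sketch");
    }
}
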
diff --git a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/GroupedProcessingTimeWindowExample.scala b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/GroupedProcessingTimeWindowExample.scala
deleted file mode 100644
index 49e0f76dd54..00000000000
--- a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/GroupedProcessingTimeWindowExample.scala
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.scala.examples.windowing
-
-import org.apache.flink.api.scala._
-import org.apache.flink.streaming.api.functions.sink.{DiscardingSink, SinkFunction}
-import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction
-import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
-import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
-import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows
-import org.apache.flink.streaming.api.windowing.time.Time
-
-/**
- * An example of grouped stream windowing into sliding time windows. This example uses
- * [[RichParallelSourceFunction]] to generate a list of key-value pairs.
- */
-object GroupedProcessingTimeWindowExample {
-
-  def main(args: Array[String]): Unit = {
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-
-    val stream: DataStream[(Long, Long)] = env.addSource(new DataSource)
-
-    stream
-      .keyBy(_._1)
-      .window(SlidingProcessingTimeWindows.of(Time.milliseconds(2500), Time.milliseconds(500)))
-      .reduce((value1, value2) => (value1._1, value1._2 + value2._2))
-      .addSink(new DiscardingSink[(Long, Long)])
-
-    env.execute()
-  }
-
-  /** Parallel data source that serves a list of key-value pairs. */
-  private class DataSource extends RichParallelSourceFunction[(Long, Long)] {
-    @volatile private var running = true
-
-    override def run(ctx: SourceContext[(Long, Long)]): Unit = {
-      val startTime = System.currentTimeMillis()
-
-      val numElements = 20000000
-      val numKeys = 10000
-      var value = 1L
-      var count = 0L
-
-      while (running && count < numElements) {
-
-        ctx.collect((value, 1L))
-
-        count += 1
-        value += 1
-
-        if (value > numKeys) {
-          value = 1L
-        }
-      }
-
-      val endTime = System.currentTimeMillis()
-      println(s"Took ${endTime - startTime} msecs for $numElements values")
-    }
-
-    override def cancel(): Unit = running = false
-  }
-}
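
The same keyed sliding-window reduction can be written with the Java DataStream API. The sketch below is illustrative only and assumes a DataStream of (key, value) Tuple2 elements in place of the removed Scala source.

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class GroupedProcessingTimeWindowSketch {

    public static DataStream<Tuple2<Long, Long>> sumPerKey(DataStream<Tuple2<Long, Long>> stream) {
        // 2.5 s windows evaluated every 500 ms, summing the second field per key
        return stream
                .keyBy(value -> value.f0)
                .window(
                        SlidingProcessingTimeWindows.of(
                                Time.milliseconds(2500), Time.milliseconds(500)))
                .reduce((a, b) -> Tuple2.of(a.f0, a.f1 + b.f1));
    }
}
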
diff --git a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/SessionWindowing.scala b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/SessionWindowing.scala
deleted file mode 100644
index c537f316d66..00000000000
--- a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/SessionWindowing.scala
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.scala.examples.windowing
-
-import org.apache.flink.api.common.serialization.SimpleStringEncoder
-import org.apache.flink.api.java.utils.ParameterTool
-import org.apache.flink.api.scala._
-import org.apache.flink.configuration.MemorySize
-import org.apache.flink.connector.file.sink.FileSink
-import org.apache.flink.core.fs.Path
-import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy
-import org.apache.flink.streaming.api.functions.source.SourceFunction
-import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
-import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
-import org.apache.flink.streaming.api.watermark.Watermark
-import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows
-import org.apache.flink.streaming.api.windowing.time.Time
-
-import java.time.Duration
-
-/**
- * An example of grouped stream windowing in session windows with a session timeout of 3 msec. A
- * source fetches elements with key, timestamp, and count.
- */
-object SessionWindowing {
-
-  def main(args: Array[String]): Unit = {
-
-    val params = ParameterTool.fromArgs(args)
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-
-    env.getConfig.setGlobalJobParameters(params)
-
-    val fileOutput = params.has("output")
-
-    val input = List(
-      ("a", 1L, 1),
-      ("b", 1L, 1),
-      ("b", 3L, 1),
-      ("b", 5L, 1),
-      ("c", 6L, 1),
-      // We expect to detect the session "a" earlier than this point (the old
-      // functionality could only detect it here, when the next session starts)
-      ("a", 10L, 1),
-      // We expect to detect session "b" and "c" at this point as well
-      ("c", 11L, 1)
-    )
-
-    val source: DataStream[(String, Long, Int)] =
-      env.addSource(new SourceFunction[(String, Long, Int)]() {
-
-        override def run(ctx: SourceContext[(String, Long, Int)]): Unit = {
-          input.foreach(
-            value => {
-              ctx.collectWithTimestamp(value, value._2)
-              ctx.emitWatermark(new Watermark(value._2 - 1))
-            })
-          ctx.emitWatermark(new Watermark(Long.MaxValue))
-        }
-
-        override def cancel(): Unit = {}
-
-      })
-
-    // We create sessions for each id with a max timeout of 3 time units
-    val aggregated: DataStream[(String, Long, Int)] = source
-      .keyBy(_._1)
-      .window(EventTimeSessionWindows.withGap(Time.milliseconds(3L)))
-      .sum(2)
-
-    if (fileOutput) {
-      aggregated
-        .sinkTo(
-          FileSink
-            .forRowFormat[(String, Long, Int)](
-              new Path(params.get("output")),
-              new SimpleStringEncoder())
-            .withRollingPolicy(
-              DefaultRollingPolicy
-                .builder()
-                .withMaxPartSize(MemorySize.ofMebiBytes(1))
-                .withRolloverInterval(Duration.ofSeconds(10))
-                .build())
-            .build())
-        .name("file-sink")
-    } else {
-      print("Printing result to stdout. Use --output to specify output path.")
-      aggregated.print()
-    }
-
-    env.execute()
-  }
-
-}
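
Session windows with the same 3 ms gap remain available in the Java DataStream API. The sketch below is illustrative only and assumes the (id, timestamp, count) elements arrive as Tuple3 records from some timestamped source.

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class SessionWindowingSketch {

    public static DataStream<Tuple3<String, Long, Integer>> aggregate(
            DataStream<Tuple3<String, Long, Integer>> source) {
        // one session per id, closed after a 3 ms event-time gap, summing the count field
        return source
                .keyBy(value -> value.f0)
                .window(EventTimeSessionWindows.withGap(Time.milliseconds(3L)))
                .sum(2);
    }
}
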
diff --git a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala
deleted file mode 100644
index 4a0d84e06cd..00000000000
--- a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/TopSpeedWindowing.scala
+++ /dev/null
@@ -1,161 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.scala.examples.windowing
-
-import org.apache.flink.api.common.eventtime.WatermarkStrategy
-import org.apache.flink.api.common.serialization.SimpleStringEncoder
-import org.apache.flink.api.java.utils.ParameterTool
-import org.apache.flink.configuration.MemorySize
-import org.apache.flink.connector.file.sink.FileSink
-import org.apache.flink.connector.file.src.FileSource
-import org.apache.flink.connector.file.src.reader.TextLineInputFormat
-import org.apache.flink.core.fs.Path
-import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy
-import org.apache.flink.streaming.api.functions.windowing.delta.DeltaFunction
-import org.apache.flink.streaming.api.scala._
-import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows
-import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor
-import org.apache.flink.streaming.api.windowing.time.Time
-import org.apache.flink.streaming.api.windowing.triggers.DeltaTrigger
-import org.apache.flink.streaming.examples.wordcount.util.WordCountData
-import org.apache.flink.streaming.scala.examples.windowing.util.CarSource
-import org.apache.flink.streaming.scala.examples.wordcount.util.CLI
-
-import java.time.Duration
-import java.util.concurrent.TimeUnit
-
-/**
- * An example of grouped stream windowing where different eviction and trigger policies can be used.
- * A source fetches events from cars every 100 msec containing their id, their current speed (kmh),
- * overall elapsed distance (m) and a timestamp. The streaming example emits the top speed of
- * each car every x meters elapsed, computed over the last y seconds.
- */
-object TopSpeedWindowing {
-
-  // *************************************************************************
-  // PROGRAM
-  // *************************************************************************
-
-  case class CarEvent(carId: Int, speed: Int, distance: Double, time: Long)
-
-  val numOfCars = 2
-  val evictionSec = 10
-  val triggerMeters = 50d
-
-  def main(args: Array[String]): Unit = {
-    val params = CLI.fromArgs(args)
-
-    // Create the execution environment. This is the main entrypoint
-    // to building a Flink application.
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-
-    // Apache Flink’s unified approach to stream and batch processing means that a DataStream
-    // application executed over bounded input will produce the same final results regardless
-    // of the configured execution mode. It is important to note what final means here: a job
-    // executing in STREAMING mode might produce incremental updates (think upserts in
-    // a database) while in BATCH mode, it would only produce one final result at the end. The
-    // final result will be the same if interpreted correctly, but getting there can be
-    // different.
-    //
-    // The “classic” execution behavior of the DataStream API is called STREAMING execution
-    // mode. Applications should use streaming execution for unbounded jobs that require
-    // continuous incremental processing and are expected to stay online indefinitely.
-    //
-    // By enabling BATCH execution, we allow Flink to apply additional optimizations that we
-    // can only do when we know that our input is bounded. For example, different
-    // join/aggregation strategies can be used, in addition to a different shuffle
-    // implementation that allows more efficient task scheduling and failure recovery behavior.
-    //
-    // By setting the runtime mode to AUTOMATIC, Flink will choose BATCH if all sources
-    // are bounded and otherwise STREAMING.
-    env.setRuntimeMode(params.executionMode)
-
-    // This optional step makes the input parameters
-    // available in the Flink UI.
-    env.getConfig.setGlobalJobParameters(params)
-
-    val cars = params.input match {
-      case Some(input) =>
-        // Create a new file source that will read files from a given set of directories.
-        // Each file will be processed as plain text and split based on newlines.
-        val builder = FileSource.forRecordStreamFormat(new TextLineInputFormat, input: _*)
-        params.discoveryInterval.foreach {
-          duration =>
-            // If a discovery interval is provided, the source will
-            // continuously watch the given directories for new files.
-            builder.monitorContinuously(duration)
-        }
-        env
-          .fromSource(builder.build(), WatermarkStrategy.noWatermarks(), "file-input")
-          .map(line => parseMap(line))
-          .name("parse-input")
-      case None =>
-        env.addSource(CarSource(2)).name("in-memory-input")
-    }
-
-    val topSpeeds = cars
-      .assignAscendingTimestamps(_.time)
-      .keyBy(_.carId)
-      .window(GlobalWindows.create)
-      .evictor(TimeEvictor.of(Time.of(evictionSec * 1000, TimeUnit.MILLISECONDS)))
-      .trigger(DeltaTrigger.of(
-        triggerMeters,
-        new DeltaFunction[CarEvent] {
-          def getDelta(oldSp: CarEvent, newSp: CarEvent): Double = newSp.distance - oldSp.distance
-        },
-        cars.dataType.createSerializer(env.getConfig)
-      ))
-//      .window(Time.of(evictionSec * 1000, (car : CarEvent) => car.time))
-//      .every(Delta.of[CarEvent](triggerMeters,
-//          (oldSp,newSp) => newSp.distance-oldSp.distance, CarEvent(0,0,0,0)))
-      .maxBy("speed")
-
-    params.output match {
-      case Some(output) =>
-        // Given an output directory, Flink will write the results to a file
-        // using a simple string encoding. In a production environment, this might
-        // be something more structured like CSV, Avro, JSON, or Parquet.
-        topSpeeds
-          .sinkTo(
-            FileSink
-              .forRowFormat[CarEvent](output, new SimpleStringEncoder())
-              .withRollingPolicy(
-                DefaultRollingPolicy
-                  .builder()
-                  .withMaxPartSize(MemorySize.ofMebiBytes(1))
-                  .withRolloverInterval(Duration.ofSeconds(10))
-                  .build())
-              .build())
-          .name("file-sink")
-
-      case None => topSpeeds.print().name("print-sink")
-    }
-
-    env.execute("TopSpeedWindowing")
-
-  }
-
-  // *************************************************************************
-  // USER FUNCTIONS
-  // *************************************************************************
-
-  def parseMap(line: String): CarEvent = {
-    val record = line.substring(1, line.length - 1).split(",")
-    CarEvent(record(0).toInt, record(1).toInt, record(2).toDouble, record(3).toLong)
-  }
-}
diff --git a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/WindowWordCount.scala b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/WindowWordCount.scala
deleted file mode 100644
index ab1e1ce4a1e..00000000000
--- a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/WindowWordCount.scala
+++ /dev/null
@@ -1,156 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.scala.examples.windowing
-
-import org.apache.flink.api.common.eventtime.WatermarkStrategy
-import org.apache.flink.api.common.serialization.SimpleStringEncoder
-import org.apache.flink.api.scala._
-import org.apache.flink.configuration.MemorySize
-import org.apache.flink.connector.file.sink.FileSink
-import org.apache.flink.connector.file.src.FileSource
-import org.apache.flink.connector.file.src.reader.TextLineInputFormat
-import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.streaming.examples.wordcount.util.WordCountData
-import org.apache.flink.streaming.scala.examples.wordcount.WordCount.Tokenizer
-import org.apache.flink.streaming.scala.examples.wordcount.util.CLI
-
-import java.time.Duration
-
-/**
- * Implements a windowed version of the streaming "WordCount" program.
- *
- * The input is a plain text file with lines separated by newline characters.
- *
- * Usage:
- * {{{
- * WindowWordCount
- * --input <path>
- * --output <path>
- * --window <n>
- * --slide <n>
- * }}}
- *
- * If no parameters are provided, the program is run with default data from [[WordCountData]].
- *
- * This example shows how to:
- *
- *   - write a simple Flink Streaming program,
- *   - use tuple data types,
- *   - use basic windowing abstractions.
- */
-object WindowWordCount {
-
-  def main(args: Array[String]): Unit = {
-    val params = CLI.fromArgs(args)
-
-    // Create the execution environment. This is the main entrypoint
-    // to building a Flink application.
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-
-    // Apache Flink’s unified approach to stream and batch processing means that a DataStream
-    // application executed over bounded input will produce the same final results regardless
-    // of the configured execution mode. It is important to note what final means here: a job
-    // executing in STREAMING mode might produce incremental updates (think upserts in
-    // a database) while in BATCH mode, it would only produce one final result at the end. The
-    // final result will be the same if interpreted correctly, but getting there can be
-    // different.
-    //
-    // The “classic” execution behavior of the DataStream API is called STREAMING execution
-    // mode. Applications should use streaming execution for unbounded jobs that require
-    // continuous incremental processing and are expected to stay online indefinitely.
-    //
-    // By enabling BATCH execution, we allow Flink to apply additional optimizations that we
-    // can only do when we know that our input is bounded. For example, different
-    // join/aggregation strategies can be used, in addition to a different shuffle
-    // implementation that allows more efficient task scheduling and failure recovery behavior.
-    //
-    // By setting the runtime mode to AUTOMATIC, Flink will choose BATCH if all sources
-    // are bounded and otherwise STREAMING.
-    env.setRuntimeMode(params.executionMode)
-
-    // This optional step makes the input parameters
-    // available in the Flink UI.
-    env.getConfig.setGlobalJobParameters(params)
-
-    val text = params.input match {
-      case Some(input) =>
-        // Create a new file source that will read files from a given set of directories.
-        // Each file will be processed as plain text and split based on newlines.
-        val builder = FileSource.forRecordStreamFormat(new TextLineInputFormat, input: _*)
-        params.discoveryInterval.foreach {
-          duration =>
-            // If a discovery interval is provided, the source will
-            // continuously watch the given directories for new files.
-            builder.monitorContinuously(duration)
-        }
-        env.fromSource(builder.build(), WatermarkStrategy.noWatermarks(), "file-input")
-      case None =>
-        env.fromElements(WordCountData.WORDS: _*).name("in-memory-input")
-    }
-
-    val windowSize = params.getInt("window").getOrElse(250)
-    val slideSize = params.getInt("slide").getOrElse(150)
-
-    val counts =
-      // The text lines read from the source are split into words
-      // using a user-defined function. The tokenizer, implemented below,
-      // will output each word as a (2-tuple) containing (word, 1)
-      text
-        .flatMap(new Tokenizer)
-        .name("tokenizer")
-        // keyBy groups tuples based on the "_1" field, the word.
-        // Using a keyBy allows performing aggregations and other
-        // stateful transformations over data on a per-key basis.
-        // This is similar to a GROUP BY clause in a SQL query.
-        .keyBy(_._1)
-        // create windows of windowSize records that slide every slideSize records
-        .countWindow(windowSize, slideSize)
-        // For each key, we perform a simple sum of the "1" field, the count.
-        // If the input data set is bounded, sum will output a final count for
-        // each word. If it is unbounded, it will continuously output updates
-        // each time it sees a new instance of each word in the stream.
-        .sum(1)
-        .name("counter")
-
-    params.output match {
-      case Some(output) =>
-        // Given an output directory, Flink will write the results to a file
-        // using a simple string encoding. In a production environment, this might
-        // be something more structured like CSV, Avro, JSON, or Parquet.
-        counts
-          .sinkTo(
-            FileSink
-              .forRowFormat[(String, Int)](output, new SimpleStringEncoder())
-              .withRollingPolicy(
-                DefaultRollingPolicy
-                  .builder()
-                  .withMaxPartSize(MemorySize.ofMebiBytes(1))
-                  .withRolloverInterval(Duration.ofSeconds(10))
-                  .build())
-              .build())
-          .name("file-sink")
-
-      case None => counts.print().name("print-sink")
-    }
-
-    // Apache Flink applications are composed lazily. Calling execute
-    // submits the Job and begins processing.
-    env.execute("WindowWordCount")
-  }
-}
diff --git a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/util/CarSource.scala b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/util/CarSource.scala
deleted file mode 100644
index d216f249f5d..00000000000
--- a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/util/CarSource.scala
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.scala.examples.windowing.util
-
-import org.apache.flink.api.java.tuple.{Tuple4 => JTuple4}
-import org.apache.flink.streaming.api.functions.source.SourceFunction
-import org.apache.flink.streaming.api.watermark.Watermark
-import org.apache.flink.streaming.examples.windowing.util.{CarSource => JCarSource}
-import org.apache.flink.streaming.scala.examples.windowing.TopSpeedWindowing.CarEvent
-
-import java.lang.{Double => JDouble, Integer => JInt, Long => JLong}
-
-/** A simple in-memory source. */
-object CarSource {
-  def apply(cars: Int): CarSource =
-    new CarSource(JCarSource.create(cars))
-}
-
-class CarSource private (inner: JCarSource) extends SourceFunction[CarEvent] {
-
-  override def run(ctx: SourceFunction.SourceContext[CarEvent]): Unit = {
-    inner.run(new WrappingCollector(ctx))
-  }
-
-  override def cancel(): Unit = inner.cancel()
-}
-
-private class WrappingCollector(ctx: SourceFunction.SourceContext[CarEvent])
-  extends SourceFunction.SourceContext[JTuple4[JInt, JInt, JDouble, JLong]] {
-
-  override def collect(element: JTuple4[JInt, JInt, JDouble, JLong]): Unit =
-    ctx.collect(CarEvent(element.f0, element.f1, element.f2, element.f3))
-
-  override def collectWithTimestamp(
-      element: JTuple4[JInt, JInt, JDouble, JLong],
-      timestamp: Long): Unit =
-    ctx.collectWithTimestamp(CarEvent(element.f0, element.f1, element.f2, element.f3), timestamp)
-
-  override def emitWatermark(mark: Watermark): Unit = ctx.emitWatermark(mark)
-
-  override def markAsTemporarilyIdle(): Unit = ctx.markAsTemporarilyIdle()
-
-  override def getCheckpointLock: AnyRef = ctx.getCheckpointLock
-
-  override def close(): Unit = ctx.close()
-}
diff --git a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/wordcount/WordCount.scala b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/wordcount/WordCount.scala
deleted file mode 100644
index c19aa3ff2dc..00000000000
--- a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/wordcount/WordCount.scala
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.scala.examples.wordcount
-
-import org.apache.flink.api.common.eventtime.WatermarkStrategy
-import org.apache.flink.api.common.functions.FlatMapFunction
-import org.apache.flink.api.common.serialization.SimpleStringEncoder
-import org.apache.flink.configuration.MemorySize
-import org.apache.flink.connector.file.sink.FileSink
-import org.apache.flink.connector.file.src.FileSource
-import org.apache.flink.connector.file.src.reader.TextLineInputFormat
-import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy
-import org.apache.flink.streaming.api.scala._
-import org.apache.flink.streaming.examples.wordcount.util.WordCountData
-import org.apache.flink.streaming.scala.examples.wordcount.util.CLI
-import org.apache.flink.util.Collector
-
-import java.time.Duration
-
-/**
- * Implements the "WordCount" program that computes a simple word occurrence histogram over text
- * files. This Job can be executed in both streaming and batch execution modes.
- *
- * The input is a [list of] plain text file[s] with lines separated by a newline character.
- *
- * Usage:
- *
- * {{{--input <path>}}} A list of input files and / or directories to read. If no input is provided,
- * the program is run with default data from [[WordCountData]].
- *
- * {{{--discovery-interval <duration>}}} Turns the file reader into a continuous source that will
- * monitor the provided input directories every interval and read any new files.
- *
- * {{{--output <path>}}} The output directory where the Job will write the results. If no output
- * path is provided, the Job will print the results to `stdout`
- *
- * {{{--execution-mode <mode>}}} The execution mode (BATCH, STREAMING, or AUTOMATIC) of this
- * pipeline.
- *
- * This example shows how to:
- *
- *   - Write a simple Flink DataStream program
- *   - Use tuple data types
- *   - Write and use a user-defined function
- */
-object WordCount {
-
-  // *************************************************************************
-  // PROGRAM
-  // *************************************************************************
-
-  def main(args: Array[String]): Unit = {
-    val params = CLI.fromArgs(args)
-
-    // Create the execution environment. This is the main entrypoint
-    // to building a Flink application.
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-
-    // Apache Flink’s unified approach to stream and batch processing means that a DataStream
-    // application executed over bounded input will produce the same final results regardless
-    // of the configured execution mode. It is important to note what final means here: a job
-    // executing in STREAMING mode might produce incremental updates (think upserts in
-    // a database) while in BATCH mode, it would only produce one final result at the end. The
-    // final result will be the same if interpreted correctly, but getting there can be
-    // different.
-    //
-    // The “classic” execution behavior of the DataStream API is called STREAMING execution
-    // mode. Applications should use streaming execution for unbounded jobs that require
-    // continuous incremental processing and are expected to stay online indefinitely.
-    //
-    // By enabling BATCH execution, we allow Flink to apply additional optimizations that we
-    // can only do when we know that our input is bounded. For example, different
-    // join/aggregation strategies can be used, in addition to a different shuffle
-    // implementation that allows more efficient task scheduling and failure recovery behavior.
-    //
-    // By setting the runtime mode to AUTOMATIC, Flink will choose BATCH if all sources
-    // are bounded and otherwise STREAMING.
-    env.setRuntimeMode(params.executionMode)
-
-    // This optional step makes the input parameters
-    // available in the Flink UI.
-    env.getConfig.setGlobalJobParameters(params)
-
-    // get input data
-    val text = params.input match {
-      case Some(input) =>
-        // Create a new file source that will read files from a given set of directories.
-        // Each file will be processed as plain text and split based on newlines.
-        val builder = FileSource.forRecordStreamFormat(new TextLineInputFormat, input: _*)
-        params.discoveryInterval.foreach {
-          duration =>
-            // If a discovery interval is provided, the source will
-            // continuously watch the given directories for new files.
-            builder.monitorContinuously(duration)
-        }
-        env.fromSource(builder.build(), WatermarkStrategy.noWatermarks(), "file-input")
-      case None =>
-        env.fromElements(WordCountData.WORDS: _*).name("in-memory-input")
-    }
-
-    val counts =
-      // The text lines read from the source are split into words
-      // using a user-defined function. The tokenizer, implemented below,
-      // will output each word as a (2-tuple) containing (word, 1)
-      text
-        .flatMap(new Tokenizer)
-        .name("tokenizer")
-        // keyBy groups tuples based on the "_1" field, the word.
-        // Using a keyBy allows performing aggregations and other
-        // stateful transformations over data on a per-key basis.
-        // This is similar to a GROUP BY clause in a SQL query.
-        .keyBy(_._1)
-        // For each key, we perform a simple sum of the "1" field, the count.
-        // If the input data stream is bounded, sum will output a final count for
-        // each word. If it is unbounded, it will continuously output updates
-        // each time it sees a new instance of each word in the stream.
-        .sum(1)
-        .name("counter")
-
-    params.output match {
-      case Some(output) =>
-        // Given an output directory, Flink will write the results to a file
-        // using a simple string encoding. In a production environment, this might
-        // be something more structured like CSV, Avro, JSON, or Parquet.
-        counts
-          .sinkTo(
-            FileSink
-              .forRowFormat[(String, Int)](output, new SimpleStringEncoder())
-              .withRollingPolicy(
-                DefaultRollingPolicy
-                  .builder()
-                  .withMaxPartSize(MemorySize.ofMebiBytes(1))
-                  .withRolloverInterval(Duration.ofSeconds(10))
-                  .build())
-              .build())
-          .name("file-sink")
-
-      case None => counts.print().name("print-sink")
-    }
-
-    // Apache Flink applications are composed lazily. Calling execute
-    // submits the Job and begins processing.
-    env.execute("WordCount")
-  }
-
-  // *************************************************************************
-  // USER FUNCTIONS
-  // *************************************************************************
-
-  /**
-   * Implements the string tokenizer that splits a sentence into words as a user-defined
-   * FlatMapFunction. The function takes a line (String) and splits it into multiple pairs in the
-   * form of "(word,1)".
-   */
-  final class Tokenizer extends FlatMapFunction[String, (String, Int)] {
-    override def flatMap(value: String, out: Collector[(String, Int)]): Unit = for {
-      token <- value.toLowerCase.split("\\W+")
-      if token.nonEmpty
-    } out.collect((token, 1))
-  }
-}
diff --git a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/wordcount/util/CLI.scala b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/wordcount/util/CLI.scala
deleted file mode 100644
index cff44ba2897..00000000000
--- a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/wordcount/util/CLI.scala
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.scala.examples.wordcount.util
-
-import org.apache.flink.api.common.{ExecutionConfig, RuntimeExecutionMode}
-import org.apache.flink.core.fs.Path
-import org.apache.flink.streaming.examples.wordcount.util.{CLI => JCLI}
-
-import java.time.Duration
-import java.util
-import java.util.Optional
-
-/**
- * A simple CLI parser for the [[org.apache.flink.streaming.scala.examples.wordcount.WordCount]]
- * example application.
- */
-object CLI {
-  def fromArgs(args: Array[String]) = new CLI(JCLI.fromArgs(args))
-}
-
-class CLI private (val inner: JCLI) extends ExecutionConfig.GlobalJobParameters {
-
-  def input: Option[Array[Path]] = asScala(inner.getInputs)
-
-  def discoveryInterval: Option[Duration] = asScala(inner.getDiscoveryInterval)
-
-  def output: Option[Path] = asScala(inner.getOutput)
-
-  def executionMode: RuntimeExecutionMode = inner.getExecutionMode
-
-  def getInt(key: String): Option[Int] = {
-    val result = inner.getInt(key)
-    if (result.isPresent) {
-      Option(result.getAsInt)
-    } else {
-      None
-    }
-  }
-
-  override def equals(obj: Any): Boolean =
-    obj.isInstanceOf[CLI] && inner.equals(obj.asInstanceOf[CLI].inner)
-
-  override def hashCode(): Int = inner.hashCode()
-
-  override def toMap: util.Map[String, String] = inner.toMap
-
-  private def asScala[T](optional: Optional[T]): Option[T] =
-    Option(optional.orElse(null.asInstanceOf[T]))
-}
diff --git a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/scala/examples/windowing/TopSpeedWindowingExampleITCase.java b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/scala/examples/windowing/TopSpeedWindowingExampleITCase.java
deleted file mode 100644
index b758618a4c9..00000000000
--- a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/scala/examples/windowing/TopSpeedWindowingExampleITCase.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.test.scala.examples.windowing;
-
-import org.apache.flink.streaming.examples.windowing.util.TopSpeedWindowingExampleData;
-import org.apache.flink.streaming.scala.examples.windowing.TopSpeedWindowing;
-import org.apache.flink.test.util.AbstractTestBase;
-
-import org.junit.Test;
-
-import static org.apache.flink.test.util.TestBaseUtils.compareResultsByLinesInMemory;
-
-/** Tests for {@link TopSpeedWindowing}. */
-public class TopSpeedWindowingExampleITCase extends AbstractTestBase {
-
-    @Test
-    public void testProgram() throws Exception {
-        String textPath = createTempFile("text.txt", TopSpeedWindowingExampleData.CAR_DATA);
-        String resultPath = getTempDirPath("result");
-
-        TopSpeedWindowing.main(
-                new String[] {
-                    "--input", textPath,
-                    "--output", resultPath,
-                    "--execution-mode", "AUTOMATIC"
-                });
-
-        compareResultsByLinesInMemory(
-                TopSpeedWindowingExampleData.TOP_CASE_CLASS_SPEEDS, resultPath);
-    }
-}
diff --git a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/socket/SocketWindowWordCountITCase.java b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/socket/SocketWindowWordCountITCase.java
index 7f27378f201..c79270fdc2e 100644
--- a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/socket/SocketWindowWordCountITCase.java
+++ b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/socket/SocketWindowWordCountITCase.java
@@ -80,48 +80,6 @@ public class SocketWindowWordCountITCase extends AbstractTestBase {
         }
     }
 
-    @Test
-    public void testScalaProgram() throws Exception {
-        InetAddress localhost = InetAddress.getByName("localhost");
-
-        // suppress sysout messages from this example
-        final PrintStream originalSysout = System.out;
-        final PrintStream originalSyserr = System.err;
-
-        final ByteArrayOutputStream errorMessages = new ByteArrayOutputStream();
-
-        System.setOut(new PrintStream(new NullStream()));
-        System.setErr(new PrintStream(errorMessages));
-
-        try {
-            try (ServerSocket server = new ServerSocket(0, 10, localhost)) {
-
-                final ServerThread serverThread = new ServerThread(server);
-                serverThread.setDaemon(true);
-                serverThread.start();
-
-                final int serverPort = server.getLocalPort();
-
-                org.apache.flink.streaming.scala.examples.socket.SocketWindowWordCount.main(
-                        new String[] {"--port", String.valueOf(serverPort)});
-
-                if (errorMessages.size() != 0) {
-                    fail(
-                            "Found error message: "
-                                    + new String(
-                                            errorMessages.toByteArray(),
-                                            ConfigConstants.DEFAULT_CHARSET));
-                }
-
-                serverThread.join();
-                serverThread.checkError();
-            }
-        } finally {
-            System.setOut(originalSysout);
-            System.setErr(originalSyserr);
-        }
-    }
-
     // ------------------------------------------------------------------------
 
     private static class ServerThread extends Thread {
diff --git a/flink-examples/flink-examples-streaming/src/test/scala/org/apache/flink/streaming/scala/examples/StreamingExamplesITCase.scala b/flink-examples/flink-examples-streaming/src/test/scala/org/apache/flink/streaming/scala/examples/StreamingExamplesITCase.scala
deleted file mode 100644
index 16fce22fb36..00000000000
--- a/flink-examples/flink-examples-streaming/src/test/scala/org/apache/flink/streaming/scala/examples/StreamingExamplesITCase.scala
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.scala.examples
-
-import org.apache.flink.core.fs.FileSystem.WriteMode
-import org.apache.flink.streaming.api.TimeCharacteristic
-import org.apache.flink.streaming.api.scala._
-import org.apache.flink.streaming.examples.iteration.util.IterateExampleData
-import org.apache.flink.streaming.scala.examples.iteration.IterateExample
-import org.apache.flink.streaming.scala.examples.join.WindowJoin
-import org.apache.flink.streaming.scala.examples.join.WindowJoin.{Grade, Salary}
-import org.apache.flink.streaming.scala.examples.windowing.{SessionWindowing, WindowWordCount}
-import org.apache.flink.streaming.scala.examples.wordcount.WordCount
-import org.apache.flink.streaming.test.examples.join.WindowJoinData
-import org.apache.flink.test.testdata.WordCountData
-import org.apache.flink.test.util.{AbstractTestBase, TestBaseUtils}
-
-import org.apache.commons.io.FileUtils
-import org.junit.Test
-
-import java.io.File
-
-/** Integration test for streaming programs in Scala examples. */
-class StreamingExamplesITCase extends AbstractTestBase {
-
-  @Test
-  def testIterateExample(): Unit = {
-    val inputPath = createTempFile("fibonacciInput.txt", IterateExampleData.INPUT_PAIRS)
-    val resultPath = getTempDirPath("result")
-
-    // the example is inherently non-deterministic. The iteration timeout of 5000 ms
-    // is frequently not enough to make the test run stable on CI infrastructure
-    // with very small containers, so we cannot do a validation here
-    IterateExample.main(
-      Array(
-        "--input",
-        inputPath,
-        "--output",
-        resultPath
-      ))
-  }
-
-  @Test
-  def testWindowJoin(): Unit = {
-    val resultPath = File.createTempFile("result-path", "dir").toURI.toString
-    try {
-      val env = StreamExecutionEnvironment.getExecutionEnvironment
-      env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
-
-      val grades = env
-        .fromCollection(WindowJoinData.GRADES_INPUT.split("\n"))
-        .map(
-          line => {
-            val fields = line.split(",")
-            Grade(fields(1), fields(2).toInt)
-          })
-
-      val salaries = env
-        .fromCollection(WindowJoinData.SALARIES_INPUT.split("\n"))
-        .map(
-          line => {
-            val fields = line.split(",")
-            Salary(fields(1), fields(2).toInt)
-          })
-
-      WindowJoin
-        .joinStreams(grades, salaries, 100)
-        .writeAsText(resultPath, WriteMode.OVERWRITE)
-
-      env.execute()
-
-      TestBaseUtils.checkLinesAgainstRegexp(resultPath, "^Person\\([a-z]+,(\\d),(\\d)+\\)")
-    } finally
-      try
-        FileUtils.deleteDirectory(new File(resultPath))
-
-      catch {
-        case _: Throwable =>
-      }
-  }
-
-  @Test
-  def testSessionWindowing(): Unit = {
-    val resultPath = getTempDirPath("result")
-    SessionWindowing.main(Array("--output", resultPath))
-  }
-
-  @Test
-  def testWindowWordCount(): Unit = {
-    val windowSize = "25"
-    val slideSize = "15"
-    val textPath = createTempFile("text.txt", WordCountData.TEXT)
-    val resultPath = getTempDirPath("result")
-
-    WindowWordCount.main(
-      Array(
-        "--input",
-        textPath,
-        "--output",
-        resultPath,
-        "--window",
-        windowSize,
-        "--slide",
-        slideSize,
-        "--execution-mode",
-        "AUTOMATIC"
-      ))
-
-    // since the parallel tokenizers might have different speed
-    // the exact output can not be checked just whether it is well-formed
-    // checks that the result lines look like e.g. (faust, 2)
-    TestBaseUtils.checkLinesAgainstRegexp(resultPath, "^\\([a-z]+,(\\d)+\\)")
-  }
-
-  @Test
-  def testWordCount(): Unit = {
-    val textPath = createTempFile("text.txt", WordCountData.TEXT)
-    val resultPath = getTempDirPath("result")
-
-    WordCount.main(
-      Array(
-        "--input",
-        textPath,
-        "--output",
-        resultPath,
-        "--execution-mode",
-        "automatic"
-      ))
-
-    TestBaseUtils.compareResultsByLinesInMemory(WordCountData.COUNTS_AS_TUPLES, resultPath)
-  }
-}
diff --git a/flink-examples/flink-examples-table/pom.xml b/flink-examples/flink-examples-table/pom.xml
index be039625a07..6870c921c0e 100644
--- a/flink-examples/flink-examples-table/pom.xml
+++ b/flink-examples/flink-examples-table/pom.xml
@@ -46,11 +46,6 @@ under the License.
 			<artifactId>flink-table-api-java-bridge</artifactId>
 			<version>${project.version}</version>
 		</dependency>
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
-			<version>${project.version}</version>
-		</dependency>
 		<!-- The following two dependencies are not required to define a SQL job pipeline,
 		but only to execute it.
 
diff --git a/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/basics/GettingStartedExample.scala b/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/basics/GettingStartedExample.scala
deleted file mode 100644
index 2b18b27be3a..00000000000
--- a/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/basics/GettingStartedExample.scala
+++ /dev/null
@@ -1,208 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.examples.scala.basics
-
-import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment, _}
-import org.apache.flink.table.functions.ScalarFunction
-import org.apache.flink.types.Row
-import org.apache.flink.util.CloseableIterator
-
-import java.time.LocalDate
-
-import scala.collection.JavaConverters._
-
-/**
- * Example for getting started with the Table & SQL API in Scala.
- *
- * The example shows how to create, transform, and query a table. It should give a first impression
- * about the look-and-feel of the API without going too much into details. See the other examples
- * for using connectors or more complex operations.
- *
- * In particular, the example shows how to
- *   - setup a [[TableEnvironment]],
- *   - use the environment for creating example tables, registering views, and executing SQL
- *     queries,
- *   - transform tables with filters and projections,
- *   - declare user-defined functions,
- *   - and print/collect results locally.
- *
- * The example executes two Flink jobs. The results are written to stdout.
- */
-object GettingStartedExample {
-
-  def main(args: Array[String]): Unit = {
-
-    // setup the unified API
-    // in this case: declare that the table programs should be executed in batch mode
-    val settings = EnvironmentSettings
-      .newInstance()
-      .inBatchMode()
-      .build()
-    val env = TableEnvironment.create(settings)
-
-    // create a table with example data without a connector required
-    val rawCustomers = env.fromValues(
-      row(
-        "Guillermo Smith",
-        LocalDate.parse("1992-12-12"),
-        "4081 Valley Road",
-        "08540",
-        "New Jersey",
-        "m",
-        true,
-        0,
-        78,
-        3),
-      row(
-        "Valeria Mendoza",
-        LocalDate.parse("1970-03-28"),
-        "1239  Rainbow Road",
-        "90017",
-        "Los Angeles",
-        "f",
-        true,
-        9,
-        39,
-        0),
-      row(
-        "Leann Holloway",
-        LocalDate.parse("1989-05-21"),
-        "2359 New Street",
-        "97401",
-        "Eugene",
-        null,
-        true,
-        null,
-        null,
-        null),
-      row(
-        "Brandy Sanders",
-        LocalDate.parse("1956-05-26"),
-        "4891 Walkers-Ridge-Way",
-        "73119",
-        "Oklahoma City",
-        "m",
-        false,
-        9,
-        39,
-        0),
-      row(
-        "John Turner",
-        LocalDate.parse("1982-10-02"),
-        "2359 New Street",
-        "60605",
-        "Chicago",
-        "m",
-        true,
-        12,
-        39,
-        0),
-      row(
-        "Ellen Ortega",
-        LocalDate.parse("1985-06-18"),
-        "2448 Rodney STreet",
-        "85023",
-        "Phoenix",
-        "f",
-        true,
-        0,
-        78,
-        3)
-    )
-
-    // handle ranges of columns easily
-    val truncatedCustomers = rawCustomers.select(withColumns(1 to 7))
-
-    // name columns
-    val namedCustomers = truncatedCustomers
-      .as("name", "date_of_birth", "street", "zip_code", "city", "gender", "has_newsletter")
-
-    // register a view temporarily
-    env.createTemporaryView("customers", namedCustomers)
-
-    // use SQL whenever you like
-    // call execute() and print() to get insights
-    env
-      .sqlQuery("""
-                  |SELECT
-                  |  COUNT(*) AS `number of customers`,
-                  |  AVG(YEAR(date_of_birth)) AS `average birth year`
-                  |FROM `customers`
-                  |""".stripMargin)
-      .execute()
-      .print()
-
-    // or further transform the data using the fluent Table API
-    // e.g. filter, project fields, or call a user-defined function
-    val youngCustomers = env
-      .from("customers")
-      .filter($"gender".isNotNull)
-      .filter($"has_newsletter" === true)
-      .filter($"date_of_birth" >= LocalDate.parse("1980-01-01"))
-      .select(
-        $"name".upperCase(),
-        $"date_of_birth",
-        call(classOf[AddressNormalizer], $"street", $"zip_code", $"city").as("address")
-      )
-
-    // use execute() and collect() to retrieve your results from the cluster
-    // this can be useful for testing before storing it in an external system
-    var iterator: CloseableIterator[Row] = null
-    try {
-      iterator = youngCustomers.execute().collect()
-      val actualOutput = iterator.asScala.toSet
-
-      val expectedOutput = Set(
-        Row.of(
-          "GUILLERMO SMITH",
-          LocalDate.parse("1992-12-12"),
-          "4081 VALLEY ROAD, 08540, NEW JERSEY"),
-        Row.of("JOHN TURNER", LocalDate.parse("1982-10-02"), "2359 NEW STREET, 60605, CHICAGO"),
-        Row.of("ELLEN ORTEGA", LocalDate.parse("1985-06-18"), "2448 RODNEY STREET, 85023, PHOENIX")
-      )
-
-      if (actualOutput == expectedOutput) {
-        println("SUCCESS!")
-      } else {
-        println("FAILURE!")
-      }
-    } finally {
-      if (iterator != null) {
-        iterator.close()
-      }
-    }
-  }
-
-  /**
-   * We can put frequently used procedures in user-defined functions.
-   *
-   * It is possible to call third-party libraries here as well.
-   */
-  class AddressNormalizer extends ScalarFunction {
-
-    // the 'eval()' method defines input and output types (reflectively extracted)
-    // and contains the runtime logic
-    def eval(street: String, zipCode: String, city: String): String = {
-      normalize(street) + ", " + normalize(zipCode) + ", " + normalize(city)
-    }
-
-    private def normalize(s: String) = {
-      s.toUpperCase.replaceAll("\\W", " ").replaceAll("\\s+", " ").trim
-    }
-  }
-}
diff --git a/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/basics/StreamSQLExample.scala b/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/basics/StreamSQLExample.scala
deleted file mode 100644
index 6bfb061b0f3..00000000000
--- a/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/basics/StreamSQLExample.scala
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.examples.scala.basics
-
-import org.apache.flink.api.scala._
-import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
-import org.apache.flink.table.api.bridge.scala._
-
-/**
- * Simple example for demonstrating the use of SQL on a table backed by a [[DataStream]] in Scala
- * DataStream API.
- *
- * In particular, the example shows how to
- *   - convert two bounded data streams to tables,
- *   - register a table as a view under a name,
- *   - run a stream SQL query on registered and unregistered tables,
- *   - and convert the insert-only table back to a data stream.
- *
- * The example executes a single Flink job. The results are written to stdout.
- */
-object StreamSQLExample {
-
-  // *************************************************************************
-  //     PROGRAM
-  // *************************************************************************
-
-  def main(args: Array[String]): Unit = {
-
-    // set up the Scala DataStream API
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-
-    // set up the Scala Table API
-    val tableEnv = StreamTableEnvironment.create(env)
-
-    val orderA =
-      env.fromCollection(Seq(Order(1L, "beer", 3), Order(1L, "diaper", 4), Order(3L, "rubber", 2)))
-
-    val orderB =
-      env.fromCollection(Seq(Order(2L, "pen", 3), Order(2L, "rubber", 3), Order(4L, "beer", 1)))
-
-    // convert the first DataStream to a Table object
-    // it will be used "inline" and is not registered in a catalog
-    val tableA = tableEnv.fromDataStream(orderA)
-
-    // convert the second DataStream and register it as a view
-    // it will be accessible under a name
-    tableEnv.createTemporaryView("TableB", orderB)
-
-    // union the two tables
-    val result = tableEnv.sqlQuery(s"""
-                                      |SELECT * FROM $tableA WHERE amount > 2
-                                      |UNION ALL
-                                      |SELECT * FROM TableB WHERE amount < 2
-        """.stripMargin)
-
-    // convert the Table back to an insert-only DataStream of type `Order`
-    tableEnv.toDataStream(result, classOf[Order]).print()
-
-    // after the table program is converted to a DataStream program,
-    // we must use `env.execute()` to submit the job
-    env.execute()
-  }
-
-  // *************************************************************************
-  //     USER DATA TYPES
-  // *************************************************************************
-
-  /** Simple case class. */
-  case class Order(user: Long, product: String, amount: Int)
-}
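
A sketch of the same DataStream-to-Table round trip using the Java bridge from Scala, one migration path after this removal (the direction the deprecation notes in this commit recommend). Rows with named fields stand in for the Order case class; names and sample data are illustrative, not part of this commit.

    import org.apache.flink.api.common.typeinfo.Types
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment
    import org.apache.flink.types.Row

    object StreamSQLSketch {
      def main(args: Array[String]): Unit = {
        // Java execution environment and the Java Table bridge, used from Scala.
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        val tableEnv = StreamTableEnvironment.create(env)

        val orderType =
          Types.ROW_NAMED(Array("userId", "product", "amount"), Types.LONG, Types.STRING, Types.INT)

        val orderA = env
          .fromElements(
            Row.of(Long.box(1L), "beer", Int.box(3)),
            Row.of(Long.box(1L), "diaper", Int.box(4)),
            Row.of(Long.box(3L), "rubber", Int.box(2)))
          .returns(orderType)

        val orderB = env
          .fromElements(
            Row.of(Long.box(2L), "pen", Int.box(3)),
            Row.of(Long.box(2L), "rubber", Int.box(3)),
            Row.of(Long.box(4L), "beer", Int.box(1)))
          .returns(orderType)

        // One table is used inline, the other registered as a view, as in the removed example.
        val tableA = tableEnv.fromDataStream(orderA)
        tableEnv.createTemporaryView("TableB", orderB)

        val result = tableEnv.sqlQuery(
          s"SELECT * FROM $tableA WHERE amount > 2 UNION ALL SELECT * FROM TableB WHERE amount < 2")

        // Convert the insert-only result back to a DataStream of Rows and run the job.
        tableEnv.toDataStream(result).print()
        env.execute()
      }
    }
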
diff --git a/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/basics/StreamTableExample.scala b/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/basics/StreamTableExample.scala
deleted file mode 100644
index 1e71f8e52af..00000000000
--- a/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/basics/StreamTableExample.scala
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.examples.scala.basics
-
-import org.apache.flink.api.scala._
-import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
-import org.apache.flink.table.api._
-import org.apache.flink.table.api.bridge.scala._
-
-/**
- * Simple example for demonstrating the use of Table API on a Stream Table.
- *
- * This example shows how to:
- *   - Convert DataStreams to Tables
- *   - Apply union, select, and filter operations
- */
-object StreamTableExample {
-
-  // *************************************************************************
-  //     PROGRAM
-  // *************************************************************************
-
-  def main(args: Array[String]): Unit = {
-
-    // set up execution environment
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = StreamTableEnvironment.create(env)
-
-    val orderA = env
-      .fromCollection(Seq(Order(1L, "beer", 3), Order(1L, "diaper", 4), Order(3L, "rubber", 2)))
-      .toTable(tEnv)
-
-    val orderB = env
-      .fromCollection(Seq(Order(2L, "pen", 3), Order(2L, "rubber", 3), Order(4L, "beer", 1)))
-      .toTable(tEnv)
-
-    // union the two tables
-    val result: DataStream[Order] = orderA
-      .unionAll(orderB)
-      .select('user, 'product, 'amount)
-      .where('amount > 2)
-      .toDataStream(classOf[Order])
-
-    result.print()
-
-    env.execute()
-  }
-
-  // *************************************************************************
-  //     USER DATA TYPES
-  // *************************************************************************
-
-  case class Order(user: Long, product: String, amount: Int)
-
-}
diff --git a/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/basics/WordCountSQLExample.scala b/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/basics/WordCountSQLExample.scala
deleted file mode 100644
index 42a772c66c9..00000000000
--- a/flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/basics/WordCountSQLExample.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.examples.scala.basics
-
-import org.apache.flink.table.api._
-
-/** The famous word count example that shows a minimal Flink SQL job in batch execution mode. */
-object WordCountSQLExample {
-
-  def main(args: Array[String]): Unit = {
-
-    // set up the Table API
-    val settings = EnvironmentSettings
-      .newInstance()
-      .inBatchMode()
-      .build()
-    val tableEnv = TableEnvironment.create(settings)
-
-    // execute a Flink SQL job and print the result locally
-    tableEnv
-      .executeSql(
-        // define the aggregation
-        "SELECT word, SUM(frequency) AS `count`\n"
-        // read from an artificial fixed-size table with rows and columns
-          + "FROM (\n"
-          + "  VALUES ('Hello', 1), ('Ciao', 1), ('Hello', 2)\n"
-          + ")\n"
-          // name the table and its columns
-          + "AS WordTable(word, frequency)\n"
-          // group for aggregation
-          + "GROUP BY word")
-      .print()
-  }
-}
diff --git a/flink-examples/flink-examples-table/src/test/java/org/apache/flink/table/examples/scala/basics/GettingStartedExampleITCase.java b/flink-examples/flink-examples-table/src/test/java/org/apache/flink/table/examples/scala/basics/GettingStartedExampleITCase.java
deleted file mode 100644
index 2f91291b278..00000000000
--- a/flink-examples/flink-examples-table/src/test/java/org/apache/flink/table/examples/scala/basics/GettingStartedExampleITCase.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.examples.scala.basics;
-
-import org.apache.flink.table.examples.utils.ExampleOutputTestBase;
-
-import org.junit.jupiter.api.Test;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-/** Test for Scala {@link GettingStartedExample}. */
-class GettingStartedExampleITCase extends ExampleOutputTestBase {
-
-    @Test
-    void testExample() {
-        GettingStartedExample.main(new String[0]);
-        final String consoleOutput = getOutputString();
-        assertThat(consoleOutput)
-                .contains("|                    6 |                 1979 |")
-                .contains("SUCCESS!");
-    }
-}
diff --git a/flink-examples/flink-examples-table/src/test/java/org/apache/flink/table/examples/scala/basics/StreamSQLExampleITCase.java b/flink-examples/flink-examples-table/src/test/java/org/apache/flink/table/examples/scala/basics/StreamSQLExampleITCase.java
deleted file mode 100644
index 6839515fc26..00000000000
--- a/flink-examples/flink-examples-table/src/test/java/org/apache/flink/table/examples/scala/basics/StreamSQLExampleITCase.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.examples.scala.basics;
-
-import org.apache.flink.table.examples.utils.ExampleOutputTestBase;
-
-import org.junit.jupiter.api.Test;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-/** Test for Scala {@link StreamSQLExample}. */
-class StreamSQLExampleITCase extends ExampleOutputTestBase {
-
-    @Test
-    void testExample() {
-        StreamSQLExample.main(new String[0]);
-        final String consoleOutput = getOutputString();
-        assertThat(consoleOutput)
-                .contains("Order(4,beer,1)")
-                .contains("Order(1,beer,3)")
-                .contains("Order(1,diaper,4)");
-    }
-}
diff --git a/flink-examples/flink-examples-table/src/test/java/org/apache/flink/table/examples/scala/basics/WordCountSQLExampleITCase.java b/flink-examples/flink-examples-table/src/test/java/org/apache/flink/table/examples/scala/basics/WordCountSQLExampleITCase.java
deleted file mode 100644
index 943a4e96967..00000000000
--- a/flink-examples/flink-examples-table/src/test/java/org/apache/flink/table/examples/scala/basics/WordCountSQLExampleITCase.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.examples.scala.basics;
-
-import org.apache.flink.table.examples.utils.ExampleOutputTestBase;
-
-import org.junit.jupiter.api.Test;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-/** Test for Scala {@link WordCountSQLExample}. */
-class WordCountSQLExampleITCase extends ExampleOutputTestBase {
-
-    @Test
-    void testExample() {
-        WordCountSQLExample.main(new String[0]);
-        final String consoleOutput = getOutputString();
-        assertThat(consoleOutput)
-                .contains("|                           Ciao |           1 |")
-                .contains("|                          Hello |           3 |");
-    }
-}
diff --git a/flink-quickstart/flink-quickstart-scala/pom.xml b/flink-quickstart/flink-quickstart-scala/pom.xml
deleted file mode 100644
index a04b4978e30..00000000000
--- a/flink-quickstart/flink-quickstart-scala/pom.xml
+++ /dev/null
@@ -1,54 +0,0 @@
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-	<modelVersion>4.0.0</modelVersion>
-  
-	<properties>
-		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-	</properties>
-
-	<parent>
-		<groupId>org.apache.flink</groupId>
-		<artifactId>flink-quickstart</artifactId>
-		<version>1.17-SNAPSHOT</version>
-	</parent>
-
-	<artifactId>flink-quickstart-scala</artifactId>
-	<packaging>maven-archetype</packaging>
-	<name>Flink : Quickstart : Scala</name>
-
-	<build>
-		<plugins>
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-archetype-plugin</artifactId>
-				<executions>
-					<execution>
-						<goals>
-							<goal>jar</goal>
-						</goals>
-						<phase>compile</phase>
-					</execution>
-				</executions>
-			</plugin>
-		</plugins>
-	</build>
-
-</project>
diff --git a/flink-quickstart/flink-quickstart-scala/src/main/resources/META-INF/maven/archetype-metadata.xml b/flink-quickstart/flink-quickstart-scala/src/main/resources/META-INF/maven/archetype-metadata.xml
deleted file mode 100644
index afb1e44fbd3..00000000000
--- a/flink-quickstart/flink-quickstart-scala/src/main/resources/META-INF/maven/archetype-metadata.xml
+++ /dev/null
@@ -1,36 +0,0 @@
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-<archetype-descriptor
-		xmlns="http://maven.apache.org/plugins/maven-archetype-plugin/archetype-descriptor/1.0.0"
-		xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-		xsi:schemaLocation="http://maven.apache.org/plugins/maven-archetype-plugin/archetype-descriptor/1.0.0 http://maven.apache.org/xsd/archetype-descriptor-1.0.0.xsd"
-		name="flink-quickstart-java">
-	<fileSets>
-		<fileSet filtered="true" packaged="true" encoding="UTF-8">
-			<directory>src/main/scala</directory>
-			<includes>
-				<include>**/*.scala</include>
-			</includes>
-		</fileSet>
-		<fileSet encoding="UTF-8">
-			<directory>src/main/resources</directory>
-		</fileSet>
-	</fileSets>
-</archetype-descriptor>
\ No newline at end of file
diff --git a/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/pom.xml b/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/pom.xml
deleted file mode 100644
index 07a144f8d96..00000000000
--- a/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/pom.xml
+++ /dev/null
@@ -1,254 +0,0 @@
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-	<modelVersion>4.0.0</modelVersion>
-
-	<groupId>${groupId}</groupId>
-	<artifactId>${artifactId}</artifactId>
-	<version>${version}</version>
-	<packaging>jar</packaging>
-
-	<name>Flink Quickstart Job</name>
-
-	<repositories>
-		<repository>
-			<id>apache.snapshots</id>
-			<name>Apache Development Snapshot Repository</name>
-			<url>https://repository.apache.org/content/repositories/snapshots/</url>
-			<releases>
-				<enabled>false</enabled>
-			</releases>
-			<snapshots>
-				<enabled>true</enabled>
-			</snapshots>
-		</repository>
-	</repositories>
-
-	<properties>
-		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-		<flink.version>@project.version@</flink.version>
-		<target.java.version>@target.java.version@</target.java.version>
-		<scala.binary.version>2.12</scala.binary.version>
-		<scala.version>2.12.7</scala.version>
-		<log4j.version>@log4j.version@</log4j.version>
-	</properties>
-
-	<dependencies>
-		<!-- Apache Flink dependencies -->
-		<!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
-			<version>${flink.version}</version>
-			<scope>provided</scope>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-clients</artifactId>
-			<version>${flink.version}</version>
-			<scope>provided</scope>
-		</dependency>
-
-		<!-- Scala Library, provided by Flink as well. -->
-		<dependency>
-			<groupId>org.scala-lang</groupId>
-			<artifactId>scala-library</artifactId>
-			<version>${scala.version}</version>
-			<scope>provided</scope>
-		</dependency>
-
-		<!-- Add connector dependencies here. They must be in the default scope (compile). -->
-
-		<!-- Example:
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-connector-kafka</artifactId>
-			<version>${flink.version}</version>
-		</dependency>
-		-->
-
-		<!-- Add logging framework, to produce console output when running in the IDE. -->
-		<!-- These dependencies are excluded from the application JAR by default. -->
-		<dependency>
-			<groupId>org.apache.logging.log4j</groupId>
-			<artifactId>log4j-slf4j-impl</artifactId>
-			<version>${log4j.version}</version>
-			<scope>runtime</scope>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.logging.log4j</groupId>
-			<artifactId>log4j-api</artifactId>
-			<version>${log4j.version}</version>
-			<scope>runtime</scope>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.logging.log4j</groupId>
-			<artifactId>log4j-core</artifactId>
-			<version>${log4j.version}</version>
-			<scope>runtime</scope>
-		</dependency>
-	</dependencies>
-
-	<build>
-		<plugins>
-			<!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
-			<!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-shade-plugin</artifactId>
-				<version>3.1.1</version>
-				<executions>
-					<!-- Run shade goal on package phase -->
-					<execution>
-						<phase>package</phase>
-						<goals>
-							<goal>shade</goal>
-						</goals>
-						<configuration>
-							<createDependencyReducedPom>false</createDependencyReducedPom>
-							<artifactSet>
-								<excludes>
-									<exclude>org.apache.flink:flink-shaded-force-shading</exclude>
-									<exclude>com.google.code.findbugs:jsr305</exclude>
-									<exclude>org.slf4j:*</exclude>
-									<exclude>org.apache.logging.log4j:*</exclude>
-								</excludes>
-							</artifactSet>
-							<filters>
-								<filter>
-									<!-- Do not copy the signatures in the META-INF folder.
-									Otherwise, this might cause SecurityExceptions when using the JAR. -->
-									<artifact>*:*</artifact>
-									<excludes>
-										<exclude>META-INF/*.SF</exclude>
-										<exclude>META-INF/*.DSA</exclude>
-										<exclude>META-INF/*.RSA</exclude>
-									</excludes>
-								</filter>
-							</filters>
-							<transformers>
-								<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
-								<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
-									<mainClass>${package}.DataStreamJob</mainClass>
-								</transformer>
-							</transformers>
-						</configuration>
-					</execution>
-				</executions>
-			</plugin>
-
-			<!-- Java Compiler -->
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-compiler-plugin</artifactId>
-				<version>3.1</version>
-				<configuration>
-					<source>${target.java.version}</source>
-					<target>${target.java.version}</target>
-				</configuration>
-			</plugin>
-
-			<!-- Scala Compiler -->
-			<plugin>
-				<groupId>net.alchim31.maven</groupId>
-				<artifactId>scala-maven-plugin</artifactId>
-				<version>3.2.2</version>
-				<executions>
-					<execution>
-						<goals>
-							<goal>compile</goal>
-							<goal>testCompile</goal>
-						</goals>
-					</execution>
-				</executions>
-				<configuration>
-					<args>
-						<arg>-nobootcp</arg>
-						<arg>-target:jvm-${target.java.version}</arg>
-					</args>
-				</configuration>
-			</plugin>
-
-			<!-- Eclipse Scala Integration -->
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-eclipse-plugin</artifactId>
-				<version>2.8</version>
-				<configuration>
-					<downloadSources>true</downloadSources>
-					<projectnatures>
-						<projectnature>org.scala-ide.sdt.core.scalanature</projectnature>
-						<projectnature>org.eclipse.jdt.core.javanature</projectnature>
-					</projectnatures>
-					<buildcommands>
-						<buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand>
-					</buildcommands>
-					<classpathContainers>
-						<classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer>
-						<classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
-					</classpathContainers>
-					<excludes>
-						<exclude>org.scala-lang:scala-library</exclude>
-						<exclude>org.scala-lang:scala-compiler</exclude>
-					</excludes>
-					<sourceIncludes>
-						<sourceInclude>**/*.scala</sourceInclude>
-						<sourceInclude>**/*.java</sourceInclude>
-					</sourceIncludes>
-				</configuration>
-			</plugin>
-			<plugin>
-				<groupId>org.codehaus.mojo</groupId>
-				<artifactId>build-helper-maven-plugin</artifactId>
-				<version>1.7</version>
-				<executions>
-					<!-- Add src/main/scala to eclipse build path -->
-					<execution>
-						<id>add-source</id>
-						<phase>generate-sources</phase>
-						<goals>
-							<goal>add-source</goal>
-						</goals>
-						<configuration>
-							<sources>
-								<source>src/main/scala</source>
-							</sources>
-						</configuration>
-					</execution>
-					<!-- Add src/test/scala to eclipse build path -->
-					<execution>
-						<id>add-test-source</id>
-						<phase>generate-test-sources</phase>
-						<goals>
-							<goal>add-test-source</goal>
-						</goals>
-						<configuration>
-							<sources>
-								<source>src/test/scala</source>
-							</sources>
-						</configuration>
-					</execution>
-				</executions>
-			</plugin>
-		</plugins>
-	</build>
-</project>
diff --git a/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/src/main/resources/log4j2.properties b/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/src/main/resources/log4j2.properties
deleted file mode 100644
index 32c696e757d..00000000000
--- a/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/src/main/resources/log4j2.properties
+++ /dev/null
@@ -1,25 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-rootLogger.level = INFO
-rootLogger.appenderRef.console.ref = ConsoleAppender
-
-appender.console.name = ConsoleAppender
-appender.console.type = CONSOLE
-appender.console.layout.type = PatternLayout
-appender.console.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
diff --git a/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/src/main/scala/DataStreamJob.scala b/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/src/main/scala/DataStreamJob.scala
deleted file mode 100644
index 7130036f4d2..00000000000
--- a/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/src/main/scala/DataStreamJob.scala
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package ${package}
-
-import org.apache.flink.streaming.api.scala._
-
-/**
- * Skeleton for a Flink DataStream Job.
- *
- * <p>For a tutorial how to write a Flink application, check the
- * tutorials and examples on the <a href="https://flink.apache.org">Flink Website</a>.
- *
- * <p>To package your application into a JAR file for execution, run
- * 'mvn clean package' on the command line.
- *
- * <p>If you change the name of the main class (with the public static void main(String[] args))
- * method, change the respective entry in the POM.xml file (simply search for 'mainClass').
- */
-object DataStreamJob {
-  def main(args: Array[String]) {
-    // Sets up the execution environment, which is the main entry point
-    // to building Flink applications.
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-
-    /*
-     * Here, you can start creating your execution plan for Flink.
-     *
-     * Start with getting some data from the environment, like
-     * 	env.fromSequence(1, 10);
-     *
-     * then, transform the resulting DataStream<Long> using operations
-     * like
-     * 	.filter()
-     * 	.flatMap()
-     * 	.window()
-     * 	.process()
-     *
-     * and many more.
-     * Have a look at the programming guide:
-     *
-     * https://nightlies.apache.org/flink/flink-docs-stable/
-     *
-     */
-
-    // Execute program, beginning computation.
-    env.execute("Flink Scala API Skeleton")
-  }
-}
diff --git a/flink-quickstart/flink-quickstart-scala/src/test/resources/projects/testArtifact/archetype.properties b/flink-quickstart/flink-quickstart-scala/src/test/resources/projects/testArtifact/archetype.properties
deleted file mode 100644
index bfce4802091..00000000000
--- a/flink-quickstart/flink-quickstart-scala/src/test/resources/projects/testArtifact/archetype.properties
+++ /dev/null
@@ -1,21 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-groupId=org.apache.flink.archetypetest
-artifactId=testArtifact
-version=0.1
-package=org.apache.flink.archetypetest
diff --git a/flink-quickstart/flink-quickstart-scala/src/test/resources/projects/testArtifact/goal.txt b/flink-quickstart/flink-quickstart-scala/src/test/resources/projects/testArtifact/goal.txt
deleted file mode 100644
index f8808babbbb..00000000000
--- a/flink-quickstart/flink-quickstart-scala/src/test/resources/projects/testArtifact/goal.txt
+++ /dev/null
@@ -1 +0,0 @@
-compile
\ No newline at end of file
diff --git a/flink-quickstart/pom.xml b/flink-quickstart/pom.xml
index a96ac4bd9ae..f844cbbdeda 100644
--- a/flink-quickstart/pom.xml
+++ b/flink-quickstart/pom.xml
@@ -34,7 +34,6 @@ under the License.
 
 	<modules>
 		<module>flink-quickstart-java</module>
-		<module>flink-quickstart-scala</module>
 	</modules>
 	<build>
 		<extensions>
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
index f6ffa76b9ef..7a6d4fccf2e 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
@@ -56,7 +56,17 @@ import scala.reflect.ClassTag
  * Use [[ExecutionEnvironment#getExecutionEnvironment]] to get the correct environment depending on
  * where the program is executed. If it is run inside an IDE a local environment will be created. If
  * the program is submitted to a cluster a remote execution environment will be created.
+ *
+ * @deprecated
+ *   All Flink Scala APIs are deprecated and will be removed in a future Flink version. You can
+ *   still build your application in Scala, but you should move to the Java version of the
+ *   DataStream and/or Table API.
+ * @see
+ *   <a
+ *   href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-265+Deprecate+and+remove+Scala+API+support">
+ *   FLIP-265 Deprecate and remove Scala API support</a>
  */
+@Deprecated
 @Public
 class ExecutionEnvironment(javaEnv: JavaEnv) {
 
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/package.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/package.scala
index bec7a506077..056c26df1b0 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/package.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/package.scala
@@ -39,6 +39,15 @@ import language.experimental.macros
  * Use [[org.apache.flink.api.scala.ExecutionEnvironment.getExecutionEnvironment]] to obtain an
  * execution environment. This will either create a local environment or a remote environment,
  * depending on the context where your program is executing.
+ *
+ * @deprecated
+ *   All Flink Scala APIs are deprecated and will be removed in a future Flink version. You can
+ *   still build your application in Scala, but you should move to the Java version of the
+ *   DataStream and/or Table API.
+ * @see
+ *   <a
+ *   href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-265+Deprecate+and+remove+Scala+API+support">
+ *   FLIP-265 Deprecate and remove Scala API support</a>
  */
 package object scala {
   // We have this here so that we always have generated TypeInformationS when
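
For illustration, a minimal sketch of existing user code that these annotations now affect: the program below still compiles and runs unchanged, but the package object and `ExecutionEnvironment` it relies on carry `@Deprecated` after this commit. The job itself is illustrative, not part of this change.

    import org.apache.flink.api.scala._ // deprecated Scala API entry point (FLIP-265)

    object LegacyScalaDataSetJob {
      def main(args: Array[String]): Unit = {
        // The Scala ExecutionEnvironment wrapper that this commit deprecates.
        val env = ExecutionEnvironment.getExecutionEnvironment

        env
          .fromElements(1, 2, 3)
          .map(_ * 2)
          .print() // for the DataSet API, print() also triggers execution
      }
    }
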
diff --git a/flink-tests/src/test/java/org/apache/flink/test/example/scala/ConnectedComponentsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/example/scala/ConnectedComponentsITCase.java
deleted file mode 100644
index 0d8c59d6dda..00000000000
--- a/flink-tests/src/test/java/org/apache/flink/test/example/scala/ConnectedComponentsITCase.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.flink.test.example.scala;
-
-import org.apache.flink.examples.scala.graph.ConnectedComponents;
-import org.apache.flink.test.testdata.ConnectedComponentsData;
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-import java.io.BufferedReader;
-
-import static org.apache.flink.test.util.TestBaseUtils.getResultReader;
-
-/** Test for {@link ConnectedComponents}. */
-public class ConnectedComponentsITCase extends JavaProgramTestBase {
-
-    private static final long SEED = 0xBADC0FFEEBEEFL;
-
-    private static final int NUM_VERTICES = 1000;
-
-    private static final int NUM_EDGES = 10000;
-
-    private String verticesPath;
-    private String edgesPath;
-    private String resultPath;
-
-    @Override
-    protected void preSubmit() throws Exception {
-        verticesPath =
-                createTempFile(
-                        "vertices.txt",
-                        ConnectedComponentsData.getEnumeratingVertices(NUM_VERTICES));
-        edgesPath =
-                createTempFile(
-                        "edges.txt",
-                        ConnectedComponentsData.getRandomOddEvenEdges(
-                                NUM_EDGES, NUM_VERTICES, SEED));
-        resultPath = getTempFilePath("results");
-    }
-
-    @Override
-    protected void testProgram() throws Exception {
-        ConnectedComponents.main(
-                new String[] {
-                    "--vertices", verticesPath,
-                    "--edges", edgesPath,
-                    "--output", resultPath,
-                    "--iterations", "100"
-                });
-    }
-
-    @Override
-    protected void postSubmit() throws Exception {
-        for (BufferedReader reader : getResultReader(resultPath)) {
-            ConnectedComponentsData.checkOddEvenResult(reader);
-        }
-    }
-}
diff --git a/flink-tests/src/test/java/org/apache/flink/test/example/scala/EnumTriangleITCase.java b/flink-tests/src/test/java/org/apache/flink/test/example/scala/EnumTriangleITCase.java
deleted file mode 100644
index b9233181045..00000000000
--- a/flink-tests/src/test/java/org/apache/flink/test/example/scala/EnumTriangleITCase.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.flink.test.example.scala;
-
-import org.apache.flink.examples.scala.graph.EnumTriangles;
-import org.apache.flink.test.testdata.EnumTriangleData;
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-import static org.apache.flink.test.util.TestBaseUtils.compareResultsByLinesInMemory;
-
-/** Test {@link EnumTriangles}. */
-public class EnumTriangleITCase extends JavaProgramTestBase {
-
-    protected String edgePath;
-    protected String resultPath;
-
-    @Override
-    protected void preSubmit() throws Exception {
-        edgePath = createTempFile("edges", EnumTriangleData.EDGES);
-        resultPath = getTempDirPath("triangles");
-    }
-
-    @Override
-    protected void postSubmit() throws Exception {
-        compareResultsByLinesInMemory(EnumTriangleData.TRIANGLES_BY_ID, resultPath);
-    }
-
-    @Override
-    protected void testProgram() throws Exception {
-        EnumTriangles.main(
-                new String[] {
-                    "--edges", edgePath,
-                    "--output", resultPath
-                });
-    }
-}
diff --git a/flink-tests/src/test/java/org/apache/flink/test/example/scala/PageRankITCase.java b/flink-tests/src/test/java/org/apache/flink/test/example/scala/PageRankITCase.java
deleted file mode 100644
index bfda329e6ba..00000000000
--- a/flink-tests/src/test/java/org/apache/flink/test/example/scala/PageRankITCase.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.flink.test.example.scala;
-
-import org.apache.flink.examples.scala.graph.PageRankBasic;
-import org.apache.flink.test.testdata.PageRankData;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.apache.flink.util.FileUtils;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.io.File;
-
-import static org.apache.flink.test.util.TestBaseUtils.compareKeyValuePairsWithDelta;
-
-/** Test for {@link PageRankBasic}. */
-@RunWith(Parameterized.class)
-public class PageRankITCase extends MultipleProgramsTestBase {
-
-    public PageRankITCase(TestExecutionMode mode) {
-        super(mode);
-    }
-
-    private String verticesPath;
-    private String edgesPath;
-    private String resultPath;
-    private String expected;
-
-    @Rule public TemporaryFolder tempFolder = new TemporaryFolder();
-
-    @Before
-    public void before() throws Exception {
-        File resultFile = tempFolder.newFile();
-        // Delete file because the Scala API does not respect WriteMode set by the configuration
-        resultFile.delete();
-        resultPath = resultFile.toURI().toString();
-
-        File verticesFile = tempFolder.newFile();
-        FileUtils.writeFileUtf8(verticesFile, PageRankData.VERTICES);
-
-        File edgesFile = tempFolder.newFile();
-        FileUtils.writeFileUtf8(edgesFile, PageRankData.EDGES);
-
-        verticesPath = verticesFile.toURI().toString();
-        edgesPath = edgesFile.toURI().toString();
-    }
-
-    @After
-    public void after() throws Exception {
-        compareKeyValuePairsWithDelta(expected, resultPath, " ", 0.01);
-    }
-
-    @Test
-    public void testPageRankWithSmallNumberOfIterations() throws Exception {
-        PageRankBasic.main(
-                new String[] {
-                    "--pages", verticesPath,
-                    "--links", edgesPath,
-                    "--output", resultPath,
-                    "--numPages", PageRankData.NUM_VERTICES + "",
-                    "--iterations", "3"
-                });
-        expected = PageRankData.RANKS_AFTER_3_ITERATIONS;
-    }
-
-    @Test
-    public void testPageRankWithConvergence() throws Exception {
-        // start with a very high number of iteration such that the dynamic convergence criterion
-        // must handle termination
-        PageRankBasic.main(
-                new String[] {
-                    "--pages", verticesPath,
-                    "--links", edgesPath,
-                    "--output", resultPath,
-                    "--numPages", PageRankData.NUM_VERTICES + "",
-                    "--iterations", "1000"
-                });
-        expected = PageRankData.RANKS_AFTER_EPSILON_0_0001_CONVERGENCE;
-    }
-}
diff --git a/flink-tests/src/test/java/org/apache/flink/test/example/scala/TransitiveClosureITCase.java b/flink-tests/src/test/java/org/apache/flink/test/example/scala/TransitiveClosureITCase.java
deleted file mode 100644
index 1462c4c0a01..00000000000
--- a/flink-tests/src/test/java/org/apache/flink/test/example/scala/TransitiveClosureITCase.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.flink.test.example.scala;
-
-import org.apache.flink.examples.scala.graph.TransitiveClosureNaive;
-import org.apache.flink.test.testdata.ConnectedComponentsData;
-import org.apache.flink.test.testdata.TransitiveClosureData;
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-import java.io.BufferedReader;
-
-import static org.apache.flink.test.util.TestBaseUtils.getResultReader;
-
-/** Test for {@link TransitiveClosureNaive}. */
-public class TransitiveClosureITCase extends JavaProgramTestBase {
-
-    private static final long SEED = 0xBADC0FFEEBEEFL;
-
-    private static final int NUM_VERTICES = 100;
-
-    private static final int NUM_EDGES = 500;
-
-    private String edgesPath;
-    private String resultPath;
-
-    @Override
-    protected void preSubmit() throws Exception {
-        edgesPath =
-                createTempFile(
-                        "edges.txt",
-                        ConnectedComponentsData.getRandomOddEvenEdges(
-                                NUM_EDGES, NUM_VERTICES, SEED));
-        resultPath = getTempFilePath("results");
-    }
-
-    @Override
-    protected void testProgram() throws Exception {
-        TransitiveClosureNaive.main(
-                new String[] {
-                    "--edges", edgesPath,
-                    "--output", resultPath,
-                    "--iterations", "5"
-                });
-    }
-
-    @Override
-    protected void postSubmit() throws Exception {
-        for (BufferedReader reader : getResultReader(resultPath)) {
-            TransitiveClosureData.checkOddEvenResult(reader);
-        }
-    }
-}
diff --git a/flink-tests/src/test/java/org/apache/flink/test/example/scala/WebLogAnalysisITCase.java b/flink-tests/src/test/java/org/apache/flink/test/example/scala/WebLogAnalysisITCase.java
deleted file mode 100644
index c9ccd8dacf8..00000000000
--- a/flink-tests/src/test/java/org/apache/flink/test/example/scala/WebLogAnalysisITCase.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.flink.test.example.scala;
-
-import org.apache.flink.examples.scala.relational.WebLogAnalysis;
-import org.apache.flink.test.testdata.WebLogAnalysisData;
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-import static org.apache.flink.test.util.TestBaseUtils.compareResultsByLinesInMemory;
-
-/** Test for {@link WebLogAnalysis}. */
-public class WebLogAnalysisITCase extends JavaProgramTestBase {
-
-    private String docsPath;
-    private String ranksPath;
-    private String visitsPath;
-    private String resultPath;
-
-    @Override
-    protected void preSubmit() throws Exception {
-        docsPath = createTempFile("docs", WebLogAnalysisData.DOCS);
-        ranksPath = createTempFile("ranks", WebLogAnalysisData.RANKS);
-        visitsPath = createTempFile("visits", WebLogAnalysisData.VISITS);
-        resultPath = getTempDirPath("result");
-    }
-
-    @Override
-    protected void postSubmit() throws Exception {
-        compareResultsByLinesInMemory(WebLogAnalysisData.EXCEPTED_RESULT, resultPath);
-    }
-
-    @Override
-    protected void testProgram() throws Exception {
-        WebLogAnalysis.main(
-                new String[] {
-                    "--documents", docsPath,
-                    "--ranks", ranksPath,
-                    "--visits", visitsPath,
-                    "--output", resultPath
-                });
-    }
-}
diff --git a/flink-tests/src/test/java/org/apache/flink/test/example/scala/WordCountITCase.java b/flink-tests/src/test/java/org/apache/flink/test/example/scala/WordCountITCase.java
deleted file mode 100644
index a45f88fb5bf..00000000000
--- a/flink-tests/src/test/java/org/apache/flink/test/example/scala/WordCountITCase.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.flink.test.example.scala;
-
-import org.apache.flink.examples.scala.wordcount.WordCount;
-import org.apache.flink.test.testdata.WordCountData;
-import org.apache.flink.test.util.JavaProgramTestBase;
-
-import static org.apache.flink.test.util.TestBaseUtils.compareResultsByLinesInMemory;
-
-/** Test {@link WordCount}. */
-public class WordCountITCase extends JavaProgramTestBase {
-
-    protected String textPath;
-    protected String resultPath;
-
-    @Override
-    protected void preSubmit() throws Exception {
-        textPath = createTempFile("text.txt", WordCountData.TEXT);
-        resultPath = getTempDirPath("result");
-    }
-
-    @Override
-    protected void postSubmit() throws Exception {
-        compareResultsByLinesInMemory(WordCountData.COUNTS, resultPath);
-    }
-
-    @Override
-    protected void testProgram() throws Exception {
-        WordCount.main(
-                new String[] {
-                    "--input", textPath,
-                    "--output", resultPath
-                });
-    }
-}
diff --git a/flink-walkthroughs/flink-walkthrough-datastream-scala/pom.xml b/flink-walkthroughs/flink-walkthrough-datastream-scala/pom.xml
deleted file mode 100644
index 42f964d3ffe..00000000000
--- a/flink-walkthroughs/flink-walkthrough-datastream-scala/pom.xml
+++ /dev/null
@@ -1,37 +0,0 @@
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-	<modelVersion>4.0.0</modelVersion>
-
-	<properties>
-		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-	</properties>
-
-	<parent>
-		<groupId>org.apache.flink</groupId>
-		<artifactId>flink-walkthroughs</artifactId>
-		<version>1.17-SNAPSHOT</version>
-	</parent>
-
-	<artifactId>flink-walkthrough-datastream-scala</artifactId>
-	<packaging>maven-archetype</packaging>
-	<name>Flink : Walkthrough : Datastream Scala</name>
-
-</project>
diff --git a/flink-walkthroughs/flink-walkthrough-datastream-scala/src/main/resources/META-INF/maven/archetype-metadata.xml b/flink-walkthroughs/flink-walkthrough-datastream-scala/src/main/resources/META-INF/maven/archetype-metadata.xml
deleted file mode 100644
index e78dc8d2cbb..00000000000
--- a/flink-walkthroughs/flink-walkthrough-datastream-scala/src/main/resources/META-INF/maven/archetype-metadata.xml
+++ /dev/null
@@ -1,36 +0,0 @@
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-<archetype-descriptor
-	xmlns="http://maven.apache.org/plugins/maven-archetype-plugin/archetype-descriptor/1.0.0"
-	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-	xsi:schemaLocation="http://maven.apache.org/plugins/maven-archetype-plugin/archetype-descriptor/1.0.0 http://maven.apache.org/xsd/archetype-descriptor-1.0.0.xsd"
-	name="flink-walkthrough-datastream-scala">
-	<fileSets>
-		<fileSet filtered="true" packaged="true" encoding="UTF-8">
-			<directory>src/main/scala</directory>
-			<includes>
-				<include>**/*.scala</include>
-			</includes>
-		</fileSet>
-		<fileSet encoding="UTF-8">
-			<directory>src/main/resources</directory>
-		</fileSet>
-	</fileSets>
-</archetype-descriptor>
diff --git a/flink-walkthroughs/flink-walkthrough-datastream-scala/src/main/resources/archetype-resources/pom.xml b/flink-walkthroughs/flink-walkthrough-datastream-scala/src/main/resources/archetype-resources/pom.xml
deleted file mode 100644
index f9a63376562..00000000000
--- a/flink-walkthroughs/flink-walkthrough-datastream-scala/src/main/resources/archetype-resources/pom.xml
+++ /dev/null
@@ -1,249 +0,0 @@
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-	<modelVersion>4.0.0</modelVersion>
-
-	<groupId>${groupId}</groupId>
-	<artifactId>${artifactId}</artifactId>
-	<version>${version}</version>
-	<packaging>jar</packaging>
-
-	<name>Flink Walkthrough DataStream Scala</name>
-	<url>https://flink.apache.org</url>
-
-	<repositories>
-		<repository>
-			<id>apache.snapshots</id>
-			<name>Apache Development Snapshot Repository</name>
-			<url>https://repository.apache.org/content/repositories/snapshots/</url>
-			<releases>
-				<enabled>false</enabled>
-			</releases>
-			<snapshots>
-				<enabled>true</enabled>
-			</snapshots>
-		</repository>
-	</repositories>
-
-	<properties>
-		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-		<flink.version>@project.version@</flink.version>
-		<scala.binary.version>2.12</scala.binary.version>
-		<scala.version>2.12.7</scala.version>
-		<log4j.version>@log4j.version@</log4j.version>
-	</properties>
-
-	<dependencies>
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-walkthrough-common</artifactId>
-			<version>${flink.version}</version>
-		</dependency>
-
-		<!-- Apache Flink dependencies -->
-		<!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
-			<version>${flink.version}</version>
-			<scope>provided</scope>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-clients</artifactId>
-			<version>${flink.version}</version>
-			<scope>provided</scope>
-		</dependency>
-
-		<!-- Add connector dependencies here. They must be in the default scope (compile). -->
-
-		<!-- Example:
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-connector-kafka</artifactId>
-			<version>${flink.version}</version>
-		</dependency>
-		-->
-
-		<!-- Add logging framework, to produce console output when running in the IDE. -->
-		<!-- These dependencies are excluded from the application JAR by default. -->
-		<dependency>
-			<groupId>org.apache.logging.log4j</groupId>
-			<artifactId>log4j-slf4j-impl</artifactId>
-			<version>${log4j.version}</version>
-			<scope>runtime</scope>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.logging.log4j</groupId>
-			<artifactId>log4j-api</artifactId>
-			<version>${log4j.version}</version>
-			<scope>runtime</scope>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.logging.log4j</groupId>
-			<artifactId>log4j-core</artifactId>
-			<version>${log4j.version}</version>
-			<scope>runtime</scope>
-		</dependency>
-	</dependencies>
-
-	<build>
-		<plugins>
-			<!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
-			<!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-shade-plugin</artifactId>
-				<version>3.0.0</version>
-				<executions>
-					<!-- Run shade goal on package phase -->
-					<execution>
-						<phase>package</phase>
-						<goals>
-							<goal>shade</goal>
-						</goals>
-						<configuration>
-							<artifactSet>
-								<excludes>
-									<exclude>org.apache.flink:flink-shaded-force-shading</exclude>
-									<exclude>com.google.code.findbugs:jsr305</exclude>
-									<exclude>org.slf4j:*</exclude>
-									<exclude>org.apache.logging.log4j:*</exclude>
-								</excludes>
-							</artifactSet>
-							<filters>
-								<filter>
-									<!-- Do not copy the signatures in the META-INF folder.
-									Otherwise, this might cause SecurityExceptions when using the JAR. -->
-									<artifact>*:*</artifact>
-									<excludes>
-										<exclude>META-INF/*.SF</exclude>
-										<exclude>META-INF/*.DSA</exclude>
-										<exclude>META-INF/*.RSA</exclude>
-									</excludes>
-								</filter>
-							</filters>
-							<transformers>
-								<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
-									<mainClass>${package}.FraudDetectionJob</mainClass>
-								</transformer>
-							</transformers>
-						</configuration>
-					</execution>
-				</executions>
-			</plugin>
-
-			<!-- Java Compiler -->
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-compiler-plugin</artifactId>
-				<version>3.1</version>
-				<configuration>
-					<source>1.8</source>
-					<target>1.8</target>
-				</configuration>
-			</plugin>
-
-			<!-- Scala Compiler -->
-			<plugin>
-				<groupId>net.alchim31.maven</groupId>
-				<artifactId>scala-maven-plugin</artifactId>
-				<version>3.2.2</version>
-				<executions>
-					<execution>
-						<goals>
-							<goal>compile</goal>
-							<goal>testCompile</goal>
-						</goals>
-					</execution>
-				</executions>
-				<configuration>
-					<args>
-						<arg>-nobootcp</arg>
-					</args>
-				</configuration>
-			</plugin>
-
-			<!-- Eclipse Scala Integration -->
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-eclipse-plugin</artifactId>
-				<version>2.8</version>
-				<configuration>
-					<downloadSources>true</downloadSources>
-					<projectnatures>
-						<projectnature>org.scala-ide.sdt.core.scalanature</projectnature>
-						<projectnature>org.eclipse.jdt.core.javanature</projectnature>
-					</projectnatures>
-					<buildcommands>
-						<buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand>
-					</buildcommands>
-					<classpathContainers>
-						<classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer>
-						<classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
-					</classpathContainers>
-					<excludes>
-						<exclude>org.scala-lang:scala-library</exclude>
-						<exclude>org.scala-lang:scala-compiler</exclude>
-					</excludes>
-					<sourceIncludes>
-						<sourceInclude>**/*.scala</sourceInclude>
-						<sourceInclude>**/*.java</sourceInclude>
-					</sourceIncludes>
-				</configuration>
-			</plugin>
-			<plugin>
-				<groupId>org.codehaus.mojo</groupId>
-				<artifactId>build-helper-maven-plugin</artifactId>
-				<version>1.7</version>
-				<executions>
-					<!-- Add src/main/scala to eclipse build path -->
-					<execution>
-						<id>add-source</id>
-						<phase>generate-sources</phase>
-						<goals>
-							<goal>add-source</goal>
-						</goals>
-						<configuration>
-							<sources>
-								<source>src/main/scala</source>
-							</sources>
-						</configuration>
-					</execution>
-					<!-- Add src/test/scala to eclipse build path -->
-					<execution>
-						<id>add-test-source</id>
-						<phase>generate-test-sources</phase>
-						<goals>
-							<goal>add-test-source</goal>
-						</goals>
-						<configuration>
-							<sources>
-								<source>src/test/scala</source>
-							</sources>
-						</configuration>
-					</execution>
-				</executions>
-			</plugin>
-		</plugins>
-	</build>
-</project>
diff --git a/flink-walkthroughs/flink-walkthrough-datastream-scala/src/main/resources/archetype-resources/src/main/resources/log4j2.properties b/flink-walkthroughs/flink-walkthrough-datastream-scala/src/main/resources/archetype-resources/src/main/resources/log4j2.properties
deleted file mode 100644
index 1723e135fef..00000000000
--- a/flink-walkthroughs/flink-walkthrough-datastream-scala/src/main/resources/archetype-resources/src/main/resources/log4j2.properties
+++ /dev/null
@@ -1,28 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-rootLogger.level = WARN
-rootLogger.appenderRef.console.ref = ConsoleAppender
-
-logger.sink.name = org.apache.flink.walkthrough.common.sink.AlertSink
-logger.sink.level = INFO
-
-appender.console.name = ConsoleAppender
-appender.console.type = CONSOLE
-appender.console.layout.type = PatternLayout
-appender.console.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
diff --git a/flink-walkthroughs/flink-walkthrough-datastream-scala/src/main/resources/archetype-resources/src/main/scala/FraudDetectionJob.scala b/flink-walkthroughs/flink-walkthrough-datastream-scala/src/main/resources/archetype-resources/src/main/scala/FraudDetectionJob.scala
deleted file mode 100644
index 58e46e23bab..00000000000
--- a/flink-walkthroughs/flink-walkthrough-datastream-scala/src/main/resources/archetype-resources/src/main/scala/FraudDetectionJob.scala
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package ${package}
-
-import org.apache.flink.streaming.api.scala._
-import org.apache.flink.walkthrough.common.sink.AlertSink
-import org.apache.flink.walkthrough.common.entity.Alert
-import org.apache.flink.walkthrough.common.entity.Transaction
-import org.apache.flink.walkthrough.common.source.TransactionSource
-
-/**
-  * Skeleton code for the DataStream code walkthrough
-  */
-object FraudDetectionJob {
-
-  @throws[Exception]
-  def main(args: Array[String]): Unit = {
-    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
-
-    val transactions: DataStream[Transaction] = env
-      .addSource(new TransactionSource)
-      .name("transactions")
-
-    val alerts: DataStream[Alert] = transactions
-      .keyBy(transaction => transaction.getAccountId)
-      .process(new FraudDetector)
-      .name("fraud-detector")
-
-    alerts
-      .addSink(new AlertSink)
-      .name("send-alerts")
-
-    env.execute("Fraud Detection")
-  }
-}
diff --git a/flink-walkthroughs/flink-walkthrough-datastream-scala/src/main/resources/archetype-resources/src/main/scala/FraudDetector.scala b/flink-walkthroughs/flink-walkthrough-datastream-scala/src/main/resources/archetype-resources/src/main/scala/FraudDetector.scala
deleted file mode 100644
index 6d7d91d6cd5..00000000000
--- a/flink-walkthroughs/flink-walkthrough-datastream-scala/src/main/resources/archetype-resources/src/main/scala/FraudDetector.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package ${package}
-
-import org.apache.flink.streaming.api.functions.KeyedProcessFunction
-import org.apache.flink.util.Collector
-import org.apache.flink.walkthrough.common.entity.Alert
-import org.apache.flink.walkthrough.common.entity.Transaction
-
-/**
-  * Skeleton code for implementing a fraud detector.
-  */
-object FraudDetector {
-  val SMALL_AMOUNT: Double = 1.00
-  val LARGE_AMOUNT: Double = 500.00
-  val ONE_MINUTE: Long     = 60 * 1000L
-}
-
-@SerialVersionUID(1L)
-class FraudDetector extends KeyedProcessFunction[Long, Transaction, Alert] {
-
-  @throws[Exception]
-  def processElement(
-      transaction: Transaction,
-      context: KeyedProcessFunction[Long, Transaction, Alert]#Context,
-      collector: Collector[Alert]): Unit = {
-
-    val alert = new Alert
-    alert.setId(transaction.getAccountId)
-
-    collector.collect(alert)
-  }
-}
diff --git a/flink-walkthroughs/pom.xml b/flink-walkthroughs/pom.xml
index 895732a04fd..d302ff0cbba 100644
--- a/flink-walkthroughs/pom.xml
+++ b/flink-walkthroughs/pom.xml
@@ -34,7 +34,6 @@ under the License.
 	<modules>
 		<module>flink-walkthrough-common</module>
 		<module>flink-walkthrough-datastream-java</module>
-		<module>flink-walkthrough-datastream-scala</module>
 	</modules>
 	<build>
 		<extensions>