Posted to commits@flink.apache.org by li...@apache.org on 2022/06/24 14:33:13 UTC

[flink-ml] branch master updated: [FLINK-26828] Add flink-ml-examples module

This is an automated email from the ASF dual-hosted git repository.

lindong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-ml.git


The following commit(s) were added to refs/heads/master by this push:
     new 62ca2d5  [FLINK-26828] Add flink-ml-examples module
62ca2d5 is described below

commit 62ca2d50b3ddc35f5720e1afccd011cf10f871a4
Author: yunfengzhou-hub <yu...@outlook.com>
AuthorDate: Fri Jun 24 22:33:08 2022 +0800

    [FLINK-26828] Add flink-ml-examples module
    
    This closes #115.
---
 README.md                                          |  10 +-
 docs/content/_index.md                             |   6 +-
 .../{quick-start.md => build-your-own-project.md}  |   9 +-
 docs/content/docs/try-flink-ml/quick-start.md      | 230 +++++----------------
 flink-ml-dist/pom.xml                              |   6 +
 flink-ml-dist/src/main/assemblies/bin.xml          |   1 +
 flink-ml-examples/pom.xml                          | 117 +++++++++++
 .../ml/examples/classification/KnnExample.java     |  89 ++++++++
 .../examples/classification/LinearSVCExample.java  |  75 +++++++
 .../classification/LogisticRegressionExample.java  |  74 +++++++
 .../examples/classification/NaiveBayesExample.java |  77 +++++++
 .../OnlineLogisticRegressionExample.java           | 131 ++++++++++++
 .../ml/examples/clustering/KMeansExample.java      |  66 ++++++
 .../examples/clustering/OnlineKMeansExample.java   | 118 +++++++++++
 .../BinaryClassificationEvaluatorExample.java      |  78 +++++++
 .../ml/examples/feature/BucketizerExample.java     |  76 +++++++
 .../feature/IndexToStringModelExample.java         |  77 +++++++
 .../ml/examples/feature/MinMaxScalerExample.java   |  72 +++++++
 .../ml/examples/feature/OneHotEncoderExample.java  |  64 ++++++
 .../ml/examples/feature/StandardScalerExample.java |  63 ++++++
 .../ml/examples/feature/StringIndexerExample.java  |  87 ++++++++
 .../examples/feature/VectorAssemblerExample.java   |  80 +++++++
 .../regression/LinearRegressionExample.java        |  71 +++++++
 .../ml/examples/util/PeriodicSourceFunction.java   |  61 ++++++
 .../org/apache/flink/ml/examples/ExamplesTest.java | 129 ++++++++++++
 .../ml/feature/minmaxscaler/MinMaxScaler.java      |   2 +-
 pom.xml                                            |   1 +
 27 files changed, 1681 insertions(+), 189 deletions(-)

diff --git a/README.md b/README.md
index d935484..41ea744 100644
--- a/README.md
+++ b/README.md
@@ -6,6 +6,12 @@ build ML pipelines for both training and inference jobs.
 Flink ML is developed under the umbrella of [Apache
 Flink](https://flink.apache.org/).
 
+## <a name="start"></a>Getting Started
+
+You can follow this [quick
+start](https://nightlies.apache.org/flink/flink-ml-docs-master/docs/try-flink-ml/quick-start/)
+guide to get hands-on experience with Flink ML.
+
 ## <a name="build"></a>Building the Project
 
 Run the `mvn clean package` command.
@@ -17,8 +23,8 @@ that you may have added as dependencies to the application:
 ## <a name="benchmark"></a>Benchmark
 
 Flink ML provides functionalities to benchmark its machine learning algorithms.
-For detailed information, please check the [Benchmark
-Getting Started](./flink-ml-benchmark/README.md).
+For detailed information, please check the [Benchmark Getting
+Started](./flink-ml-benchmark/README.md).
 
 ## <a name="documentation"></a>Documentation
 
diff --git a/docs/content/_index.md b/docs/content/_index.md
index 9b76115..e7afa36 100644
--- a/docs/content/_index.md
+++ b/docs/content/_index.md
@@ -32,9 +32,9 @@ build ML pipelines for both training and inference jobs.
 {{< columns >}}
 ## Try Flink ML
 
-If you’re interested in playing around with Flink ML, check out our [example
-codes]({{< ref "docs/try-flink-ml/quick-start" >}}). It provides a step by
-step introduction to the APIs and guides you through real applications.
+If you’re interested in playing around with Flink ML, check out our [quick
+start]({{< ref "docs/try-flink-ml/quick-start" >}}). It provides a simple
+example of submitting and executing a Flink ML job on a Flink cluster.
 
 <--->
 
diff --git a/docs/content/docs/try-flink-ml/quick-start.md b/docs/content/docs/try-flink-ml/build-your-own-project.md
similarity index 97%
copy from docs/content/docs/try-flink-ml/quick-start.md
copy to docs/content/docs/try-flink-ml/build-your-own-project.md
index 0ef5a19..ad289e0 100644
--- a/docs/content/docs/try-flink-ml/quick-start.md
+++ b/docs/content/docs/try-flink-ml/build-your-own-project.md
@@ -1,9 +1,9 @@
 ---
-title: "Quick Start"
-weight: 1
+title: "Building your own Flink ML project"
+weight: 2
 type: docs
 aliases:
-- /try-flink-ml/quick-start.html
+- /try-flink-ml/building-your-own-project.html
 ---
 <!--
 Licensed to the Apache Software Foundation (ASF) under one
@@ -24,7 +24,7 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-# Quick Start
+# Building your own Flink ML project
 
 This document provides a quick introduction to using Flink ML. Readers of this
 document will be guided to create a simple Flink job that trains a Machine
@@ -236,4 +236,3 @@ Vector: [0.0, 0.3]	Cluster ID: 1
 Vector: [9.0, 0.0]	Cluster ID: 0
 ```
 
-<!-- TODO: Add sections like "Next Steps` with guidance to other pages of this document. -->
diff --git a/docs/content/docs/try-flink-ml/quick-start.md b/docs/content/docs/try-flink-ml/quick-start.md
index 0ef5a19..fcbcce5 100644
--- a/docs/content/docs/try-flink-ml/quick-start.md
+++ b/docs/content/docs/try-flink-ml/quick-start.md
@@ -27,213 +27,87 @@ under the License.
 # Quick Start
 
 This document provides a quick introduction to using Flink ML. Readers of this
-document will be guided to create a simple Flink job that trains a Machine
+document will be guided to submit a simple Flink job that trains a Machine
 Learning Model and use it to provide prediction service.
 
-## Maven Setup
+## Prerequisites
 
-In order to use Flink ML in a Maven project, add the following dependencies to
-`pom.xml`.
+### Install Flink
 
-{{< artifact flink-ml-core >}}
+Please make sure Flink 1.15 or a higher version has been installed in your
+local environment. You can refer to the [local
+installation](https://nightlies.apache.org/flink/flink-docs-master/docs/try-flink/local_installation/)
+instructions on Flink's documentation website for how to achieve this.
 
-{{< artifact flink-ml-iteration >}}
+### Set Up Flink Environment Variables
 
-{{< artifact flink-ml-lib >}}
+After installing Flink, please register `$FLINK_HOME` as an environment
+variable in your local environment.
 
-The example code provided in this document requires additional dependencies on
-the Flink Table API. In order to execute the example code successfully, please
-make sure the following dependencies also exist in `pom.xml`.
-
-```xml
-<dependency>
-  <groupId>org.apache.flink</groupId>
-  <artifactId>flink-clients</artifactId>
-  <version>1.15.0</version>
-</dependency>
+```bash
+cd ${path_to_flink}
+export FLINK_HOME=`pwd`
 ```
 
-```xml
-<dependency>
-  <groupId>org.apache.flink</groupId>
-  <artifactId>flink-table-planner-loader</artifactId>
-  <version>1.15.0</version>
-</dependency>
-```
 
-## Flink ML Example
-
-Kmeans is a widely-used clustering algorithm and has been supported by Flink ML.
-The example code below creates a Flink job with Flink ML that initializes and
-trains a Kmeans model, and finally uses it to predict the cluster id of certain
-data points.
-
-```java
-import org.apache.flink.ml.clustering.kmeans.KMeans;
-import org.apache.flink.ml.clustering.kmeans.KMeansModel;
-import org.apache.flink.ml.linalg.DenseVector;
-import org.apache.flink.ml.linalg.Vectors;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.table.api.Table;
-import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
-import org.apache.flink.types.Row;
-import org.apache.flink.util.CloseableIterator;
-
-public class QuickStart {
-    public static void main(String[] args) {
-        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
-
-        String featuresCol = "features";
-        String predictionCol = "prediction";
-
-        // Generate train data and predict data as DataStream.
-        DataStream<DenseVector> inputStream = env.fromElements(
-                Vectors.dense(0.0, 0.0),
-                Vectors.dense(0.0, 0.3),
-                Vectors.dense(0.3, 0.0),
-                Vectors.dense(9.0, 0.0),
-                Vectors.dense(9.0, 0.6),
-                Vectors.dense(9.6, 0.0)
-        );
-
-        // Convert data from DataStream to Table, as Flink ML uses Table API.
-        Table input = tEnv.fromDataStream(inputStream).as(featuresCol);
-
-        // Creates a K-means object and initialize its parameters.
-        KMeans kmeans = new KMeans()
-                .setK(2)
-                .setSeed(1L)
-                .setFeaturesCol(featuresCol)
-                .setPredictionCol(predictionCol);
-
-        // Trains the K-means Model.
-        KMeansModel model = kmeans.fit(input);
-
-        // Use the K-means Model for predictions.
-        Table output = model.transform(input)[0];
-
-        // Extracts and displays prediction result.
-        for (CloseableIterator<Row> it = output.execute().collect(); it.hasNext(); ) {
-            Row row = it.next();
-            DenseVector vector = (DenseVector) row.getField(featuresCol);
-            int clusterId = (Integer) row.getField(predictionCol);
-            System.out.println("Vector: " + vector + "\tCluster ID: " + clusterId);
-        }
-    }
-}
-```
+[//]: # (TODO: Add instructions to download binary distribution when release is
+    available)
+### Build Flink ML library
 
-After placing the code above into your Maven project and executing it,
-information like below will be printed out to your terminal window.
+In order to use Flink ML's CLI, you need the latest binary distribution of
+Flink ML. You can acquire the distribution by building Flink ML's source code
+locally with the following commands.
 
-```
-Vector: [0.3, 0.0]	Cluster ID: 1
-Vector: [9.6, 0.0]	Cluster ID: 0
-Vector: [9.0, 0.6]	Cluster ID: 0
-Vector: [0.0, 0.0]	Cluster ID: 1
-Vector: [0.0, 0.3]	Cluster ID: 1
-Vector: [9.0, 0.0]	Cluster ID: 0
+```bash
+cd ${path_to_flink_ml}
+mvn clean package -DskipTests
+cd ./flink-ml-dist/target/flink-ml-*-bin/flink-ml*/
 ```
 
-## Breaking Down The Code
+### Add Flink ML binaries to Flink
 
-### The Execution Environment
+You need to copy Flink ML's binary distribution files into Flink's folder for
+proper initialization. Please run the following command from the folder of
+Flink ML's binary distribution.
 
-The first lines set up the `StreamExecutionEnvironment` to execute the Flink ML
-job. You would have been familiar with this concept if you have experience using
-Flink. For the example program in this document, a simple
-`StreamExecutionEnvironment` without specific configurations would be enough. 
-
-Given that Flink ML uses Flink's Table API, a `StreamTableEnvironment` would
-also be necessary for the following program.
-
-```java
-StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+```bash
+cp ./lib/*.jar $FLINK_HOME/lib/
 ```
 
-### Creating Training & Inference Data Table
-
-Then the program creates the Table containing data for the training and
-prediction process of the following Kmeans algorithm. Flink ML operators
-search the names of the columns of the input table for input data, and produce
-prediction results to designated column of the output Table.
+## Run Flink ML example job
 
-```java
-DataStream<DenseVector> inputStream = env.fromElements(
-        Vectors.dense(0.0, 0.0),
-        Vectors.dense(0.0, 0.3),
-        Vectors.dense(0.3, 0.0),
-        Vectors.dense(9.0, 0.0),
-        Vectors.dense(9.0, 0.6),
-        Vectors.dense(9.6, 0.0)
-);
+Please start a Flink standalone cluster in your local environment with the
+following command.
 
-Table input = tEnv.fromDataStream(inputStream).as(featuresCol);
+```bash
+$FLINK_HOME/bin/start-cluster.sh
 ```
 
-### Creating, Configuring, Training & Using Kmeans
+You should be able to navigate to the web UI at
+[localhost:8081](http://localhost:8081/) to view the Flink dashboard and see
+that the cluster is up and running.
 
-Flink ML classes for Kmeans algorithm include `KMeans` and `KMeansModel`.
-`KMeans` implements the training process of Kmeans algorithm based on the
-provided training data, and finally generates a `KMeansModel`.
-`KmeansModel.transform()` method encodes the Transformation logic of this
-algorithm and is used for predictions. 
+Then you may submit Flink ML examples to the cluster as follows.
 
-Both `KMeans` and `KMeansModel` provides getter/setter methods for Kmeans
-algorithm's configuration parameters. The example program explicitly sets the
-following parameters, and other configuration parameters will have their default
-values used.
-
-- `K`, the number of clusters to create
-- `seed`, the random seed to initialize cluster centers
-- `featuresCol`, name of the column containing input feature vectors
-- `predictionCol`, name of the column to output prediction results
-
-When the program invokes `KMeans.fit()` to generate a `KMeansModel`, the
-`KMeansModel` will inherit the `KMeans` object's configuration parameters. Thus
-it is supported to set `KMeansModel`'s parameters directly in `KMeans` object.
-
-```java
-KMeans kmeans = new KMeans()
-        .setK(2)
-        .setSeed(1L)
-        .setFeaturesCol(featuresCol)
-        .setPredictionCol(predictionCol);
+```
+$FLINK_HOME/bin/flink run -c org.apache.flink.ml.examples.clustering.KMeansExample $FLINK_HOME/lib/flink-ml-examples*.jar
+```
 
-KMeansModel model = kmeans.fit(input);
+The command above submits and executes Flink ML's `KMeansExample` job. There
+are also example jobs for other Flink ML algorithms, and you can find them in
+the `flink-ml-examples` module.
 
-Table output = model.transform(input)[0];
+A sample output in your terminal is as follows.
 
 ```
+Features: [9.0, 0.0]    Cluster ID: 1
+Features: [0.3, 0.0]    Cluster ID: 0
+Features: [0.0, 0.3]    Cluster ID: 0
+Features: [9.6, 0.0]    Cluster ID: 1
+Features: [0.0, 0.0]    Cluster ID: 0
+Features: [9.0, 0.6]    Cluster ID: 1
 
-### Collecting Prediction Result
-
-Like all other Flink programs, the codes described in the sections above only
-configures the computation graph of a Flink job, and the program only evaluates
-the computation logic and collects outputs after the `execute()` method is
-invoked. Collected outputs from the output table would be `Row`s in which
-`featuresCol` contains input feature vectors, and `predictionCol` contains
-output prediction results, i.e., cluster IDs.
-
-```java
-for (CloseableIterator<Row> it = output.execute().collect(); it.hasNext(); ) {
-    Row row = it.next();
-    DenseVector vector = (DenseVector) row.getField(featuresCol);
-    int clusterId = (Integer) row.getField(predictionCol);
-    System.out.println("Vector: " + vector + "\tCluster ID: " + clusterId);
-}
 ```
 
-```
-Vector: [0.3, 0.0]	Cluster ID: 1
-Vector: [9.6, 0.0]	Cluster ID: 0
-Vector: [9.0, 0.6]	Cluster ID: 0
-Vector: [0.0, 0.0]	Cluster ID: 1
-Vector: [0.0, 0.3]	Cluster ID: 1
-Vector: [9.0, 0.0]	Cluster ID: 0
-```
+Now you have successfully run a Flink ML job.
 
-<!-- TODO: Add sections like "Next Steps` with guidance to other pages of this document. -->
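The updated quick-start above submits only `KMeansExample`, while noting that the other algorithms shipped in the `flink-ml-examples` module can be run the same way. As an illustration (not part of the committed docs), the logistic regression example added in this commit could be submitted with the same `flink run` pattern, assuming the same `$FLINK_HOME` setup and Flink ML binaries described above:

```bash
# Illustrative sketch only: reuses the CLI pattern from quick-start.md to run
# another example class added by this commit.
$FLINK_HOME/bin/flink run \
    -c org.apache.flink.ml.examples.classification.LogisticRegressionExample \
    $FLINK_HOME/lib/flink-ml-examples*.jar
```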
diff --git a/flink-ml-dist/pom.xml b/flink-ml-dist/pom.xml
index 2cc828b..4ec3166 100644
--- a/flink-ml-dist/pom.xml
+++ b/flink-ml-dist/pom.xml
@@ -40,6 +40,12 @@ under the License.
             <version>${project.version}</version>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-ml-examples</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
         <!-- Stateful Functions Dependencies -->
 
         <dependency>
diff --git a/flink-ml-dist/src/main/assemblies/bin.xml b/flink-ml-dist/src/main/assemblies/bin.xml
index 3ca8d41..987e27b 100644
--- a/flink-ml-dist/src/main/assemblies/bin.xml
+++ b/flink-ml-dist/src/main/assemblies/bin.xml
@@ -36,6 +36,7 @@ under the License.
             <includes>
                 <include>org.apache.flink:statefun-flink-core</include>
                 <include>org.apache.flink:flink-ml-uber</include>
+                <include>org.apache.flink:flink-ml-examples</include>
             </includes>
         </dependencySet>
     </dependencySets>
diff --git a/flink-ml-examples/pom.xml b/flink-ml-examples/pom.xml
new file mode 100644
index 0000000..b10f524
--- /dev/null
+++ b/flink-ml-examples/pom.xml
@@ -0,0 +1,117 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>flink-ml-parent</artifactId>
+        <groupId>org.apache.flink</groupId>
+        <version>2.1-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>flink-ml-examples</artifactId>
+    <name>Flink ML : Examples</name>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-ml-core</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-ml-iteration</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-ml-lib</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-clients</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-api-java</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-api-java-bridge</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-runtime</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-common</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-planner-loader</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-test-utils</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-test-utils-junit</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+</project>
diff --git a/flink-ml-examples/src/main/java/org/apache/flink/ml/examples/classification/KnnExample.java b/flink-ml-examples/src/main/java/org/apache/flink/ml/examples/classification/KnnExample.java
new file mode 100644
index 0000000..9941c30
--- /dev/null
+++ b/flink-ml-examples/src/main/java/org/apache/flink/ml/examples/classification/KnnExample.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.examples.classification;
+
+import org.apache.flink.ml.classification.knn.Knn;
+import org.apache.flink.ml.classification.knn.KnnModel;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+
+/** Simple program that trains a Knn model and uses it for classification. */
+public class KnnExample {
+    public static void main(String[] args) {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+        // Generates input training and prediction data.
+        DataStream<Row> trainStream =
+                env.fromElements(
+                        Row.of(Vectors.dense(2.0, 3.0), 1.0),
+                        Row.of(Vectors.dense(2.1, 3.1), 1.0),
+                        Row.of(Vectors.dense(200.1, 300.1), 2.0),
+                        Row.of(Vectors.dense(200.2, 300.2), 2.0),
+                        Row.of(Vectors.dense(200.3, 300.3), 2.0),
+                        Row.of(Vectors.dense(200.4, 300.4), 2.0),
+                        Row.of(Vectors.dense(200.4, 300.4), 2.0),
+                        Row.of(Vectors.dense(200.6, 300.6), 2.0),
+                        Row.of(Vectors.dense(2.1, 3.1), 1.0),
+                        Row.of(Vectors.dense(2.1, 3.1), 1.0),
+                        Row.of(Vectors.dense(2.1, 3.1), 1.0),
+                        Row.of(Vectors.dense(2.1, 3.1), 1.0),
+                        Row.of(Vectors.dense(2.3, 3.2), 1.0),
+                        Row.of(Vectors.dense(2.3, 3.2), 1.0),
+                        Row.of(Vectors.dense(2.8, 3.2), 3.0),
+                        Row.of(Vectors.dense(300., 3.2), 4.0),
+                        Row.of(Vectors.dense(2.2, 3.2), 1.0),
+                        Row.of(Vectors.dense(2.4, 3.2), 5.0),
+                        Row.of(Vectors.dense(2.5, 3.2), 5.0),
+                        Row.of(Vectors.dense(2.5, 3.2), 5.0),
+                        Row.of(Vectors.dense(2.1, 3.1), 1.0));
+        Table trainTable = tEnv.fromDataStream(trainStream).as("features", "label");
+
+        DataStream<Row> predictStream =
+                env.fromElements(
+                        Row.of(Vectors.dense(4.0, 4.1), 5.0), Row.of(Vectors.dense(300, 42), 2.0));
+        Table predictTable = tEnv.fromDataStream(predictStream).as("features", "label");
+
+        // Creates a Knn object and initializes its parameters.
+        Knn knn = new Knn().setK(4);
+
+        // Trains the Knn Model.
+        KnnModel knnModel = knn.fit(trainTable);
+
+        // Uses the Knn Model for predictions.
+        Table outputTable = knnModel.transform(predictTable)[0];
+
+        // Extracts and displays the results.
+        for (CloseableIterator<Row> it = outputTable.execute().collect(); it.hasNext(); ) {
+            Row row = it.next();
+            DenseVector features = (DenseVector) row.getField(knn.getFeaturesCol());
+            double expectedResult = (Double) row.getField(knn.getLabelCol());
+            double predictionResult = (Double) row.getField(knn.getPredictionCol());
+            System.out.printf(
+                    "Features: %-15s \tExpected Result: %s \tPrediction Result: %s\n",
+                    features, expectedResult, predictionResult);
+        }
+    }
+}
diff --git a/flink-ml-examples/src/main/java/org/apache/flink/ml/examples/classification/LinearSVCExample.java b/flink-ml-examples/src/main/java/org/apache/flink/ml/examples/classification/LinearSVCExample.java
new file mode 100644
index 0000000..f4941fb
--- /dev/null
+++ b/flink-ml-examples/src/main/java/org/apache/flink/ml/examples/classification/LinearSVCExample.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.examples.classification;
+
+import org.apache.flink.ml.classification.linearsvc.LinearSVC;
+import org.apache.flink.ml.classification.linearsvc.LinearSVCModel;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+
+/** Simple program that trains a LinearSVC model and uses it for classification. */
+public class LinearSVCExample {
+    public static void main(String[] args) {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+        // Generates input data.
+        DataStream<Row> inputStream =
+                env.fromElements(
+                        Row.of(Vectors.dense(1, 2, 3, 4), 0., 1.),
+                        Row.of(Vectors.dense(2, 2, 3, 4), 0., 2.),
+                        Row.of(Vectors.dense(3, 2, 3, 4), 0., 3.),
+                        Row.of(Vectors.dense(4, 2, 3, 4), 0., 4.),
+                        Row.of(Vectors.dense(5, 2, 3, 4), 0., 5.),
+                        Row.of(Vectors.dense(11, 2, 3, 4), 1., 1.),
+                        Row.of(Vectors.dense(12, 2, 3, 4), 1., 2.),
+                        Row.of(Vectors.dense(13, 2, 3, 4), 1., 3.),
+                        Row.of(Vectors.dense(14, 2, 3, 4), 1., 4.),
+                        Row.of(Vectors.dense(15, 2, 3, 4), 1., 5.));
+        Table inputTable = tEnv.fromDataStream(inputStream).as("features", "label", "weight");
+
+        // Creates a LinearSVC object and initializes its parameters.
+        LinearSVC linearSVC = new LinearSVC().setWeightCol("weight");
+
+        // Trains the LinearSVC Model.
+        LinearSVCModel linearSVCModel = linearSVC.fit(inputTable);
+
+        // Uses the LinearSVC Model for predictions.
+        Table outputTable = linearSVCModel.transform(inputTable)[0];
+
+        // Extracts and displays the results.
+        for (CloseableIterator<Row> it = outputTable.execute().collect(); it.hasNext(); ) {
+            Row row = it.next();
+            DenseVector features = (DenseVector) row.getField(linearSVC.getFeaturesCol());
+            double expectedResult = (Double) row.getField(linearSVC.getLabelCol());
+            double predictionResult = (Double) row.getField(linearSVC.getPredictionCol());
+            DenseVector rawPredictionResult =
+                    (DenseVector) row.getField(linearSVC.getRawPredictionCol());
+            System.out.printf(
+                    "Features: %-25s \tExpected Result: %s \tPrediction Result: %s \tRaw Prediction Result: %s\n",
+                    features, expectedResult, predictionResult, rawPredictionResult);
+        }
+    }
+}
diff --git a/flink-ml-examples/src/main/java/org/apache/flink/ml/examples/classification/LogisticRegressionExample.java b/flink-ml-examples/src/main/java/org/apache/flink/ml/examples/classification/LogisticRegressionExample.java
new file mode 100644
index 0000000..fc2168d
--- /dev/null
+++ b/flink-ml-examples/src/main/java/org/apache/flink/ml/examples/classification/LogisticRegressionExample.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.examples.classification;
+
+import org.apache.flink.ml.classification.logisticregression.LogisticRegression;
+import org.apache.flink.ml.classification.logisticregression.LogisticRegressionModel;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+
+/** Simple program that trains a LogisticRegression model and uses it for classification. */
+public class LogisticRegressionExample {
+    public static void main(String[] args) {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+        // Generates input data.
+        DataStream<Row> inputStream =
+                env.fromElements(
+                        Row.of(Vectors.dense(1, 2, 3, 4), 0., 1.),
+                        Row.of(Vectors.dense(2, 2, 3, 4), 0., 2.),
+                        Row.of(Vectors.dense(3, 2, 3, 4), 0., 3.),
+                        Row.of(Vectors.dense(4, 2, 3, 4), 0., 4.),
+                        Row.of(Vectors.dense(5, 2, 3, 4), 0., 5.),
+                        Row.of(Vectors.dense(11, 2, 3, 4), 1., 1.),
+                        Row.of(Vectors.dense(12, 2, 3, 4), 1., 2.),
+                        Row.of(Vectors.dense(13, 2, 3, 4), 1., 3.),
+                        Row.of(Vectors.dense(14, 2, 3, 4), 1., 4.),
+                        Row.of(Vectors.dense(15, 2, 3, 4), 1., 5.));
+        Table inputTable = tEnv.fromDataStream(inputStream).as("features", "label", "weight");
+
+        // Creates a LogisticRegression object and initializes its parameters.
+        LogisticRegression lr = new LogisticRegression().setWeightCol("weight");
+
+        // Trains the LogisticRegression Model.
+        LogisticRegressionModel lrModel = lr.fit(inputTable);
+
+        // Uses the LogisticRegression Model for predictions.
+        Table outputTable = lrModel.transform(inputTable)[0];
+
+        // Extracts and displays the results.
+        for (CloseableIterator<Row> it = outputTable.execute().collect(); it.hasNext(); ) {
+            Row row = it.next();
+            DenseVector features = (DenseVector) row.getField(lr.getFeaturesCol());
+            double expectedResult = (Double) row.getField(lr.getLabelCol());
+            double predictionResult = (Double) row.getField(lr.getPredictionCol());
+            DenseVector rawPredictionResult = (DenseVector) row.getField(lr.getRawPredictionCol());
+            System.out.printf(
+                    "Features: %-25s \tExpected Result: %s \tPrediction Result: %s \tRaw Prediction Result: %s\n",
+                    features, expectedResult, predictionResult, rawPredictionResult);
+        }
+    }
+}
diff --git a/flink-ml-examples/src/main/java/org/apache/flink/ml/examples/classification/NaiveBayesExample.java b/flink-ml-examples/src/main/java/org/apache/flink/ml/examples/classification/NaiveBayesExample.java
new file mode 100644
index 0000000..1f36920
--- /dev/null
+++ b/flink-ml-examples/src/main/java/org/apache/flink/ml/examples/classification/NaiveBayesExample.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.examples.classification;
+
+import org.apache.flink.ml.classification.naivebayes.NaiveBayes;
+import org.apache.flink.ml.classification.naivebayes.NaiveBayesModel;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+
+/** Simple program that trains a NaiveBayes model and uses it for classification. */
+public class NaiveBayesExample {
+    public static void main(String[] args) {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+        // Generates input training and prediction data.
+        DataStream<Row> trainStream =
+                env.fromElements(
+                        Row.of(Vectors.dense(0, 0.), 11),
+                        Row.of(Vectors.dense(1, 0), 10),
+                        Row.of(Vectors.dense(1, 1.), 10));
+        Table trainTable = tEnv.fromDataStream(trainStream).as("features", "label");
+
+        DataStream<Row> predictStream =
+                env.fromElements(
+                        Row.of(Vectors.dense(0, 1.)),
+                        Row.of(Vectors.dense(0, 0.)),
+                        Row.of(Vectors.dense(1, 0)),
+                        Row.of(Vectors.dense(1, 1.)));
+        Table predictTable = tEnv.fromDataStream(predictStream).as("features");
+
+        // Creates a NaiveBayes object and initializes its parameters.
+        NaiveBayes naiveBayes =
+                new NaiveBayes()
+                        .setSmoothing(1.0)
+                        .setFeaturesCol("features")
+                        .setLabelCol("label")
+                        .setPredictionCol("prediction")
+                        .setModelType("multinomial");
+
+        // Trains the NaiveBayes Model.
+        NaiveBayesModel naiveBayesModel = naiveBayes.fit(trainTable);
+
+        // Uses the NaiveBayes Model for predictions.
+        Table outputTable = naiveBayesModel.transform(predictTable)[0];
+
+        // Extracts and displays the results.
+        for (CloseableIterator<Row> it = outputTable.execute().collect(); it.hasNext(); ) {
+            Row row = it.next();
+            DenseVector features = (DenseVector) row.getField(naiveBayes.getFeaturesCol());
+            double predictionResult = (Double) row.getField(naiveBayes.getPredictionCol());
+            System.out.printf("Features: %s \tPrediction Result: %s\n", features, predictionResult);
+        }
+    }
+}
diff --git a/flink-ml-examples/src/main/java/org/apache/flink/ml/examples/classification/OnlineLogisticRegressionExample.java b/flink-ml-examples/src/main/java/org/apache/flink/ml/examples/classification/OnlineLogisticRegressionExample.java
new file mode 100644
index 0000000..d4e7b2f
--- /dev/null
+++ b/flink-ml-examples/src/main/java/org/apache/flink/ml/examples/classification/OnlineLogisticRegressionExample.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.examples.classification;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.ml.classification.logisticregression.OnlineLogisticRegression;
+import org.apache.flink.ml.classification.logisticregression.OnlineLogisticRegressionModel;
+import org.apache.flink.ml.examples.util.PeriodicSourceFunction;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.ml.linalg.typeinfo.DenseVectorTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+/** Simple program that trains an OnlineLogisticRegression model and uses it for classification. */
+public class OnlineLogisticRegressionExample {
+    public static void main(String[] args) {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(4);
+        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+        // Generates input training and prediction data. Both are infinite streams that
+        // periodically send out the provided data to trigger model updates and predictions.
+        List<Row> trainData1 =
+                Arrays.asList(
+                        Row.of(Vectors.dense(0.1, 2.), 0.),
+                        Row.of(Vectors.dense(0.2, 2.), 0.),
+                        Row.of(Vectors.dense(0.3, 2.), 0.),
+                        Row.of(Vectors.dense(0.4, 2.), 0.),
+                        Row.of(Vectors.dense(0.5, 2.), 0.),
+                        Row.of(Vectors.dense(11., 12.), 1.),
+                        Row.of(Vectors.dense(12., 11.), 1.),
+                        Row.of(Vectors.dense(13., 12.), 1.),
+                        Row.of(Vectors.dense(14., 12.), 1.),
+                        Row.of(Vectors.dense(15., 12.), 1.));
+
+        List<Row> trainData2 =
+                Arrays.asList(
+                        Row.of(Vectors.dense(0.2, 3.), 0.),
+                        Row.of(Vectors.dense(0.8, 1.), 0.),
+                        Row.of(Vectors.dense(0.7, 1.), 0.),
+                        Row.of(Vectors.dense(0.6, 2.), 0.),
+                        Row.of(Vectors.dense(0.2, 2.), 0.),
+                        Row.of(Vectors.dense(14., 17.), 1.),
+                        Row.of(Vectors.dense(15., 10.), 1.),
+                        Row.of(Vectors.dense(16., 16.), 1.),
+                        Row.of(Vectors.dense(17., 10.), 1.),
+                        Row.of(Vectors.dense(18., 13.), 1.));
+
+        List<Row> predictData =
+                Arrays.asList(
+                        Row.of(Vectors.dense(0.8, 2.7), 0.0),
+                        Row.of(Vectors.dense(15.5, 11.2), 1.0));
+
+        RowTypeInfo typeInfo =
+                new RowTypeInfo(
+                        new TypeInformation[] {DenseVectorTypeInfo.INSTANCE, Types.DOUBLE},
+                        new String[] {"features", "label"});
+
+        SourceFunction<Row> trainSource =
+                new PeriodicSourceFunction(1000, Arrays.asList(trainData1, trainData2));
+        DataStream<Row> trainStream = env.addSource(trainSource, typeInfo);
+        Table trainTable = tEnv.fromDataStream(trainStream).as("features");
+
+        SourceFunction<Row> predictSource =
+                new PeriodicSourceFunction(1000, Collections.singletonList(predictData));
+        DataStream<Row> predictStream = env.addSource(predictSource, typeInfo);
+        Table predictTable = tEnv.fromDataStream(predictStream).as("features");
+
+        // Creates an online LogisticRegression object and initializes its parameters and initial
+        // model data.
+        Row initModelData = Row.of(Vectors.dense(0.41233679404769874, -0.18088118293232122), 0L);
+        Table initModelDataTable = tEnv.fromDataStream(env.fromElements(initModelData));
+        OnlineLogisticRegression olr =
+                new OnlineLogisticRegression()
+                        .setFeaturesCol("features")
+                        .setLabelCol("label")
+                        .setPredictionCol("prediction")
+                        .setReg(0.2)
+                        .setElasticNet(0.5)
+                        .setGlobalBatchSize(10)
+                        .setInitialModelData(initModelDataTable);
+
+        // Trains the online LogisticRegression Model.
+        OnlineLogisticRegressionModel onlineModel = olr.fit(trainTable);
+
+        // Uses the online LogisticRegression Model for predictions.
+        Table outputTable = onlineModel.transform(predictTable)[0];
+
+        // Extracts and displays the results. As the training data stream continuously triggers
+        // updates of the internal model data, raw prediction results for the same prediction
+        // dataset would change over time.
+        for (CloseableIterator<Row> it = outputTable.execute().collect(); it.hasNext(); ) {
+            Row row = it.next();
+            DenseVector features = (DenseVector) row.getField(olr.getFeaturesCol());
+            Double expectedResult = (Double) row.getField(olr.getLabelCol());
+            Double predictionResult = (Double) row.getField(olr.getPredictionCol());
+            DenseVector rawPredictionResult = (DenseVector) row.getField(olr.getRawPredictionCol());
+            System.out.printf(
+                    "Features: %-25s \tExpected Result: %s \tPrediction Result: %s \tRaw Prediction Result: %s\n",
+                    features, expectedResult, predictionResult, rawPredictionResult);
+        }
+    }
+}
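Both online examples in this commit feed their estimators from `PeriodicSourceFunction`, which replays bounded batches of rows at a fixed interval so the model keeps updating. Its source file is part of this commit but not shown in this excerpt; the snippet below is only a minimal, hypothetical sketch of that pattern (the class name `RepeatingRowSource` and every detail of its body are assumptions, not the committed implementation):

```java
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.types.Row;

import java.util.List;

/** Hypothetical sketch: endlessly re-emits the given batches of rows on a fixed interval. */
public class RepeatingRowSource implements SourceFunction<Row> {
    private final long intervalMs;
    private final List<List<Row>> batches;
    private volatile boolean isRunning = true;

    public RepeatingRowSource(long intervalMs, List<List<Row>> batches) {
        this.intervalMs = intervalMs;
        this.batches = batches;
    }

    @Override
    public void run(SourceContext<Row> ctx) throws Exception {
        int index = 0;
        while (isRunning) {
            // Emits one batch of rows, then sleeps before cycling to the next batch.
            for (Row row : batches.get(index)) {
                ctx.collect(row);
            }
            index = (index + 1) % batches.size();
            Thread.sleep(intervalMs);
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}
```

In the examples above, such a source is wired in via `env.addSource(source, typeInfo)` and then converted to a `Table`, exactly as shown in `OnlineLogisticRegressionExample`.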
diff --git a/flink-ml-examples/src/main/java/org/apache/flink/ml/examples/clustering/KMeansExample.java b/flink-ml-examples/src/main/java/org/apache/flink/ml/examples/clustering/KMeansExample.java
new file mode 100644
index 0000000..62edf2e
--- /dev/null
+++ b/flink-ml-examples/src/main/java/org/apache/flink/ml/examples/clustering/KMeansExample.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.examples.clustering;
+
+import org.apache.flink.ml.clustering.kmeans.KMeans;
+import org.apache.flink.ml.clustering.kmeans.KMeansModel;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+
+/** Simple program that trains a KMeans model and uses it for clustering. */
+public class KMeansExample {
+    public static void main(String[] args) {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+        // Generates input data.
+        DataStream<DenseVector> inputStream =
+                env.fromElements(
+                        Vectors.dense(0.0, 0.0),
+                        Vectors.dense(0.0, 0.3),
+                        Vectors.dense(0.3, 0.0),
+                        Vectors.dense(9.0, 0.0),
+                        Vectors.dense(9.0, 0.6),
+                        Vectors.dense(9.6, 0.0));
+        Table inputTable = tEnv.fromDataStream(inputStream).as("features");
+
+        // Creates a K-means object and initializes its parameters.
+        KMeans kmeans = new KMeans().setK(2).setSeed(1L);
+
+        // Trains the K-means Model.
+        KMeansModel kmeansModel = kmeans.fit(inputTable);
+
+        // Uses the K-means Model for predictions.
+        Table outputTable = kmeansModel.transform(inputTable)[0];
+
+        // Extracts and displays the results.
+        for (CloseableIterator<Row> it = outputTable.execute().collect(); it.hasNext(); ) {
+            Row row = it.next();
+            DenseVector features = (DenseVector) row.getField(kmeans.getFeaturesCol());
+            int clusterId = (Integer) row.getField(kmeans.getPredictionCol());
+            System.out.printf("Features: %s \tCluster ID: %s\n", features, clusterId);
+        }
+    }
+}
diff --git a/flink-ml-examples/src/main/java/org/apache/flink/ml/examples/clustering/OnlineKMeansExample.java b/flink-ml-examples/src/main/java/org/apache/flink/ml/examples/clustering/OnlineKMeansExample.java
new file mode 100644
index 0000000..b7b7eb3
--- /dev/null
+++ b/flink-ml-examples/src/main/java/org/apache/flink/ml/examples/clustering/OnlineKMeansExample.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.examples.clustering;
+
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.ml.clustering.kmeans.KMeansModelData;
+import org.apache.flink.ml.clustering.kmeans.OnlineKMeans;
+import org.apache.flink.ml.clustering.kmeans.OnlineKMeansModel;
+import org.apache.flink.ml.examples.util.PeriodicSourceFunction;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.ml.linalg.typeinfo.DenseVectorTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+/** Simple program that trains an OnlineKMeans model and uses it for clustering. */
+public class OnlineKMeansExample {
+    public static void main(String[] args) {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(4);
+        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+        // Generates input training and prediction data. Both are infinite streams that
+        // periodically send out the provided data to trigger model updates and predictions.
+        List<Row> trainData1 =
+                Arrays.asList(
+                        Row.of(Vectors.dense(0.0, 0.0)),
+                        Row.of(Vectors.dense(0.0, 0.3)),
+                        Row.of(Vectors.dense(0.3, 0.0)),
+                        Row.of(Vectors.dense(9.0, 0.0)),
+                        Row.of(Vectors.dense(9.0, 0.6)),
+                        Row.of(Vectors.dense(9.6, 0.0)));
+
+        List<Row> trainData2 =
+                Arrays.asList(
+                        Row.of(Vectors.dense(10.0, 100.0)),
+                        Row.of(Vectors.dense(10.0, 100.3)),
+                        Row.of(Vectors.dense(10.3, 100.0)),
+                        Row.of(Vectors.dense(-10.0, -100.0)),
+                        Row.of(Vectors.dense(-10.0, -100.6)),
+                        Row.of(Vectors.dense(-10.6, -100.0)));
+
+        List<Row> predictData =
+                Arrays.asList(
+                        Row.of(Vectors.dense(10.0, 10.0)), Row.of(Vectors.dense(-10.0, 10.0)));
+
+        SourceFunction<Row> trainSource =
+                new PeriodicSourceFunction(1000, Arrays.asList(trainData1, trainData2));
+        DataStream<Row> trainStream =
+                env.addSource(trainSource, new RowTypeInfo(DenseVectorTypeInfo.INSTANCE));
+        Table trainTable = tEnv.fromDataStream(trainStream).as("features");
+
+        SourceFunction<Row> predictSource =
+                new PeriodicSourceFunction(1000, Collections.singletonList(predictData));
+        DataStream<Row> predictStream =
+                env.addSource(predictSource, new RowTypeInfo(DenseVectorTypeInfo.INSTANCE));
+        Table predictTable = tEnv.fromDataStream(predictStream).as("features");
+
+        // Creates an online K-means object and initializes its parameters and initial model data.
+        OnlineKMeans onlineKMeans =
+                new OnlineKMeans()
+                        .setFeaturesCol("features")
+                        .setPredictionCol("prediction")
+                        .setGlobalBatchSize(6)
+                        .setInitialModelData(
+                                KMeansModelData.generateRandomModelData(tEnv, 2, 2, 0.0, 0));
+
+        // Trains the online K-means Model.
+        OnlineKMeansModel onlineModel = onlineKMeans.fit(trainTable);
+
+        // Uses the online K-means Model for predictions.
+        Table outputTable = onlineModel.transform(predictTable)[0];
+
+        // Extracts and displays the results. As the training data stream continuously triggers
+        // updates of the internal k-means model data, clustering results for the same prediction
+        // dataset would change over time.
+        for (CloseableIterator<Row> it = outputTable.execute().collect(); it.hasNext(); ) {
+            Row row1 = it.next();
+            DenseVector features1 = (DenseVector) row1.getField(onlineKMeans.getFeaturesCol());
+            Integer clusterId1 = (Integer) row1.getField(onlineKMeans.getPredictionCol());
+            Row row2 = it.next();
+            DenseVector features2 = (DenseVector) row2.getField(onlineKMeans.getFeaturesCol());
+            Integer clusterId2 = (Integer) row2.getField(onlineKMeans.getPredictionCol());
+            if (Objects.equals(clusterId1, clusterId2)) {
+                System.out.printf("%s and %s are now in the same cluster.\n", features1, features2);
+            } else {
+                System.out.printf(
+                        "%s and %s are now in different clusters.\n", features1, features2);
+            }
+        }
+    }
+}
diff --git a/flink-ml-examples/src/main/java/org/apache/flink/ml/examples/evaluation/BinaryClassificationEvaluatorExample.java b/flink-ml-examples/src/main/java/org/apache/flink/ml/examples/evaluation/BinaryClassificationEvaluatorExample.java
new file mode 100644
index 0000000..72def30
--- /dev/null
+++ b/flink-ml-examples/src/main/java/org/apache/flink/ml/examples/evaluation/BinaryClassificationEvaluatorExample.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.examples.evaluation;
+
+import org.apache.flink.ml.evaluation.binaryclassification.BinaryClassificationEvaluator;
+import org.apache.flink.ml.evaluation.binaryclassification.BinaryClassificationEvaluatorParams;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+
+/**
+ * Simple program that creates a BinaryClassificationEvaluator instance and uses it for evaluation.
+ */
+public class BinaryClassificationEvaluatorExample {
+    public static void main(String[] args) {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+        // Generates input data.
+        DataStream<Row> inputStream =
+                env.fromElements(
+                        Row.of(1.0, Vectors.dense(0.1, 0.9)),
+                        Row.of(1.0, Vectors.dense(0.2, 0.8)),
+                        Row.of(1.0, Vectors.dense(0.3, 0.7)),
+                        Row.of(0.0, Vectors.dense(0.25, 0.75)),
+                        Row.of(0.0, Vectors.dense(0.4, 0.6)),
+                        Row.of(1.0, Vectors.dense(0.35, 0.65)),
+                        Row.of(1.0, Vectors.dense(0.45, 0.55)),
+                        Row.of(0.0, Vectors.dense(0.6, 0.4)),
+                        Row.of(0.0, Vectors.dense(0.7, 0.3)),
+                        Row.of(1.0, Vectors.dense(0.65, 0.35)),
+                        Row.of(0.0, Vectors.dense(0.8, 0.2)),
+                        Row.of(1.0, Vectors.dense(0.9, 0.1)));
+        Table inputTable = tEnv.fromDataStream(inputStream).as("label", "rawPrediction");
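+        // Each row holds the true label and a raw-prediction vector; the second element of the
+        // vector is used as the score of the positive class.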
+
+        // Creates a BinaryClassificationEvaluator object and initializes its parameters.
+        BinaryClassificationEvaluator evaluator =
+                new BinaryClassificationEvaluator()
+                        .setMetricsNames(
+                                BinaryClassificationEvaluatorParams.AREA_UNDER_PR,
+                                BinaryClassificationEvaluatorParams.KS,
+                                BinaryClassificationEvaluatorParams.AREA_UNDER_ROC);
+
+        // Uses the BinaryClassificationEvaluator object for evaluations.
+        Table outputTable = evaluator.transform(inputTable)[0];
+
+        // Extracts and displays the results.
+        Row evaluationResult = outputTable.execute().collect().next();
+        System.out.printf(
+                "Area under the precision-recall curve: %s\n",
+                evaluationResult.getField(BinaryClassificationEvaluatorParams.AREA_UNDER_PR));
+        System.out.printf(
+                "Area under the receiver operating characteristic curve: %s\n",
+                evaluationResult.getField(BinaryClassificationEvaluatorParams.AREA_UNDER_ROC));
+        System.out.printf(
+                "Kolmogorov-Smirnov value: %s\n",
+                evaluationResult.getField(BinaryClassificationEvaluatorParams.KS));
+    }
+}
diff --git a/flink-ml-examples/src/main/java/org/apache/flink/ml/examples/feature/BucketizerExample.java b/flink-ml-examples/src/main/java/org/apache/flink/ml/examples/feature/BucketizerExample.java
new file mode 100644
index 0000000..9cd630c
--- /dev/null
+++ b/flink-ml-examples/src/main/java/org/apache/flink/ml/examples/feature/BucketizerExample.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.examples.feature;
+
+import org.apache.flink.ml.common.param.HasHandleInvalid;
+import org.apache.flink.ml.feature.bucketizer.Bucketizer;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+
+import java.util.Arrays;
+
+/** Simple program that creates a Bucketizer instance and uses it for feature engineering. */
+public class BucketizerExample {
+    public static void main(String[] args) {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+        // Generates input data.
+        DataStream<Row> inputStream = env.fromElements(Row.of(-0.5, 0.0, 1.0, 0.0));
+        Table inputTable = tEnv.fromDataStream(inputStream).as("f1", "f2", "f3", "f4");
+
+        // Creates a Bucketizer object and initializes its parameters.
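+        // Each row of splitsArray defines the bucket boundaries for one input column; with
+        // SKIP_INVALID, input rows whose values fall outside these splits are filtered out.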
+        Double[][] splitsArray =
+                new Double[][] {
+                    new Double[] {-0.5, 0.0, 0.5},
+                    new Double[] {-1.0, 0.0, 2.0},
+                    new Double[] {Double.NEGATIVE_INFINITY, 10.0, Double.POSITIVE_INFINITY},
+                    new Double[] {Double.NEGATIVE_INFINITY, 1.5, Double.POSITIVE_INFINITY}
+                };
+        Bucketizer bucketizer =
+                new Bucketizer()
+                        .setInputCols("f1", "f2", "f3", "f4")
+                        .setOutputCols("o1", "o2", "o3", "o4")
+                        .setSplitsArray(splitsArray)
+                        .setHandleInvalid(HasHandleInvalid.SKIP_INVALID);
+
+        // Uses the Bucketizer object for feature transformations.
+        Table outputTable = bucketizer.transform(inputTable)[0];
+
+        // Extracts and displays the results.
+        for (CloseableIterator<Row> it = outputTable.execute().collect(); it.hasNext(); ) {
+            Row row = it.next();
+
+            double[] inputValues = new double[bucketizer.getInputCols().length];
+            double[] outputValues = new double[bucketizer.getInputCols().length];
+            for (int i = 0; i < inputValues.length; i++) {
+                inputValues[i] = (double) row.getField(bucketizer.getInputCols()[i]);
+                outputValues[i] = (double) row.getField(bucketizer.getOutputCols()[i]);
+            }
+
+            System.out.printf(
+                    "Input Values: %s\tOutput Values: %s\n",
+                    Arrays.toString(inputValues), Arrays.toString(outputValues));
+        }
+    }
+}
diff --git a/flink-ml-examples/src/main/java/org/apache/flink/ml/examples/feature/IndexToStringModelExample.java b/flink-ml-examples/src/main/java/org/apache/flink/ml/examples/feature/IndexToStringModelExample.java
new file mode 100644
index 0000000..2b035d1
--- /dev/null
+++ b/flink-ml-examples/src/main/java/org/apache/flink/ml/examples/feature/IndexToStringModelExample.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.examples.feature;
+
+import org.apache.flink.ml.feature.stringindexer.IndexToStringModel;
+import org.apache.flink.ml.feature.stringindexer.StringIndexerModelData;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+
+import java.util.Arrays;
+
+/**
+ * Simple program that creates an IndexToStringModel instance and uses it for feature
+ * engineering.
+ */
+public class IndexToStringModelExample {
+    public static void main(String[] args) {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+        // Creates model data for IndexToStringModel.
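+        // Each inner array is the index-to-string mapping of one input column, so index 0 of the
+        // first column maps back to "a" and index 3 of the second column maps back to "2.0".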
+        StringIndexerModelData modelData =
+                new StringIndexerModelData(
+                        new String[][] {{"a", "b", "c", "d"}, {"-1.0", "0.0", "1.0", "2.0"}});
+        Table modelTable = tEnv.fromDataStream(env.fromElements(modelData)).as("stringArrays");
+
+        // Generates input data.
+        DataStream<Row> predictStream = env.fromElements(Row.of(0, 3), Row.of(1, 2));
+        Table predictTable = tEnv.fromDataStream(predictStream).as("inputCol1", "inputCol2");
+
+        // Creates an indexToStringModel object and initializes its parameters.
+        IndexToStringModel indexToStringModel =
+                new IndexToStringModel()
+                        .setInputCols("inputCol1", "inputCol2")
+                        .setOutputCols("outputCol1", "outputCol2")
+                        .setModelData(modelTable);
+
+        // Uses the indexToStringModel object for feature transformations.
+        Table outputTable = indexToStringModel.transform(predictTable)[0];
+
+        // Extracts and displays the results.
+        for (CloseableIterator<Row> it = outputTable.execute().collect(); it.hasNext(); ) {
+            Row row = it.next();
+
+            int[] inputValues = new int[indexToStringModel.getInputCols().length];
+            String[] outputValues = new String[indexToStringModel.getInputCols().length];
+            for (int i = 0; i < inputValues.length; i++) {
+                inputValues[i] = (int) row.getField(indexToStringModel.getInputCols()[i]);
+                outputValues[i] = (String) row.getField(indexToStringModel.getOutputCols()[i]);
+            }
+
+            System.out.printf(
+                    "Input Values: %s \tOutput Values: %s\n",
+                    Arrays.toString(inputValues), Arrays.toString(outputValues));
+        }
+    }
+}
diff --git a/flink-ml-examples/src/main/java/org/apache/flink/ml/examples/feature/MinMaxScalerExample.java b/flink-ml-examples/src/main/java/org/apache/flink/ml/examples/feature/MinMaxScalerExample.java
new file mode 100644
index 0000000..98e908f
--- /dev/null
+++ b/flink-ml-examples/src/main/java/org/apache/flink/ml/examples/feature/MinMaxScalerExample.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.examples.feature;
+
+import org.apache.flink.ml.feature.minmaxscaler.MinMaxScaler;
+import org.apache.flink.ml.feature.minmaxscaler.MinMaxScalerModel;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+
+/** Simple program that trains a MinMaxScaler model and uses it for feature engineering. */
+public class MinMaxScalerExample {
+    public static void main(String[] args) {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+        // Generates input training and prediction data.
+        DataStream<Row> trainStream =
+                env.fromElements(
+                        Row.of(Vectors.dense(0.0, 3.0)),
+                        Row.of(Vectors.dense(2.1, 0.0)),
+                        Row.of(Vectors.dense(4.1, 5.1)),
+                        Row.of(Vectors.dense(6.1, 8.1)),
+                        Row.of(Vectors.dense(200, 400)));
+        Table trainTable = tEnv.fromDataStream(trainStream).as("input");
+
+        DataStream<Row> predictStream =
+                env.fromElements(
+                        Row.of(Vectors.dense(150.0, 90.0)),
+                        Row.of(Vectors.dense(50.0, 40.0)),
+                        Row.of(Vectors.dense(100.0, 50.0)));
+        Table predictTable = tEnv.fromDataStream(predictStream).as("input");
+
+        // Creates a MinMaxScaler object and initializes its parameters.
+        MinMaxScaler minMaxScaler = new MinMaxScaler();
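+        // With the default output range [0, 1], each feature is rescaled as
+        // (value - E_min) / (E_max - E_min), where E_min and E_max are computed per feature from
+        // the training data.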
+
+        // Trains the MinMaxScaler Model.
+        MinMaxScalerModel minMaxScalerModel = minMaxScaler.fit(trainTable);
+
+        // Uses the MinMaxScaler Model for predictions.
+        Table outputTable = minMaxScalerModel.transform(predictTable)[0];
+
+        // Extracts and displays the results.
+        for (CloseableIterator<Row> it = outputTable.execute().collect(); it.hasNext(); ) {
+            Row row = it.next();
+            DenseVector inputValue = (DenseVector) row.getField(minMaxScaler.getInputCol());
+            DenseVector outputValue = (DenseVector) row.getField(minMaxScaler.getOutputCol());
+            System.out.printf("Input Value: %-15s\tOutput Value: %s\n", inputValue, outputValue);
+        }
+    }
+}
diff --git a/flink-ml-examples/src/main/java/org/apache/flink/ml/examples/feature/OneHotEncoderExample.java b/flink-ml-examples/src/main/java/org/apache/flink/ml/examples/feature/OneHotEncoderExample.java
new file mode 100644
index 0000000..1ebc8d9
--- /dev/null
+++ b/flink-ml-examples/src/main/java/org/apache/flink/ml/examples/feature/OneHotEncoderExample.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.examples.feature;
+
+import org.apache.flink.ml.feature.onehotencoder.OneHotEncoder;
+import org.apache.flink.ml.feature.onehotencoder.OneHotEncoderModel;
+import org.apache.flink.ml.linalg.SparseVector;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+
+/** Simple program that trains a OneHotEncoder model and uses it for feature engineering. */
+public class OneHotEncoderExample {
+    public static void main(String[] args) {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+        // Generates input training and prediction data.
+        DataStream<Row> trainStream =
+                env.fromElements(Row.of(0.0), Row.of(1.0), Row.of(2.0), Row.of(0.0));
+        Table trainTable = tEnv.fromDataStream(trainStream).as("input");
+
+        DataStream<Row> predictStream = env.fromElements(Row.of(0.0), Row.of(1.0), Row.of(2.0));
+        Table predictTable = tEnv.fromDataStream(predictStream).as("input");
+
+        // Creates a OneHotEncoder object and initializes its parameters.
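+        // By default the last category is dropped, so a column with n distinct categories is
+        // encoded into a sparse vector of size n - 1; the last category maps to the zero vector.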
+        OneHotEncoder oneHotEncoder =
+                new OneHotEncoder().setInputCols("input").setOutputCols("output");
+
+        // Trains the OneHotEncoder Model.
+        OneHotEncoderModel model = oneHotEncoder.fit(trainTable);
+
+        // Uses the OneHotEncoder Model for predictions.
+        Table outputTable = model.transform(predictTable)[0];
+
+        // Extracts and displays the results.
+        for (CloseableIterator<Row> it = outputTable.execute().collect(); it.hasNext(); ) {
+            Row row = it.next();
+            Double inputValue = (Double) row.getField(oneHotEncoder.getInputCols()[0]);
+            SparseVector outputValue =
+                    (SparseVector) row.getField(oneHotEncoder.getOutputCols()[0]);
+            System.out.printf("Input Value: %s\tOutput Value: %s\n", inputValue, outputValue);
+        }
+    }
+}
diff --git a/flink-ml-examples/src/main/java/org/apache/flink/ml/examples/feature/StandardScalerExample.java b/flink-ml-examples/src/main/java/org/apache/flink/ml/examples/feature/StandardScalerExample.java
new file mode 100644
index 0000000..571a58a
--- /dev/null
+++ b/flink-ml-examples/src/main/java/org/apache/flink/ml/examples/feature/StandardScalerExample.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.examples.feature;
+
+import org.apache.flink.ml.feature.standardscaler.StandardScaler;
+import org.apache.flink.ml.feature.standardscaler.StandardScalerModel;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+
+/** Simple program that trains a StandardScaler model and uses it for feature engineering. */
+public class StandardScalerExample {
+    public static void main(String[] args) {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+        // Generates input data.
+        DataStream<Row> inputStream =
+                env.fromElements(
+                        Row.of(Vectors.dense(-2.5, 9, 1)),
+                        Row.of(Vectors.dense(1.4, -5, 1)),
+                        Row.of(Vectors.dense(2, -1, -2)));
+        Table inputTable = tEnv.fromDataStream(inputStream).as("input");
+
+        // Creates a StandardScaler object and initializes its parameters.
+        StandardScaler standardScaler = new StandardScaler();
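+        // By default each feature is scaled to unit standard deviation without being centered to
+        // zero mean (see the withMean and withStd parameters).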
+
+        // Trains the StandardScaler Model.
+        StandardScalerModel model = standardScaler.fit(inputTable);
+
+        // Uses the StandardScaler Model for predictions.
+        Table outputTable = model.transform(inputTable)[0];
+
+        // Extracts and displays the results.
+        for (CloseableIterator<Row> it = outputTable.execute().collect(); it.hasNext(); ) {
+            Row row = it.next();
+            DenseVector inputValue = (DenseVector) row.getField(standardScaler.getInputCol());
+            DenseVector outputValue = (DenseVector) row.getField(standardScaler.getOutputCol());
+            System.out.printf("Input Value: %s\tOutput Value: %s\n", inputValue, outputValue);
+        }
+    }
+}
diff --git a/flink-ml-examples/src/main/java/org/apache/flink/ml/examples/feature/StringIndexerExample.java b/flink-ml-examples/src/main/java/org/apache/flink/ml/examples/feature/StringIndexerExample.java
new file mode 100644
index 0000000..cc8312f
--- /dev/null
+++ b/flink-ml-examples/src/main/java/org/apache/flink/ml/examples/feature/StringIndexerExample.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.examples.feature;
+
+import org.apache.flink.ml.feature.stringindexer.StringIndexer;
+import org.apache.flink.ml.feature.stringindexer.StringIndexerModel;
+import org.apache.flink.ml.feature.stringindexer.StringIndexerParams;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+
+import java.util.Arrays;
+
+/** Simple program that trains a StringIndexer model and uses it for feature engineering. */
+public class StringIndexerExample {
+    public static void main(String[] args) {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+        // Generates input training and prediction data.
+        DataStream<Row> trainStream =
+                env.fromElements(
+                        Row.of("a", 1.0),
+                        Row.of("b", 1.0),
+                        Row.of("b", 2.0),
+                        Row.of("c", 0.0),
+                        Row.of("d", 2.0),
+                        Row.of("a", 2.0),
+                        Row.of("b", 2.0),
+                        Row.of("b", -1.0),
+                        Row.of("a", -1.0),
+                        Row.of("c", -1.0));
+        Table trainTable = tEnv.fromDataStream(trainStream).as("inputCol1", "inputCol2");
+
+        DataStream<Row> predictStream =
+                env.fromElements(Row.of("a", 2.0), Row.of("b", 1.0), Row.of("c", 2.0));
+        Table predictTable = tEnv.fromDataStream(predictStream).as("inputCol1", "inputCol2");
+
+        // Creates a StringIndexer object and initializes its parameters.
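+        // ALPHABET_ASC_ORDER assigns indices to the distinct values of each input column in
+        // ascending alphabetical order of their string representation.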
+        StringIndexer stringIndexer =
+                new StringIndexer()
+                        .setStringOrderType(StringIndexerParams.ALPHABET_ASC_ORDER)
+                        .setInputCols("inputCol1", "inputCol2")
+                        .setOutputCols("outputCol1", "outputCol2");
+
+        // Trains the StringIndexer Model.
+        StringIndexerModel model = stringIndexer.fit(trainTable);
+
+        // Uses the StringIndexer Model for predictions.
+        Table outputTable = model.transform(predictTable)[0];
+
+        // Extracts and displays the results.
+        for (CloseableIterator<Row> it = outputTable.execute().collect(); it.hasNext(); ) {
+            Row row = it.next();
+
+            Object[] inputValues = new Object[stringIndexer.getInputCols().length];
+            double[] outputValues = new double[stringIndexer.getInputCols().length];
+            for (int i = 0; i < inputValues.length; i++) {
+                inputValues[i] = row.getField(stringIndexer.getInputCols()[i]);
+                outputValues[i] = (double) row.getField(stringIndexer.getOutputCols()[i]);
+            }
+
+            System.out.printf(
+                    "Input Values: %s \tOutput Values: %s\n",
+                    Arrays.toString(inputValues), Arrays.toString(outputValues));
+        }
+    }
+}
diff --git a/flink-ml-examples/src/main/java/org/apache/flink/ml/examples/feature/VectorAssemblerExample.java b/flink-ml-examples/src/main/java/org/apache/flink/ml/examples/feature/VectorAssemblerExample.java
new file mode 100644
index 0000000..50e51c2
--- /dev/null
+++ b/flink-ml-examples/src/main/java/org/apache/flink/ml/examples/feature/VectorAssemblerExample.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.examples.feature;
+
+import org.apache.flink.ml.feature.vectorassembler.VectorAssembler;
+import org.apache.flink.ml.linalg.Vector;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+
+import java.util.Arrays;
+
+/** Simple program that creates a VectorAssembler instance and uses it for feature engineering. */
+public class VectorAssemblerExample {
+    public static void main(String[] args) {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+        // Generates input data.
+        DataStream<Row> inputStream =
+                env.fromElements(
+                        Row.of(
+                                Vectors.dense(2.1, 3.1),
+                                1.0,
+                                Vectors.sparse(5, new int[] {3}, new double[] {1.0})),
+                        Row.of(
+                                Vectors.dense(2.1, 3.1),
+                                1.0,
+                                Vectors.sparse(
+                                        5,
+                                        new int[] {4, 2, 3, 1},
+                                        new double[] {4.0, 2.0, 3.0, 1.0})));
+        Table inputTable = tEnv.fromDataStream(inputStream).as("vec", "num", "sparseVec");
+
+        // Creates a VectorAssembler object and initializes its parameters.
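+        // The assembler concatenates the "vec" (size 2), "num" (size 1) and "sparseVec" (size 5)
+        // columns, so each assembled output vector has size 8.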
+        VectorAssembler vectorAssembler =
+                new VectorAssembler()
+                        .setInputCols("vec", "num", "sparseVec")
+                        .setOutputCol("assembledVec");
+
+        // Uses the VectorAssembler object for feature transformations.
+        Table outputTable = vectorAssembler.transform(inputTable)[0];
+
+        // Extracts and displays the results.
+        for (CloseableIterator<Row> it = outputTable.execute().collect(); it.hasNext(); ) {
+            Row row = it.next();
+
+            Object[] inputValues = new Object[vectorAssembler.getInputCols().length];
+            for (int i = 0; i < inputValues.length; i++) {
+                inputValues[i] = row.getField(vectorAssembler.getInputCols()[i]);
+            }
+
+            Vector outputValue = (Vector) row.getField(vectorAssembler.getOutputCol());
+
+            System.out.printf(
+                    "Input Values: %s \tOutput Value: %s\n",
+                    Arrays.toString(inputValues), outputValue);
+        }
+    }
+}
diff --git a/flink-ml-examples/src/main/java/org/apache/flink/ml/examples/regression/LinearRegressionExample.java b/flink-ml-examples/src/main/java/org/apache/flink/ml/examples/regression/LinearRegressionExample.java
new file mode 100644
index 0000000..bc39f3a
--- /dev/null
+++ b/flink-ml-examples/src/main/java/org/apache/flink/ml/examples/regression/LinearRegressionExample.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.examples.regression;
+
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.ml.regression.linearregression.LinearRegression;
+import org.apache.flink.ml.regression.linearregression.LinearRegressionModel;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+
+/** Simple program that trains a LinearRegression model and uses it for regression. */
+public class LinearRegressionExample {
+    public static void main(String[] args) {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+        // Generates input data.
+        DataStream<Row> inputStream =
+                env.fromElements(
+                        Row.of(Vectors.dense(2, 1), 4.0, 1.0),
+                        Row.of(Vectors.dense(3, 2), 7.0, 1.0),
+                        Row.of(Vectors.dense(4, 3), 10.0, 1.0),
+                        Row.of(Vectors.dense(2, 4), 10.0, 1.0),
+                        Row.of(Vectors.dense(2, 2), 6.0, 1.0),
+                        Row.of(Vectors.dense(4, 3), 10.0, 1.0),
+                        Row.of(Vectors.dense(1, 2), 5.0, 1.0),
+                        Row.of(Vectors.dense(5, 3), 11.0, 1.0));
+        Table inputTable = tEnv.fromDataStream(inputStream).as("features", "label", "weight");
+
+        // Creates a LinearRegression object and initializes its parameters.
+        LinearRegression lr = new LinearRegression().setWeightCol("weight");
+
+        // Trains the LinearRegression Model.
+        LinearRegressionModel lrModel = lr.fit(inputTable);
+
+        // Uses the LinearRegression Model for predictions.
+        Table outputTable = lrModel.transform(inputTable)[0];
+
+        // Extracts and displays the results.
+        for (CloseableIterator<Row> it = outputTable.execute().collect(); it.hasNext(); ) {
+            Row row = it.next();
+            DenseVector features = (DenseVector) row.getField(lr.getFeaturesCol());
+            double expectedResult = (Double) row.getField(lr.getLabelCol());
+            double predictionResult = (Double) row.getField(lr.getPredictionCol());
+            System.out.printf(
+                    "Features: %s \tExpected Result: %s \tPrediction Result: %s\n",
+                    features, expectedResult, predictionResult);
+        }
+    }
+}
diff --git a/flink-ml-examples/src/main/java/org/apache/flink/ml/examples/util/PeriodicSourceFunction.java b/flink-ml-examples/src/main/java/org/apache/flink/ml/examples/util/PeriodicSourceFunction.java
new file mode 100644
index 0000000..7d12655
--- /dev/null
+++ b/flink-ml-examples/src/main/java/org/apache/flink/ml/examples/util/PeriodicSourceFunction.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.examples.util;
+
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.types.Row;
+
+import java.util.List;
+
+/** A source function that collects provided data periodically at a fixed interval. */
+public class PeriodicSourceFunction implements SourceFunction<Row> {
+    private final long interval;
+
+    private final List<List<Row>> data;
+
+    private int index = 0;
+
+    private boolean isRunning = true;
+
+    /**
+     * @param interval The time interval in milliseconds to collect data into sourceContext.
+     * @param data The data to be collected. Each element is a list of records to be collected
+     *     between two adjacent intervals.
+     */
+    public PeriodicSourceFunction(long interval, List<List<Row>> data) {
+        this.interval = interval;
+        this.data = data;
+    }
+
+    @Override
+    public void run(SourceFunction.SourceContext<Row> sourceContext) throws Exception {
+        while (isRunning) {
+            for (Row data : this.data.get(index)) {
+                sourceContext.collect(data);
+            }
+            Thread.sleep(interval);
+            index = (index + 1) % this.data.size();
+        }
+    }
+
+    @Override
+    public void cancel() {
+        isRunning = false;
+    }
+}
diff --git a/flink-ml-examples/src/test/java/org/apache/flink/ml/examples/ExamplesTest.java b/flink-ml-examples/src/test/java/org/apache/flink/ml/examples/ExamplesTest.java
new file mode 100644
index 0000000..a9ee59a
--- /dev/null
+++ b/flink-ml-examples/src/test/java/org/apache/flink/ml/examples/ExamplesTest.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.examples;
+
+import org.apache.flink.test.util.AbstractTestBase;
+
+import org.apache.commons.io.output.NullPrintStream;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.lang.reflect.Method;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/** Discovers all example classes in this package and tests their main methods. */
+@RunWith(Parameterized.class)
+public class ExamplesTest extends AbstractTestBase {
+    private final Method mainMethod;
+
+    private PrintStream originalPrintStream;
+
+    @Before
+    public void before() {
+        originalPrintStream = System.out;
+        System.setOut(new NullPrintStream());
+    }
+
+    @After
+    public void after() {
+        System.setOut(originalPrintStream);
+    }
+
+    @Parameterized.Parameters(name = "{0}")
+    public static Object[][] testData() throws IOException, ClassNotFoundException {
+        List<Object[]> testData = new ArrayList<>();
+
+        ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+        String packageName = ExamplesTest.class.getPackage().getName();
+        URL rootURL =
+                Objects.requireNonNull(classLoader.getResource(packageName.replace(".", "/")));
+        File rootFile = new File(rootURL.getFile().replace("test-classes", "classes"));
+        List<Class<?>> classes = listClasses(packageName, rootFile);
+
+        for (Class<?> clazz : classes) {
+            testData.add(new Object[] {clazz});
+        }
+
+        return testData.toArray(new Object[0][]);
+    }
+
+    private static List<Class<?>> listClasses(String packageName, File rootFile)
+            throws ClassNotFoundException {
+        List<Class<?>> files = new ArrayList<>();
+        for (File file : Objects.requireNonNull(rootFile.listFiles())) {
+            if (file.isDirectory()) {
+                files.addAll(listClasses(packageName + "." + file.getName(), file));
+            } else if (file.getName().endsWith(".class")) {
+                String fullName = file.getAbsolutePath().replace("/", ".");
+                String className =
+                        fullName.substring(
+                                fullName.indexOf(packageName),
+                                fullName.length() - ".class".length());
+                Class<?> clazz = Class.forName(className);
+                try {
+                    clazz.getMethod("main", String[].class);
+                } catch (NoSuchMethodException e) {
+                    continue;
+                }
+                files.add(clazz);
+            }
+        }
+        return files;
+    }
+
+    public ExamplesTest(Class<?> clazz) throws NoSuchMethodException {
+        mainMethod = clazz.getMethod("main", String[].class);
+    }
+
+    @Test
+    public void test() throws ExecutionException, InterruptedException {
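+        // Examples driven by unbounded sources, such as the online training examples, never
+        // terminate on their own, so each example is cancelled after a timeout instead of being
+        // treated as a failure.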
+        ExecutorService executor = Executors.newSingleThreadExecutor();
+        Future<String> handler =
+                executor.submit(
+                        new Callable<String>() {
+                            @Override
+                            public String call() throws Exception {
+                                mainMethod.invoke(null, (Object) null);
+                                return null;
+                            }
+                        });
+
+        try {
+            handler.get(5, TimeUnit.SECONDS);
+        } catch (TimeoutException e) {
+            handler.cancel(true);
+        }
+    }
+}
diff --git a/flink-ml-lib/src/main/java/org/apache/flink/ml/feature/minmaxscaler/MinMaxScaler.java b/flink-ml-lib/src/main/java/org/apache/flink/ml/feature/minmaxscaler/MinMaxScaler.java
index 0ba885b..4e59851 100644
--- a/flink-ml-lib/src/main/java/org/apache/flink/ml/feature/minmaxscaler/MinMaxScaler.java
+++ b/flink-ml-lib/src/main/java/org/apache/flink/ml/feature/minmaxscaler/MinMaxScaler.java
@@ -52,7 +52,7 @@ import java.util.Map;
 
 /**
  * An Estimator which implements the MinMaxScaler algorithm. This algorithm rescales feature values
- * to a common range [min, max] which defined by user.
+ * to a common range [min, max] defined by the user.
  *
  * <blockquote>
  *
diff --git a/pom.xml b/pom.xml
index 116b205..722f8ca 100644
--- a/pom.xml
+++ b/pom.xml
@@ -59,6 +59,7 @@ under the License.
     <module>flink-ml-uber</module>
     <module>flink-ml-benchmark</module>
     <module>flink-ml-dist</module>
+    <module>flink-ml-examples</module>
   </modules>
 
   <properties>