Posted to commits@flink.apache.org by li...@apache.org on 2022/06/30 02:41:42 UTC

[flink-ml] branch master updated: [FLINK-28224] Add document for algorithms introduced before 2.1 release

This is an automated email from the ASF dual-hosted git repository.

lindong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-ml.git


The following commit(s) were added to refs/heads/master by this push:
     new ee53ada  [FLINK-28224] Add document for algorithms introduced before 2.1 release
ee53ada is described below

commit ee53ada6651719af96f758ca48250902fb726709
Author: yunfengzhou-hub <yu...@outlook.com>
AuthorDate: Thu Jun 30 10:41:39 2022 +0800

    [FLINK-28224] Add document for algorithms introduced before 2.1 release
    
    This closes #124.
---
 .../docs/operators/classification/_index.md        |   2 +-
 docs/content/docs/operators/classification/knn.md  | 173 +++++----
 .../docs/operators/classification/linearsvc.md     | 203 +++++++++++
 .../operators/classification/logisticregression.md | 331 ++++++++++++++----
 .../docs/operators/classification/naivebayes.md    | 147 +++++---
 docs/content/docs/operators/clustering/_index.md   |   2 +-
 docs/content/docs/operators/clustering/kmeans.md   | 301 ++++++++++++----
 .../operators/{clustering => evaluation}/_index.md |   6 +-
 .../evaluation/binaryclassificationevaluator.md    | 198 +++++++++++
 docs/content/docs/operators/feature/bucketizer.md  | 185 ++++++++++
 .../content/docs/operators/feature/minmaxscaler.md | 186 ++++++++++
 .../docs/operators/feature/onehotencoder.md        | 128 ++++---
 .../docs/operators/feature/standardscaler.md       | 164 +++++++++
 .../docs/operators/feature/stringindexer.md        | 388 +++++++++++++++++++++
 .../docs/operators/feature/vectorassembler.md      | 186 ++++++++++
 .../operators/{clustering => regression}/_index.md |   6 +-
 .../docs/operators/regression/linearregression.md  | 194 +++++++++++
 .../feature/IndexToStringModelExample.java         |   2 +-
 .../examples/ml/classification/knn_example.py      |   2 +-
 .../ml/classification/linearsvc_example.py         |   2 +-
 .../classification/logisticregression_example.py   |   2 +-
 .../ml/classification/naivebayes_example.py        |   2 +-
 .../examples/ml/clustering/kmeans_example.py       |   2 +-
 .../binaryclassificationevaluator_example.py       |   2 +-
 .../examples/ml/feature/bucketizer_example.py      |   2 +-
 .../ml/feature/indextostringmodel_example.py       |   4 +-
 .../examples/ml/feature/minmaxscaler_example.py    |   2 +-
 .../examples/ml/feature/onehotencoder_example.py   |   2 +-
 .../examples/ml/feature/standardscaler_example.py  |   2 +-
 .../examples/ml/feature/stringindexer_example.py   |   2 +-
 .../examples/ml/feature/vectorassembler_example.py |   2 +-
 .../ml/regression/linearregression_example.py      |   2 +-
 32 files changed, 2504 insertions(+), 328 deletions(-)

diff --git a/docs/content/docs/operators/classification/_index.md b/docs/content/docs/operators/classification/_index.md
index e6f29c5..007d663 100644
--- a/docs/content/docs/operators/classification/_index.md
+++ b/docs/content/docs/operators/classification/_index.md
@@ -1,7 +1,7 @@
 ---
 title: Classification
 bookCollapseSection: true
-weight: 2
+weight: 1
 aliases:
   - /operators/feature/
 ---
diff --git a/docs/content/docs/operators/classification/knn.md b/docs/content/docs/operators/classification/knn.md
index 289b902..0a980f5 100644
--- a/docs/content/docs/operators/classification/knn.md
+++ b/docs/content/docs/operators/classification/knn.md
@@ -23,33 +23,33 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-# KNN
+## KNN
 
 K Nearest Neighbor (KNN) is a classification algorithm. The basic assumption of
-KNN is that if most of the nearest K neighbors of the provided sample belongs to
-the same label, then it is highly probabl that the provided sample also belongs
+KNN is that if most of the nearest K neighbors of the provided sample belong to
+the same label, then it is highly probable that the provided sample also belongs
 to that label.
 
-## Input Columns
+### Input Columns
 
 | Param name  | Type    | Default      | Description      |
 | :---------- | :------ | :----------- | :--------------- |
 | featuresCol | Vector  | `"features"` | Feature vector   |
 | labelCol    | Integer | `"label"`    | Label to predict |
 
-## Output Columns
+### Output Columns
 
 | Param name    | Type    | Default        | Description     |
 | :------------ | :------ | :------------- | :-------------- |
 | predictionCol | Integer | `"prediction"` | Predicted label |
 
-## Parameters
+### Parameters
 
-Below are parameters required by `KnnModel`.
+Below are the parameters required by `KnnModel`.
 
 | Key           | Default        | Type    | Required | Description                      |
-| ------------- | -------------- | ------- | -------- | -------------------------------- |
-| K             | `5`            | Integer | no       | The number of nearest neighbors. |
+|---------------| -------------- | ------- | -------- | -------------------------------- |
+| k             | `5`            | Integer | no       | The number of nearest neighbors. |
 | featuresCol   | `"features"`   | String  | no       | Features column name.            |
 | predictionCol | `"prediction"` | String  | no       | Prediction column name.          |
 
@@ -59,9 +59,9 @@ Below are parameters required by `KnnModel`.
 | -------- | --------- | ------ | -------- | ------------------ |
 | labelCol | `"label"` | String | no       | Label column name. |
 
-## Examples
+### Examples
 
-{{< tabs knn >}}
+{{< tabs examples >}}
 
 {{< tab "Java">}}
 ```java
@@ -69,73 +69,98 @@ import org.apache.flink.ml.classification.knn.Knn;
 import org.apache.flink.ml.classification.knn.KnnModel;
 import org.apache.flink.ml.linalg.DenseVector;
 import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+
+/** Simple program that trains a Knn model and uses it for classification. */
+public class KnnExample {
+    public static void main(String[] args) {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+        // Generates input training and prediction data.
+        DataStream<Row> trainStream =
+                env.fromElements(
+                        Row.of(Vectors.dense(2.0, 3.0), 1.0),
+                        Row.of(Vectors.dense(2.1, 3.1), 1.0),
+                        Row.of(Vectors.dense(200.1, 300.1), 2.0),
+                        Row.of(Vectors.dense(200.2, 300.2), 2.0),
+                        Row.of(Vectors.dense(200.3, 300.3), 2.0),
+                        Row.of(Vectors.dense(200.4, 300.4), 2.0),
+                        Row.of(Vectors.dense(200.4, 300.4), 2.0),
+                        Row.of(Vectors.dense(200.6, 300.6), 2.0),
+                        Row.of(Vectors.dense(2.1, 3.1), 1.0),
+                        Row.of(Vectors.dense(2.1, 3.1), 1.0),
+                        Row.of(Vectors.dense(2.1, 3.1), 1.0),
+                        Row.of(Vectors.dense(2.1, 3.1), 1.0),
+                        Row.of(Vectors.dense(2.3, 3.2), 1.0),
+                        Row.of(Vectors.dense(2.3, 3.2), 1.0),
+                        Row.of(Vectors.dense(2.8, 3.2), 3.0),
+                        Row.of(Vectors.dense(300., 3.2), 4.0),
+                        Row.of(Vectors.dense(2.2, 3.2), 1.0),
+                        Row.of(Vectors.dense(2.4, 3.2), 5.0),
+                        Row.of(Vectors.dense(2.5, 3.2), 5.0),
+                        Row.of(Vectors.dense(2.5, 3.2), 5.0),
+                        Row.of(Vectors.dense(2.1, 3.1), 1.0));
+        Table trainTable = tEnv.fromDataStream(trainStream).as("features", "label");
+
+        DataStream<Row> predictStream =
+                env.fromElements(
+                        Row.of(Vectors.dense(4.0, 4.1), 5.0), Row.of(Vectors.dense(300, 42), 2.0));
+        Table predictTable = tEnv.fromDataStream(predictStream).as("features", "label");
+
+        // Creates a Knn object and initializes its parameters.
+        Knn knn = new Knn().setK(4);
+
+        // Trains the Knn Model.
+        KnnModel knnModel = knn.fit(trainTable);
+
+        // Uses the Knn Model for predictions.
+        Table outputTable = knnModel.transform(predictTable)[0];
+
+        // Extracts and displays the results.
+        for (CloseableIterator<Row> it = outputTable.execute().collect(); it.hasNext(); ) {
+            Row row = it.next();
+            DenseVector features = (DenseVector) row.getField(knn.getFeaturesCol());
+            double expectedResult = (Double) row.getField(knn.getLabelCol());
+            double predictionResult = (Double) row.getField(knn.getPredictionCol());
+            System.out.printf(
+                    "Features: %-15s \tExpected Result: %s \tPrediction Result: %s\n",
+                    features, expectedResult, predictionResult);
+        }
+    }
+}
 
-List<Row> trainRows =
-  new ArrayList<>(
-  Arrays.asList(
-    Row.of(Vectors.dense(2.0, 3.0), 1.0),
-    Row.of(Vectors.dense(2.1, 3.1), 1.0),
-    Row.of(Vectors.dense(200.1, 300.1), 2.0),
-    Row.of(Vectors.dense(200.2, 300.2), 2.0),
-    Row.of(Vectors.dense(200.3, 300.3), 2.0),
-    Row.of(Vectors.dense(200.4, 300.4), 2.0),
-    Row.of(Vectors.dense(200.4, 300.4), 2.0),
-    Row.of(Vectors.dense(200.6, 300.6), 2.0),
-    Row.of(Vectors.dense(2.1, 3.1), 1.0),
-    Row.of(Vectors.dense(2.1, 3.1), 1.0),
-    Row.of(Vectors.dense(2.1, 3.1), 1.0),
-    Row.of(Vectors.dense(2.1, 3.1), 1.0),
-    Row.of(Vectors.dense(2.3, 3.2), 1.0),
-    Row.of(Vectors.dense(2.3, 3.2), 1.0),
-    Row.of(Vectors.dense(2.8, 3.2), 3.0),
-    Row.of(Vectors.dense(300., 3.2), 4.0),
-    Row.of(Vectors.dense(2.2, 3.2), 1.0),
-    Row.of(Vectors.dense(2.4, 3.2), 5.0),
-    Row.of(Vectors.dense(2.5, 3.2), 5.0),
-    Row.of(Vectors.dense(2.5, 3.2), 5.0),
-    Row.of(Vectors.dense(2.1, 3.1), 1.0)));
-List<Row> predictRows =
-  new ArrayList<>(
-  Arrays.asList(
-    Row.of(Vectors.dense(4.0, 4.1), 5.0),
-    Row.of(Vectors.dense(300, 42), 2.0)));
-Schema schema =
-  Schema.newBuilder()
-  .column("f0", DataTypes.of(DenseVector.class))
-  .column("f1", DataTypes.DOUBLE())
-  .build();
-
-DataStream<Row> dataStream = env.fromCollection(trainRows);
-Table trainData = tEnv.fromDataStream(dataStream, schema).as("features", "label");
-DataStream<Row> predDataStream = env.fromCollection(predictRows);
-Table predictData = tEnv.fromDataStream(predDataStream, schema).as("features", "label");
-
-Knn knn = new Knn();
-KnnModel knnModel = knn.fit(trainData);
-Table output = knnModel.transform(predictData)[0];
-
-output.execute().print();
 ```
 {{< /tab>}}
 
 {{< tab "Python">}}
 ```python
+# Simple program that trains a Knn model and uses it for classification.
+#
+# Before executing this program, please make sure you have followed Flink ML's
+# quick start guideline to set up Flink ML and Flink environment. The guideline
+# can be found at
+#
+# https://nightlies.apache.org/flink/flink-ml-docs-master/docs/try-flink-ml/quick-start/
+
 from pyflink.common import Types
 from pyflink.datastream import StreamExecutionEnvironment
-from pyflink.table import StreamTableEnvironment
-
 from pyflink.ml.core.linalg import Vectors, DenseVectorTypeInfo
 from pyflink.ml.lib.classification.knn import KNN
+from pyflink.table import StreamTableEnvironment
 
 # create a new StreamExecutionEnvironment
 env = StreamExecutionEnvironment.get_execution_environment()
 
-# load flink ml jar
-env.add_jars("file:///{path}/statefun-flink-core-3.1.0.jar", "file:///{path}/flink-ml-uber-{version}.jar")
-
 # create a StreamTableEnvironment
 t_env = StreamTableEnvironment.create(env)
 
+# generate input training and prediction data
 train_data = t_env.from_data_stream(
     env.from_collection([
         (Vectors.dense([2.0, 3.0]), 1.0),
@@ -173,19 +198,25 @@ predict_data = t_env.from_data_stream(
             ['features', 'label'],
             [DenseVectorTypeInfo(), Types.DOUBLE()])))
 
-knn = KNN()
+# create a knn object and initialize its parameters
+knn = KNN().set_k(4)
+
+# train the knn model
 model = knn.fit(train_data)
+
+# use the knn model for predictions
 output = model.transform(predict_data)[0]
-output.execute().print()
-
-# output
-# +----+--------------------------------+--------------------------------+--------------------------------+
-# | op |                       features |                          label |                     prediction |
-# +----+--------------------------------+--------------------------------+--------------------------------+
-# | +I |                     [4.0, 4.1] |                            5.0 |                            5.0 |
-# | +I |                  [300.0, 42.0] |                            2.0 |                            2.0 |
-# +----+--------------------------------+--------------------------------+--------------------------------+
+
+# extract and display the results
+field_names = output.get_schema().get_field_names()
+for result in t_env.to_data_stream(output).execute_and_collect():
+    features = result[field_names.index(knn.get_features_col())]
+    expected_result = result[field_names.index(knn.get_label_col())]
+    actual_result = result[field_names.index(knn.get_prediction_col())]
+    print('Features: ' + str(features) + ' \tExpected Result: ' + str(expected_result)
+          + ' \tActual Result: ' + str(actual_result))
 ```
 {{< /tab>}}
+
 {{< /tabs>}}
 
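The KNN assumption stated in knn.md above (a sample most likely carries the label held by the majority of its K nearest neighbors) can be sketched in plain Python, independent of Flink ML. This is only an illustration: the function name `knn_predict`, the use of Euclidean distance, and the tie-breaking behavior are assumptions of this sketch, not a description of how `KnnModel` is implemented.

```python
import math
from collections import Counter


def knn_predict(train, query, k):
    """Return the majority label among the k training points nearest to query.

    Assumption of this sketch: Euclidean distance, ties broken by first-seen label.
    """
    # Sort training samples by distance to the query and keep the k closest.
    nearest = sorted(train, key=lambda sample: math.dist(sample[0], query))[:k]
    # Count the labels of those neighbors and return the most common one.
    votes = Counter(label for _, label in nearest)
    return votes.most_common(1)[0][0]


# A few (features, label) pairs in the style of the documented training data.
train = [
    ((2.0, 3.0), 1.0),
    ((2.1, 3.1), 1.0),
    ((2.3, 3.2), 1.0),
    ((200.1, 300.1), 2.0),
    ((200.2, 300.2), 2.0),
    ((200.3, 300.3), 2.0),
]

print(knn_predict(train, (2.2, 3.0), k=3))      # 1.0
print(knn_predict(train, (199.0, 299.0), k=3))  # 2.0
```

A brute-force sort like this is fine for a handful of points; it says nothing about how the distributed operator partitions or searches the model data.
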
diff --git a/docs/content/docs/operators/classification/linearsvc.md b/docs/content/docs/operators/classification/linearsvc.md
new file mode 100644
index 0000000..e5ddb39
--- /dev/null
+++ b/docs/content/docs/operators/classification/linearsvc.md
@@ -0,0 +1,203 @@
+---
+title: "Linear SVC"
+type: docs
+aliases:
+- /operators/classification/linearsvc.html
+---
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+## Linear Support Vector Machine
+
+Linear Support Vector Machine (Linear SVC) is an algorithm that attempts to find
+a hyperplane that maximizes the margin between samples of different classes.
+
+### Input Columns
+
+| Param name  | Type    | Default      | Description      |
+| :---------- | :------ | :----------- | :--------------- |
+| featuresCol | Vector  | `"features"` | Feature vector   |
+| labelCol    | Integer | `"label"`    | Label to predict |
+| weightCol   | Double  | `"weight"`   | Weight of sample |
+
+### Output Columns
+
+| Param name       | Type    | Default           | Description                             |
+| :--------------- | :------ | :---------------- | :-------------------------------------- |
+| predictionCol    | Integer | `"prediction"`    | Label of the max probability            |
+| rawPredictionCol | Vector  | `"rawPrediction"` | Vector of the probability of each label |
+
+### Parameters
+
+Below are the parameters required by `LinearSVCModel`.
+
+| Key              | Default           | Type   | Required | Description                                                  |
+| ---------------- | ----------------- | ------ | -------- | ------------------------------------------------------------ |
+| featuresCol      | `"features"`      | String | no       | Features column name.                                        |
+| predictionCol    | `"prediction"`    | String | no       | Prediction column name.                                      |
+| rawPredictionCol | `"rawPrediction"` | String | no       | Raw prediction column name.                                  |
+| threshold        | `0.0`             | Double | no       | Threshold in binary classification prediction applied to rawPrediction. |
+
+`LinearSVC` needs the parameters above as well as those below.
+
+| Key             | Default   | Type    | Required | Description                                     |
+| --------------- | --------- | ------- | -------- | ----------------------------------------------- |
+| labelCol        | `"label"` | String  | no       | Label column name.                              |
+| weightCol       | `null`    | String  | no       | Weight column name.                             |
+| maxIter         | `20`      | Integer | no       | Maximum number of iterations.                   |
+| reg             | `0.`      | Double  | no       | Regularization parameter.                       |
+| elasticNet      | `0.`      | Double  | no       | ElasticNet parameter.                           |
+| learningRate    | `0.1`     | Double  | no       | Learning rate of optimization method.           |
+| globalBatchSize | `32`      | Integer | no       | Global batch size of training algorithms.       |
+| tol             | `1e-6`    | Double  | no       | Convergence tolerance for iterative algorithms. |
+
+### Examples
+
+{{< tabs examples >}}
+
+{{< tab "Java">}}
+
+```java
+import org.apache.flink.ml.classification.linearsvc.LinearSVC;
+import org.apache.flink.ml.classification.linearsvc.LinearSVCModel;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+
+/** Simple program that trains a LinearSVC model and uses it for classification. */
+public class LinearSVCExample {
+    public static void main(String[] args) {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+        // Generates input data.
+        DataStream<Row> inputStream =
+                env.fromElements(
+                        Row.of(Vectors.dense(1, 2, 3, 4), 0., 1.),
+                        Row.of(Vectors.dense(2, 2, 3, 4), 0., 2.),
+                        Row.of(Vectors.dense(3, 2, 3, 4), 0., 3.),
+                        Row.of(Vectors.dense(4, 2, 3, 4), 0., 4.),
+                        Row.of(Vectors.dense(5, 2, 3, 4), 0., 5.),
+                        Row.of(Vectors.dense(11, 2, 3, 4), 1., 1.),
+                        Row.of(Vectors.dense(12, 2, 3, 4), 1., 2.),
+                        Row.of(Vectors.dense(13, 2, 3, 4), 1., 3.),
+                        Row.of(Vectors.dense(14, 2, 3, 4), 1., 4.),
+                        Row.of(Vectors.dense(15, 2, 3, 4), 1., 5.));
+        Table inputTable = tEnv.fromDataStream(inputStream).as("features", "label", "weight");
+
+        // Creates a LinearSVC object and initializes its parameters.
+        LinearSVC linearSVC = new LinearSVC().setWeightCol("weight");
+
+        // Trains the LinearSVC Model.
+        LinearSVCModel linearSVCModel = linearSVC.fit(inputTable);
+
+        // Uses the LinearSVC Model for predictions.
+        Table outputTable = linearSVCModel.transform(inputTable)[0];
+
+        // Extracts and displays the results.
+        for (CloseableIterator<Row> it = outputTable.execute().collect(); it.hasNext(); ) {
+            Row row = it.next();
+            DenseVector features = (DenseVector) row.getField(linearSVC.getFeaturesCol());
+            double expectedResult = (Double) row.getField(linearSVC.getLabelCol());
+            double predictionResult = (Double) row.getField(linearSVC.getPredictionCol());
+            DenseVector rawPredictionResult =
+                    (DenseVector) row.getField(linearSVC.getRawPredictionCol());
+            System.out.printf(
+                    "Features: %-25s \tExpected Result: %s \tPrediction Result: %s \tRaw Prediction Result: %s\n",
+                    features, expectedResult, predictionResult, rawPredictionResult);
+        }
+    }
+}
+
+```
+
+{{< /tab>}}
+
+{{< tab "Python">}}
+
+```python
+# Simple program that trains a LinearSVC model and uses it for classification.
+#
+# Before executing this program, please make sure you have followed Flink ML's
+# quick start guideline to set up Flink ML and Flink environment. The guideline
+# can be found at
+#
+# https://nightlies.apache.org/flink/flink-ml-docs-master/docs/try-flink-ml/quick-start/
+
+from pyflink.common import Types
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.ml.core.linalg import Vectors, DenseVectorTypeInfo
+from pyflink.ml.lib.classification.linearsvc import LinearSVC
+from pyflink.table import StreamTableEnvironment
+
+# create a new StreamExecutionEnvironment
+env = StreamExecutionEnvironment.get_execution_environment()
+
+# create a StreamTableEnvironment
+t_env = StreamTableEnvironment.create(env)
+
+# generate input data
+input_table = t_env.from_data_stream(
+    env.from_collection([
+        (Vectors.dense([1, 2, 3, 4]), 0., 1.),
+        (Vectors.dense([2, 2, 3, 4]), 0., 2.),
+        (Vectors.dense([3, 2, 3, 4]), 0., 3.),
+        (Vectors.dense([4, 2, 3, 4]), 0., 4.),
+        (Vectors.dense([5, 2, 3, 4]), 0., 5.),
+        (Vectors.dense([11, 2, 3, 4]), 1., 1.),
+        (Vectors.dense([12, 2, 3, 4]), 1., 2.),
+        (Vectors.dense([13, 2, 3, 4]), 1., 3.),
+        (Vectors.dense([14, 2, 3, 4]), 1., 4.),
+        (Vectors.dense([15, 2, 3, 4]), 1., 5.),
+    ],
+        type_info=Types.ROW_NAMED(
+            ['features', 'label', 'weight'],
+            [DenseVectorTypeInfo(), Types.DOUBLE(), Types.DOUBLE()])
+    ))
+
+# create a linear svc object and initialize its parameters
+linear_svc = LinearSVC().set_weight_col('weight')
+
+# train the linear svc model
+model = linear_svc.fit(input_table)
+
+# use the linear svc model for predictions
+output = model.transform(input_table)[0]
+
+# extract and display the results
+field_names = output.get_schema().get_field_names()
+for result in t_env.to_data_stream(output).execute_and_collect():
+    features = result[field_names.index(linear_svc.get_features_col())]
+    expected_result = result[field_names.index(linear_svc.get_label_col())]
+    prediction_result = result[field_names.index(linear_svc.get_prediction_col())]
+    raw_prediction_result = result[field_names.index(linear_svc.get_raw_prediction_col())]
+    print('Features: ' + str(features) + ' \tExpected Result: ' + str(expected_result)
+          + ' \tPrediction Result: ' + str(prediction_result)
+          + ' \tRaw Prediction Result: ' + str(raw_prediction_result))
+```
+
+{{< /tab>}}
+
+{{< /tabs>}}
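
The `threshold` parameter documented in linearsvc.md above is described as being applied to the raw prediction in binary classification. A minimal plain-Python sketch of that idea follows. It assumes the raw prediction is the dot product of a coefficient vector with the features and that values at or above the threshold map to label `1.0`; the coefficient values and the name `linear_svc_predict` are hypothetical, chosen only for illustration, and are not taken from Flink ML.

```python
def linear_svc_predict(coefficient, features, threshold=0.0):
    """Return (label, raw) for one sample.

    Assumption of this sketch: raw = coefficient . features, and the label is
    1.0 when raw >= threshold, otherwise 0.0.
    """
    raw = sum(w * x for w, x in zip(coefficient, features))
    label = 1.0 if raw >= threshold else 0.0
    return label, raw


# Hypothetical coefficients; a real model would learn these during fit().
coefficient = [0.5, -1.0, -0.5, -0.25]

print(linear_svc_predict(coefficient, [1, 2, 3, 4]))        # (0.0, -4.0)
print(linear_svc_predict(coefficient, [15, 2, 3, 4]))       # (1.0, 3.0)
print(linear_svc_predict(coefficient, [15, 2, 3, 4], 3.5))  # (0.0, 3.0)
```

Raising the threshold, as in the last call, makes the positive label harder to reach without retraining, which is why the parameter lives on the model rather than only on the estimator.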
diff --git a/docs/content/docs/operators/classification/logisticregression.md b/docs/content/docs/operators/classification/logisticregression.md
index 5a8b119..298e1fa 100644
--- a/docs/content/docs/operators/classification/logisticregression.md
+++ b/docs/content/docs/operators/classification/logisticregression.md
@@ -23,12 +23,12 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-# Logistic Regression
+## Logistic Regression
 
-Logistic regression is a special case of Generalized Linear Model. It is widely
-used to predict a binary response. 
+Logistic regression is a special case of the Generalized Linear Model. It is
+widely used to predict a binary response. 
 
-## Input Columns
+### Input Columns
 
 | Param name  | Type    | Default      | Description      |
 | :---------- | :------ | :----------- | :--------------- |
@@ -36,16 +36,16 @@ used to predict a binary response.
 | labelCol    | Integer | `"label"`    | Label to predict |
 | weightCol   | Double  | `"weight"`   | Weight of sample |
 
-## Output Columns
+### Output Columns
 
 | Param name       | Type    | Default           | Description                             |
 | :--------------- | :------ | :---------------- | :-------------------------------------- |
 | predictionCol    | Integer | `"prediction"`    | Label of the max probability            |
 | rawPredictionCol | Vector  | `"rawPrediction"` | Vector of the probability of each label |
 
-## Parameters
+### Parameters
 
-Below are parameters required by `LogisticRegressionModel`.
+Below are the parameters required by `LogisticRegressionModel`.
 
 | Key              | Default           | Type   | Required | Description                 |
 | ---------------- | ----------------- | ------ | -------- | --------------------------- |
@@ -61,73 +61,100 @@ Below are parameters required by `LogisticRegressionModel`.
 | weightCol       | `null`    | String  | no       | Weight column name.                                          |
 | maxIter         | `20`      | Integer | no       | Maximum number of iterations.                                |
 | reg             | `0.`      | Double  | no       | Regularization parameter.                                    |
+| elasticNet      | `0.`      | Double  | no       | ElasticNet parameter.                                        |
 | learningRate    | `0.1`     | Double  | no       | Learning rate of optimization method.                        |
 | globalBatchSize | `32`      | Integer | no       | Global batch size of training algorithms.                    |
 | tol             | `1e-6`    | Double  | no       | Convergence tolerance for iterative algorithms.              |
 | multiClass      | `"auto"`  | String  | no       | Classification type. Supported values: "auto", "binomial", "multinomial" |
 
-## Examples
-{{< tabs logisticregression >}}
+### Examples
+{{< tabs examples >}}
 
 {{< tab "Java">}}
 ```java
 import org.apache.flink.ml.classification.logisticregression.LogisticRegression;
+import org.apache.flink.ml.classification.logisticregression.LogisticRegressionModel;
 import org.apache.flink.ml.linalg.DenseVector;
 import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+
+/** Simple program that trains a LogisticRegression model and uses it for classification. */
+public class LogisticRegressionExample {
+    public static void main(String[] args) {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+        // Generates input data.
+        DataStream<Row> inputStream =
+                env.fromElements(
+                        Row.of(Vectors.dense(1, 2, 3, 4), 0., 1.),
+                        Row.of(Vectors.dense(2, 2, 3, 4), 0., 2.),
+                        Row.of(Vectors.dense(3, 2, 3, 4), 0., 3.),
+                        Row.of(Vectors.dense(4, 2, 3, 4), 0., 4.),
+                        Row.of(Vectors.dense(5, 2, 3, 4), 0., 5.),
+                        Row.of(Vectors.dense(11, 2, 3, 4), 1., 1.),
+                        Row.of(Vectors.dense(12, 2, 3, 4), 1., 2.),
+                        Row.of(Vectors.dense(13, 2, 3, 4), 1., 3.),
+                        Row.of(Vectors.dense(14, 2, 3, 4), 1., 4.),
+                        Row.of(Vectors.dense(15, 2, 3, 4), 1., 5.));
+        Table inputTable = tEnv.fromDataStream(inputStream).as("features", "label", "weight");
+
+        // Creates a LogisticRegression object and initializes its parameters.
+        LogisticRegression lr = new LogisticRegression().setWeightCol("weight");
+
+        // Trains the LogisticRegression Model.
+        LogisticRegressionModel lrModel = lr.fit(inputTable);
+
+        // Uses the LogisticRegression Model for predictions.
+        Table outputTable = lrModel.transform(inputTable)[0];
+
+        // Extracts and displays the results.
+        for (CloseableIterator<Row> it = outputTable.execute().collect(); it.hasNext(); ) {
+            Row row = it.next();
+            DenseVector features = (DenseVector) row.getField(lr.getFeaturesCol());
+            double expectedResult = (Double) row.getField(lr.getLabelCol());
+            double predictionResult = (Double) row.getField(lr.getPredictionCol());
+            DenseVector rawPredictionResult = (DenseVector) row.getField(lr.getRawPredictionCol());
+            System.out.printf(
+                    "Features: %-25s \tExpected Result: %s \tPrediction Result: %s \tRaw Prediction Result: %s\n",
+                    features, expectedResult, predictionResult, rawPredictionResult);
+        }
+    }
+}
 
-List<Row> binomialTrainData =
-  Arrays.asList(
-  Row.of(Vectors.dense(1, 2, 3, 4), 0., 1.),
-  Row.of(Vectors.dense(2, 2, 3, 4), 0., 2.),
-  Row.of(Vectors.dense(3, 2, 3, 4), 0., 3.),
-  Row.of(Vectors.dense(4, 2, 3, 4), 0., 4.),
-  Row.of(Vectors.dense(5, 2, 3, 4), 0., 5.),
-  Row.of(Vectors.dense(11, 2, 3, 4), 1., 1.),
-  Row.of(Vectors.dense(12, 2, 3, 4), 1., 2.),
-  Row.of(Vectors.dense(13, 2, 3, 4), 1., 3.),
-  Row.of(Vectors.dense(14, 2, 3, 4), 1., 4.),
-  Row.of(Vectors.dense(15, 2, 3, 4), 1., 5.));
-Collections.shuffle(binomialTrainData);
-
-Table binomialDataTable =
-  tEnv.fromDataStream(
-  env.fromCollection(
-    binomialTrainData,
-    new RowTypeInfo(
-      new TypeInformation[] {
-        TypeInformation.of(DenseVector.class),
-        Types.DOUBLE,
-        Types.DOUBLE
-      },
-      new String[] {"features", "label", "weight"})));
-
-LogisticRegression logisticRegression = new LogisticRegression().setWeightCol("weight");
-LogisticRegressionModel model = logisticRegression.fit(binomialDataTable);
-Table output = model.transform(binomialDataTable)[0];
-
-output.execute().print();
 ```
 {{< /tab>}}
 
 {{< tab "Python">}}
 ```python
+# Simple program that trains a LogisticRegression model and uses it for
+# classification.
+#
+# Before executing this program, please make sure you have followed Flink ML's
+# quick start guideline to set up Flink ML and Flink environment. The guideline
+# can be found at
+#
+# https://nightlies.apache.org/flink/flink-ml-docs-master/docs/try-flink-ml/quick-start/
+
 from pyflink.common import Types
 from pyflink.datastream import StreamExecutionEnvironment
-from pyflink.table import StreamTableEnvironment
-
 from pyflink.ml.core.linalg import Vectors, DenseVectorTypeInfo
 from pyflink.ml.lib.classification.logisticregression import LogisticRegression
+from pyflink.table import StreamTableEnvironment
 
 # create a new StreamExecutionEnvironment
 env = StreamExecutionEnvironment.get_execution_environment()
 
-# load flink ml jar
-env.add_jars("file:///{path}/statefun-flink-core-3.1.0.jar", "file:///{path}/flink-ml-uber-{version}.jar")
-
 # create a StreamTableEnvironment
 t_env = StreamTableEnvironment.create(env)
 
-binomial_data_table = t_env.from_data_stream(
+# generate input data
+input_data = t_env.from_data_stream(
     env.from_collection([
         (Vectors.dense([1, 2, 3, 4]), 0., 1.),
         (Vectors.dense([2, 2, 3, 4]), 0., 2.),
@@ -145,27 +172,199 @@ binomial_data_table = t_env.from_data_stream(
             [DenseVectorTypeInfo(), Types.DOUBLE(), Types.DOUBLE()])
     ))
 
+# create a logistic regression object and initialize its parameters
 logistic_regression = LogisticRegression().set_weight_col('weight')
-model = logistic_regression.fit(binomial_data_table)
-output = model.transform(binomial_data_table)[0]
-
-output.execute().print()
-
-# output
-# +----+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+
-# | op |                       features |                          label |                         weight |                     prediction |                  rawPrediction |
-# +----+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+
-# | +I |           [1.0, 2.0, 3.0, 4.0] |                            0.0 |                            1.0 |                            0.0 | [0.9731815427669942, 0.0268... |
-# | +I |           [5.0, 2.0, 3.0, 4.0] |                            0.0 |                            5.0 |                            0.0 | [0.8158018538556746, 0.1841... |
-# | +I |          [14.0, 2.0, 3.0, 4.0] |                            1.0 |                            4.0 |                            1.0 | [0.03753179912156068, 0.962... |
-# | +I |           [3.0, 2.0, 3.0, 4.0] |                            0.0 |                            3.0 |                            0.0 | [0.926886620226911, 0.07311... |
-# | +I |          [12.0, 2.0, 3.0, 4.0] |                            1.0 |                            2.0 |                            1.0 | [0.10041228069167174, 0.899... |
-# | +I |           [4.0, 2.0, 3.0, 4.0] |                            0.0 |                            4.0 |                            0.0 | [0.8822580948141717, 0.1177... |
-# | +I |          [13.0, 2.0, 3.0, 4.0] |                            1.0 |                            3.0 |                            1.0 | [0.061891528893188164, 0.93... |
-# | +I |           [2.0, 2.0, 3.0, 4.0] |                            0.0 |                            2.0 |                            0.0 | [0.9554533965544176, 0.0445... |
-# | +I |          [11.0, 2.0, 3.0, 4.0] |                            1.0 |                            1.0 |                            1.0 | [0.15884837044317868, 0.841... |
-# | +I |          [15.0, 2.0, 3.0, 4.0] |                            1.0 |                            5.0 |                            1.0 | [0.022529496926532833, 0.97... |
-# +----+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+
+
+# train the logistic regression model
+model = logistic_regression.fit(input_data)
+
+# use the logistic regression model for predictions
+output = model.transform(input_data)[0]
+
+# extract and display the results
+field_names = output.get_schema().get_field_names()
+for result in t_env.to_data_stream(output).execute_and_collect():
+    features = result[field_names.index(logistic_regression.get_features_col())]
+    expected_result = result[field_names.index(logistic_regression.get_label_col())]
+    prediction_result = result[field_names.index(logistic_regression.get_prediction_col())]
+    raw_prediction_result = result[field_names.index(logistic_regression.get_raw_prediction_col())]
+    print('Features: ' + str(features) + ' \tExpected Result: ' + str(expected_result)
+          + ' \tPrediction Result: ' + str(prediction_result)
+          + ' \tRaw Prediction Result: ' + str(raw_prediction_result))
+
+```
+{{< /tab>}}
+
+{{< /tabs>}}
+
+## OnlineLogisticRegression
+
+Online Logistic Regression supports training a logistic regression model
+online on an unbounded stream of training data.
+
+The online optimizer used by this algorithm is FTRL-Proximal, proposed by
+H. Brendan McMahan et al. See [H. Brendan McMahan et al., Ad click prediction: a
+view from the trenches.](https://doi.org/10.1145/2487575.2488200)
+
+### Input Columns
+
+| Param name  | Type    | Default      | Description      |
+| :---------- | :------ | :----------- | :--------------- |
+| featuresCol | Vector  | `"features"` | Feature vector   |
+| labelCol    | Integer | `"label"`    | Label to predict |
+| weightCol   | Double  | `"weight"`   | Weight of sample |
+
+### Output Columns
+
+| Param name       | Type    | Default           | Description                                            |
+| :--------------- | :------ | :---------------- | :----------------------------------------------------- |
+| predictionCol    | Integer | `"prediction"`    | Label of the max probability                           |
+| rawPredictionCol | Vector  | `"rawPrediction"` | Vector of the probability of each label                |
+| modelVersionCol  | Long    | `"modelVersion"`  | The version of the model data used for this prediction |
+
+### Parameters
+
+Below are the parameters required by `OnlineLogisticRegressionModel`.
+
+| Key              | Default           | Type   | Required | Description                 |
+| ---------------- | ----------------- | ------ | -------- | --------------------------- |
+| featuresCol      | `"features"`      | String | no       | Features column name.       |
+| predictionCol    | `"prediction"`    | String | no       | Prediction column name.     |
+| rawPredictionCol | `"rawPrediction"` | String | no       | Raw prediction column name. |
+| modelVersionCol  | `"modelVersion"`  | String | no       | Model version column name.  |
+
+`OnlineLogisticRegression` needs the parameters above as well as those below.
+
+| Key             | Default          | Type    | Required | Description                                           |
+| --------------- | ---------------- | ------- | -------- | ----------------------------------------------------- |
+| labelCol        | `"label"`        | String  | no       | Label column name.                                    |
+| weightCol       | `null`           | String  | no       | Weight column name.                                   |
+| batchStrategy   | `COUNT_STRATEGY` | String  | no       | Strategy to create mini batch from online train data. |
+| globalBatchSize | `32`             | Integer | no       | Global batch size of training algorithms.             |
+| reg             | `0.`             | Double  | no       | Regularization parameter.                             |
+| elasticNet      | `0.`             | Double  | no       | ElasticNet parameter.                                 |
+
+### Examples
+
+{{< tabs online_examples >}}
+
+{{< tab "Java">}}
+
+```java
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.ml.classification.logisticregression.OnlineLogisticRegression;
+import org.apache.flink.ml.classification.logisticregression.OnlineLogisticRegressionModel;
+import org.apache.flink.ml.examples.util.PeriodicSourceFunction;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.ml.linalg.typeinfo.DenseVectorTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+/** Simple program that trains an OnlineLogisticRegression model and uses it for classification. */
+public class OnlineLogisticRegressionExample {
+    public static void main(String[] args) {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(4);
+        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+        // Generates input training and prediction data. Both are infinite streams that periodically
+        // send out the provided data to trigger model updates and predictions.
+        List<Row> trainData1 =
+                Arrays.asList(
+                        Row.of(Vectors.dense(0.1, 2.), 0.),
+                        Row.of(Vectors.dense(0.2, 2.), 0.),
+                        Row.of(Vectors.dense(0.3, 2.), 0.),
+                        Row.of(Vectors.dense(0.4, 2.), 0.),
+                        Row.of(Vectors.dense(0.5, 2.), 0.),
+                        Row.of(Vectors.dense(11., 12.), 1.),
+                        Row.of(Vectors.dense(12., 11.), 1.),
+                        Row.of(Vectors.dense(13., 12.), 1.),
+                        Row.of(Vectors.dense(14., 12.), 1.),
+                        Row.of(Vectors.dense(15., 12.), 1.));
+
+        List<Row> trainData2 =
+                Arrays.asList(
+                        Row.of(Vectors.dense(0.2, 3.), 0.),
+                        Row.of(Vectors.dense(0.8, 1.), 0.),
+                        Row.of(Vectors.dense(0.7, 1.), 0.),
+                        Row.of(Vectors.dense(0.6, 2.), 0.),
+                        Row.of(Vectors.dense(0.2, 2.), 0.),
+                        Row.of(Vectors.dense(14., 17.), 1.),
+                        Row.of(Vectors.dense(15., 10.), 1.),
+                        Row.of(Vectors.dense(16., 16.), 1.),
+                        Row.of(Vectors.dense(17., 10.), 1.),
+                        Row.of(Vectors.dense(18., 13.), 1.));
+
+        List<Row> predictData =
+                Arrays.asList(
+                        Row.of(Vectors.dense(0.8, 2.7), 0.0),
+                        Row.of(Vectors.dense(15.5, 11.2), 1.0));
+
+        RowTypeInfo typeInfo =
+                new RowTypeInfo(
+                        new TypeInformation[] {DenseVectorTypeInfo.INSTANCE, Types.DOUBLE},
+                        new String[] {"features", "label"});
+
+        SourceFunction<Row> trainSource =
+                new PeriodicSourceFunction(1000, Arrays.asList(trainData1, trainData2));
+        DataStream<Row> trainStream = env.addSource(trainSource, typeInfo);
+        Table trainTable = tEnv.fromDataStream(trainStream).as("features");
+
+        SourceFunction<Row> predictSource =
+                new PeriodicSourceFunction(1000, Collections.singletonList(predictData));
+        DataStream<Row> predictStream = env.addSource(predictSource, typeInfo);
+        Table predictTable = tEnv.fromDataStream(predictStream).as("features");
+
+        // Creates an online LogisticRegression object and initializes its parameters and initial
+        // model data.
+        Row initModelData = Row.of(Vectors.dense(0.41233679404769874, -0.18088118293232122), 0L);
+        Table initModelDataTable = tEnv.fromDataStream(env.fromElements(initModelData));
+        OnlineLogisticRegression olr =
+                new OnlineLogisticRegression()
+                        .setFeaturesCol("features")
+                        .setLabelCol("label")
+                        .setPredictionCol("prediction")
+                        .setReg(0.2)
+                        .setElasticNet(0.5)
+                        .setGlobalBatchSize(10)
+                        .setInitialModelData(initModelDataTable);
+
+        // Trains the online LogisticRegression Model.
+        OnlineLogisticRegressionModel onlineModel = olr.fit(trainTable);
+
+        // Uses the online LogisticRegression Model for predictions.
+        Table outputTable = onlineModel.transform(predictTable)[0];
+
+        // Extracts and displays the results. As the training data stream continuously triggers
+        // updates of the internal model data, the raw prediction results for the same prediction
+        // dataset change over time.
+        for (CloseableIterator<Row> it = outputTable.execute().collect(); it.hasNext(); ) {
+            Row row = it.next();
+            DenseVector features = (DenseVector) row.getField(olr.getFeaturesCol());
+            Double expectedResult = (Double) row.getField(olr.getLabelCol());
+            Double predictionResult = (Double) row.getField(olr.getPredictionCol());
+            DenseVector rawPredictionResult = (DenseVector) row.getField(olr.getRawPredictionCol());
+            System.out.printf(
+                    "Features: %-25s \tExpected Result: %s \tPrediction Result: %s \tRaw Prediction Result: %s\n",
+                    features, expectedResult, predictionResult, rawPredictionResult);
+        }
+    }
+}
+
 ```
+
 {{< /tab>}}
+
 {{< /tabs>}}
diff --git a/docs/content/docs/operators/classification/naivebayes.md b/docs/content/docs/operators/classification/naivebayes.md
index 6d50c19..df4a6e0 100644
--- a/docs/content/docs/operators/classification/naivebayes.md
+++ b/docs/content/docs/operators/classification/naivebayes.md
@@ -23,25 +23,25 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-# Naive Bayes
+## Naive Bayes
 
 Naive Bayes is a multiclass classifier. Based on Bayes’ theorem, it assumes that
 there is strong (naive) independence between every pair of features. 
 
-## Input Columns
+### Input Columns
 
 | Param name  | Type    | Default      | Description      |
 | :---------- | :------ | :----------- | :--------------- |
 | featuresCol | Vector  | `"features"` | Feature vector   |
 | labelCol    | Integer | `"label"`    | Label to predict |
 
-## Output Columns
+### Output Columns
 
 | Param name    | Type    | Default        | Description     |
 | :------------ | :------ | :------------- | :-------------- |
 | predictionCol | Integer | `"prediction"` | Predicted label |
 
-## Parameters
+### Parameters
 
 Below are parameters required by `NaiveBayesModel`.
 
@@ -58,67 +58,98 @@ Below are parameters required by `NaiveBayesModel`.
 | labelCol  | `"label"` | String | no       | Label column name.       |
 | smoothing | `1.0`     | Double | no       | The smoothing parameter. |
 
-## Examples
+### Examples
 
-{{< tabs naivebayes >}}
+{{< tabs examples >}}
 
 {{< tab "Java">}}
 ```java
 import org.apache.flink.ml.classification.naivebayes.NaiveBayes;
 import org.apache.flink.ml.classification.naivebayes.NaiveBayesModel;
+import org.apache.flink.ml.linalg.DenseVector;
 import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+
+/** Simple program that trains a NaiveBayes model and uses it for classification. */
+public class NaiveBayesExample {
+    public static void main(String[] args) {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+        // Generates input training and prediction data.
+        DataStream<Row> trainStream =
+                env.fromElements(
+                        Row.of(Vectors.dense(0, 0.), 11),
+                        Row.of(Vectors.dense(1, 0), 10),
+                        Row.of(Vectors.dense(1, 1.), 10));
+        Table trainTable = tEnv.fromDataStream(trainStream).as("features", "label");
+
+        DataStream<Row> predictStream =
+                env.fromElements(
+                        Row.of(Vectors.dense(0, 1.)),
+                        Row.of(Vectors.dense(0, 0.)),
+                        Row.of(Vectors.dense(1, 0)),
+                        Row.of(Vectors.dense(1, 1.)));
+        Table predictTable = tEnv.fromDataStream(predictStream).as("features");
+
+        // Creates a NaiveBayes object and initializes its parameters.
+        NaiveBayes naiveBayes =
+                new NaiveBayes()
+                        .setSmoothing(1.0)
+                        .setFeaturesCol("features")
+                        .setLabelCol("label")
+                        .setPredictionCol("prediction")
+                        .setModelType("multinomial");
+
+        // Trains the NaiveBayes Model.
+        NaiveBayesModel naiveBayesModel = naiveBayes.fit(trainTable);
+
+        // Uses the NaiveBayes Model for predictions.
+        Table outputTable = naiveBayesModel.transform(predictTable)[0];
+
+        // Extracts and displays the results.
+        for (CloseableIterator<Row> it = outputTable.execute().collect(); it.hasNext(); ) {
+            Row row = it.next();
+            DenseVector features = (DenseVector) row.getField(naiveBayes.getFeaturesCol());
+            double predictionResult = (Double) row.getField(naiveBayes.getPredictionCol());
+            System.out.printf("Features: %s \tPrediction Result: %s\n", features, predictionResult);
+        }
+    }
+}
 
-List<Row> trainData =
-  Arrays.asList(
-  Row.of(Vectors.dense(0, 0.), 11),
-  Row.of(Vectors.dense(1, 0), 10),
-  Row.of(Vectors.dense(1, 1.), 10));
-
-Table trainTable = tEnv.fromDataStream(env.fromCollection(trainData)).as("features", "label");
-
-List<Row> predictData =
-  Arrays.asList(
-  Row.of(Vectors.dense(0, 1.)),
-  Row.of(Vectors.dense(0, 0.)),
-  Row.of(Vectors.dense(1, 0)),
-  Row.of(Vectors.dense(1, 1.)));
-
-Table predictTable = tEnv.fromDataStream(env.fromCollection(predictData)).as("features");
-
-NaiveBayes estimator =
-  new NaiveBayes()
-  .setSmoothing(1.0)
-  .setFeaturesCol("features")
-  .setLabelCol("label")
-  .setPredictionCol("prediction")
-  .setModelType("multinomial");
-
-NaiveBayesModel model = estimator.fit(trainTable);
-Table outputTable = model.transform(predictTable)[0];
-
-outputTable.execute().print();
 ```
 {{< /tab>}}
 
 
 {{< tab "Python">}}
 ```python
+
+# Simple program that trains a NaiveBayes model and uses it for classification.
+#
+# Before executing this program, please make sure you have followed Flink ML's
+# quick start guideline to set up Flink ML and Flink environment. The guideline
+# can be found at
+#
+# https://nightlies.apache.org/flink/flink-ml-docs-master/docs/try-flink-ml/quick-start/
+
 from pyflink.common import Types
 from pyflink.datastream import StreamExecutionEnvironment
-from pyflink.table import StreamTableEnvironment
-
 from pyflink.ml.core.linalg import Vectors, DenseVectorTypeInfo
 from pyflink.ml.lib.classification.naivebayes import NaiveBayes
+from pyflink.table import StreamTableEnvironment
 
 # create a new StreamExecutionEnvironment
 env = StreamExecutionEnvironment.get_execution_environment()
 
-# load flink ml jar
-env.add_jars("file:///{path}/statefun-flink-core-3.1.0.jar", "file:///{path}/flink-ml-uber-{version}.jar")
-
 # create a StreamTableEnvironment
 t_env = StreamTableEnvironment.create(env)
 
+# generate input training and prediction data
 train_table = t_env.from_data_stream(
     env.from_collection([
         (Vectors.dense([0, 0.]), 11.),
@@ -140,28 +171,28 @@ predict_table = t_env.from_data_stream(
             ['features'],
             [DenseVectorTypeInfo()])))
 
-estimator = (NaiveBayes()
-             .set_smoothing(1.0)
-             .set_features_col('features')
-             .set_label_col('label')
-             .set_prediction_col('prediction')
-             .set_model_type('multinomial'))
+# create a naive bayes object and initialize its parameters
+naive_bayes = NaiveBayes() \
+    .set_smoothing(1.0) \
+    .set_features_col('features') \
+    .set_label_col('label') \
+    .set_prediction_col('prediction') \
+    .set_model_type('multinomial')
 
-model = estimator.fit(train_table)
-output = model.transform(predict_table)[0]
+# train the naive bayes model
+model = naive_bayes.fit(train_table)
 
-output.execute().print()
+# use the naive bayes model for predictions
+output = model.transform(predict_table)[0]
 
-# output
-# +----+--------------------------------+--------------------------------+
-# | op |                       features |                     prediction |
-# +----+--------------------------------+--------------------------------+
-# | +I |                     [0.0, 1.0] |                           11.0 |
-# | +I |                     [0.0, 0.0] |                           11.0 |
-# | +I |                     [1.0, 0.0] |                           10.0 |
-# | +I |                     [1.0, 1.0] |                           10.0 |
-# +----+--------------------------------+--------------------------------+
+# extract and display the results
+field_names = output.get_schema().get_field_names()
+for result in t_env.to_data_stream(output).execute_and_collect():
+    features = result[field_names.index(naive_bayes.get_features_col())]
+    prediction_result = result[field_names.index(naive_bayes.get_prediction_col())]
+    print('Features: ' + str(features) + ' \tPrediction Result: ' + str(prediction_result))
 
 ```
 {{< /tab>}}
+
 {{< /tabs>}}
diff --git a/docs/content/docs/operators/clustering/_index.md b/docs/content/docs/operators/clustering/_index.md
index c7b5b91..86f8a7e 100644
--- a/docs/content/docs/operators/clustering/_index.md
+++ b/docs/content/docs/operators/clustering/_index.md
@@ -1,7 +1,7 @@
 ---
 title: Clustering
 bookCollapseSection: true
-weight: 3
+weight: 1
 aliases:
   - /operators/clustering/
 ---
diff --git a/docs/content/docs/operators/clustering/kmeans.md b/docs/content/docs/operators/clustering/kmeans.md
index 8ad3560..e62b582 100644
--- a/docs/content/docs/operators/clustering/kmeans.md
+++ b/docs/content/docs/operators/clustering/kmeans.md
@@ -23,32 +23,33 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-# K-means
+## K-means
 
 K-means is a commonly-used clustering algorithm. It groups given data points
 into a predefined number of clusters.
 
-## Input Columns
+### Input Columns
 
 | Param name  | Type   | Default      | Description    |
 | :---------- | :----- | :----------- | :------------- |
 | featuresCol | Vector | `"features"` | Feature vector |
 
-## Output Columns
+### Output Columns
 
 | Param name    | Type    | Default        | Description              |
 | :------------ | :------ | :------------- | :----------------------- |
 | predictionCol | Integer | `"prediction"` | Predicted cluster center |
 
-## Parameters
+### Parameters
 
-Below are parameters required by `KMeansModel`.
+Below are the parameters required by `KMeansModel`.
 
-| Key             | Default                         | Type   | Required | Description                                                  |
-| --------------- | ------------------------------- | ------ | -------- | ------------------------------------------------------------ |
-| distanceMeasure | `EuclideanDistanceMeasure.NAME` | String | no       | Distance measure. Supported values: `EuclideanDistanceMeasure.NAME` |
-| featuresCol     | `"features"`                    | String | no       | Features column name.                                        |
-| predictionCol   | `"prediction"`                  | String | no       | Prediction column name.                                      |
+| Key             | Default                         | Type    | Required | Description                                                  |
+| --------------- | ------------------------------- | ------- | -------- | ------------------------------------------------------------ |
+| distanceMeasure | `EuclideanDistanceMeasure.NAME` | String  | no       | Distance measure. Supported values: `EuclideanDistanceMeasure.NAME` |
+| featuresCol     | `"features"`                    | String  | no       | Features column name.                                        |
+| predictionCol   | `"prediction"`                  | String  | no       | Prediction column name.                                      |
+| k               | `2`                             | Integer | no       | The max number of clusters to create.                        |
 
 `KMeans` needs parameters above and also below.
 
@@ -58,9 +59,9 @@ Below are parameters required by `KMeansModel`.
 | seed     | `null`     | Long    | no       | The random seed.                                           |
 | maxIter  | `20`       | Integer | no       | Maximum number of iterations.                              |
 
-## Examples
+### Examples
 
-{{< tabs kmeans >}}
+{{< tabs examples >}}
 
 {{< tab "Java">}}
 ```java
@@ -68,58 +69,76 @@ import org.apache.flink.ml.clustering.kmeans.KMeans;
 import org.apache.flink.ml.clustering.kmeans.KMeansModel;
 import org.apache.flink.ml.linalg.DenseVector;
 import org.apache.flink.ml.linalg.Vectors;
-
-// Generates train data and predict data.
-DataStream<DenseVector> inputStream = env.fromElements(
-  Vectors.dense(0.0, 0.0),
-  Vectors.dense(0.0, 0.3),
-  Vectors.dense(0.3, 0.0),
-  Vectors.dense(9.0, 0.0),
-  Vectors.dense(9.0, 0.6),
-  Vectors.dense(9.6, 0.0)
-);
-Table input = tEnv.fromDataStream(inputStream).as("features");
-
-// Creates a K-means object and initialize its parameters.
-KMeans kmeans = new KMeans()
-  .setK(2)
-  .setSeed(1L);
-
-// Trains the K-means Model.
-KMeansModel model = kmeans.fit(input);
-
-// Uses the K-means Model to do predictions.
-Table output = model.transform(input)[0];
-
-// Extracts and displays prediction result.
-for (CloseableIterator<Row> it = output.execute().collect(); it.hasNext(); ) {
-  Row row = it.next();
-  DenseVector vector = (DenseVector) row.getField("features");
-  int clusterId = (Integer) row.getField("prediction");
-  System.out.println("Vector: " + vector + "\tCluster ID: " + clusterId);
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+
+/** Simple program that trains a KMeans model and uses it for clustering. */
+public class KMeansExample {
+    public static void main(String[] args) {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+        // Generates input data.
+        DataStream<DenseVector> inputStream =
+                env.fromElements(
+                        Vectors.dense(0.0, 0.0),
+                        Vectors.dense(0.0, 0.3),
+                        Vectors.dense(0.3, 0.0),
+                        Vectors.dense(9.0, 0.0),
+                        Vectors.dense(9.0, 0.6),
+                        Vectors.dense(9.6, 0.0));
+        Table inputTable = tEnv.fromDataStream(inputStream).as("features");
+
+        // Creates a K-means object and initializes its parameters.
+        KMeans kmeans = new KMeans().setK(2).setSeed(1L);
+
+        // Trains the K-means Model.
+        KMeansModel kmeansModel = kmeans.fit(inputTable);
+
+        // Uses the K-means Model for predictions.
+        Table outputTable = kmeansModel.transform(inputTable)[0];
+
+        // Extracts and displays the results.
+        for (CloseableIterator<Row> it = outputTable.execute().collect(); it.hasNext(); ) {
+            Row row = it.next();
+            DenseVector features = (DenseVector) row.getField(kmeans.getFeaturesCol());
+            int clusterId = (Integer) row.getField(kmeans.getPredictionCol());
+            System.out.printf("Features: %s \tCluster ID: %s\n", features, clusterId);
+        }
+    }
 }
+
 ```
 {{< /tab>}}
 
 {{< tab "Python">}}
 ```python
+# Simple program that trains a KMeans model and uses it for clustering.
+#
+# Before executing this program, please make sure you have followed Flink ML's
+# quick start guideline to set up Flink ML and Flink environment. The guideline
+# can be found at
+#
+# https://nightlies.apache.org/flink/flink-ml-docs-master/docs/try-flink-ml/quick-start/
+
 from pyflink.common import Types
 from pyflink.datastream import StreamExecutionEnvironment
-from pyflink.table import StreamTableEnvironment
-
 from pyflink.ml.core.linalg import Vectors, DenseVectorTypeInfo
 from pyflink.ml.lib.clustering.kmeans import KMeans
+from pyflink.table import StreamTableEnvironment
 
 # create a new StreamExecutionEnvironment
 env = StreamExecutionEnvironment.get_execution_environment()
 
-# load flink ml jar
-env.add_jars("file:///{path}/statefun-flink-core-3.1.0.jar", "file:///{path}/flink-ml-uber-{version}.jar")
-
 # create a StreamTableEnvironment
 t_env = StreamTableEnvironment.create(env)
 
-data_table = t_env.from_data_stream(
+# generate input data
+input_data = t_env.from_data_stream(
     env.from_collection([
         (Vectors.dense([0.0, 0.0]),),
         (Vectors.dense([0.0, 0.3]),),
@@ -132,25 +151,185 @@ data_table = t_env.from_data_stream(
             ['features'],
             [DenseVectorTypeInfo()])))
 
+# create a kmeans object and initialize its parameters
 kmeans = KMeans().set_k(2).set_seed(1)
 
-model = kmeans.fit(data_table)
+# train the kmeans model
+model = kmeans.fit(input_data)
+
+# use the kmeans model for predictions
+output = model.transform(input_data)[0]
+
+# extract and display the results
+field_names = output.get_schema().get_field_names()
+for result in t_env.to_data_stream(output).execute_and_collect():
+    features = result[field_names.index(kmeans.get_features_col())]
+    cluster_id = result[field_names.index(kmeans.get_prediction_col())]
+    print('Features: ' + str(features) + ' \tCluster Id: ' + str(cluster_id))
+
+```
+{{< /tab>}}
+
+{{< /tabs>}}
+
+## Online K-means
+
+Online K-Means extends K-Means to support training a K-Means model
+continuously on an unbounded stream of training data.
+
+Online K-Means makes updates with the "mini-batch" K-Means rule, generalized to
+incorporate forgetfulness (i.e. decay). After the centroids estimated on the
+current batch are acquired, Online K-Means computes the new centroids as the
+weighted average of the original and the estimated centroids. The weight of
+the estimated centroids is the number of points assigned to them. The weight of
+the original centroids is also the number of points assigned to them, but
+additionally multiplied by the decay factor.
+
+The decay factor scales the contribution of the clusters as estimated thus far.
+If the decay factor is 1, all batches are weighted equally. If the decay factor
+is 0, new centroids are determined entirely by recent data. Lower values
+correspond to more forgetting.
+
+### Input Columns
+
+| Param name  | Type   | Default      | Description    |
+| :---------- | :----- | :----------- | :------------- |
+| featuresCol | Vector | `"features"` | Feature vector |
+
+### Output Columns
+
+| Param name    | Type    | Default        | Description              |
+| :------------ | :------ | :------------- | :----------------------- |
+| predictionCol | Integer | `"prediction"` | Predicted cluster center |
+
+### Parameters
+
+Below are the parameters required by `OnlineKMeansModel`.
+
+| Key             | Default                         | Type    | Required | Description                                                  |
+| --------------- | ------------------------------- | ------- | -------- | ------------------------------------------------------------ |
+| distanceMeasure | `EuclideanDistanceMeasure.NAME` | String  | no       | Distance measure. Supported values: `EuclideanDistanceMeasure.NAME` |
+| featuresCol     | `"features"`                    | String  | no       | Features column name.                                        |
+| predictionCol   | `"prediction"`                  | String  | no       | Prediction column name.                                      |
+| k               | `2`                             | Integer | no       | The max number of clusters to create.                        |
+
+`OnlineKMeans` needs the parameters above and also the ones below.
 
-output = model.transform(data_table)[0]
+| Key             | Default          | Type    | Required | Description                                           |
+| --------------- | ---------------- | ------- | -------- | ----------------------------------------------------- |
+| batchStrategy   | `COUNT_STRATEGY` | String  | no       | Strategy to create mini batch from online train data. |
+| globalBatchSize | `32`             | Integer | no       | Global batch size of training algorithms.             |
+| decayFactor     | `0.0`            | Double  | no       | The forgetfulness of the previous centroids.          |
+| seed            | null             | Long    | no       | The random seed.                                      |
 
-output.execute().print()
+### Examples
+
+{{< tabs online_examples >}}
+
+{{< tab "Java">}}
+
+```java
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.ml.clustering.kmeans.KMeansModelData;
+import org.apache.flink.ml.clustering.kmeans.OnlineKMeans;
+import org.apache.flink.ml.clustering.kmeans.OnlineKMeansModel;
+import org.apache.flink.ml.examples.util.PeriodicSourceFunction;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.ml.linalg.typeinfo.DenseVectorTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+/** Simple program that trains an OnlineKMeans model and uses it for clustering. */
+public class OnlineKMeansExample {
+    public static void main(String[] args) {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(4);
+        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+        // Generates input training and prediction data. Both are infinite streams that periodically
+        // send out the provided data to trigger model updates and predictions.
+        List<Row> trainData1 =
+                Arrays.asList(
+                        Row.of(Vectors.dense(0.0, 0.0)),
+                        Row.of(Vectors.dense(0.0, 0.3)),
+                        Row.of(Vectors.dense(0.3, 0.0)),
+                        Row.of(Vectors.dense(9.0, 0.0)),
+                        Row.of(Vectors.dense(9.0, 0.6)),
+                        Row.of(Vectors.dense(9.6, 0.0)));
+
+        List<Row> trainData2 =
+                Arrays.asList(
+                        Row.of(Vectors.dense(10.0, 100.0)),
+                        Row.of(Vectors.dense(10.0, 100.3)),
+                        Row.of(Vectors.dense(10.3, 100.0)),
+                        Row.of(Vectors.dense(-10.0, -100.0)),
+                        Row.of(Vectors.dense(-10.0, -100.6)),
+                        Row.of(Vectors.dense(-10.6, -100.0)));
+
+        List<Row> predictData =
+                Arrays.asList(
+                        Row.of(Vectors.dense(10.0, 10.0)), Row.of(Vectors.dense(-10.0, 10.0)));
+
+        SourceFunction<Row> trainSource =
+                new PeriodicSourceFunction(1000, Arrays.asList(trainData1, trainData2));
+        DataStream<Row> trainStream =
+                env.addSource(trainSource, new RowTypeInfo(DenseVectorTypeInfo.INSTANCE));
+        Table trainTable = tEnv.fromDataStream(trainStream).as("features");
+
+        SourceFunction<Row> predictSource =
+                new PeriodicSourceFunction(1000, Collections.singletonList(predictData));
+        DataStream<Row> predictStream =
+                env.addSource(predictSource, new RowTypeInfo(DenseVectorTypeInfo.INSTANCE));
+        Table predictTable = tEnv.fromDataStream(predictStream).as("features");
+
+        // Creates an online K-means object and initializes its parameters and initial model data.
+        OnlineKMeans onlineKMeans =
+                new OnlineKMeans()
+                        .setFeaturesCol("features")
+                        .setPredictionCol("prediction")
+                        .setGlobalBatchSize(6)
+                        .setInitialModelData(
+                                KMeansModelData.generateRandomModelData(tEnv, 2, 2, 0.0, 0));
+
+        // Trains the online K-means Model.
+        OnlineKMeansModel onlineModel = onlineKMeans.fit(trainTable);
+
+        // Uses the online K-means Model for predictions.
+        Table outputTable = onlineModel.transform(predictTable)[0];
+
+        // Extracts and displays the results. As the training data stream keeps triggering
+        // updates of the internal k-means model data, clustering results for the same
+        // prediction dataset change over time.
+        for (CloseableIterator<Row> it = outputTable.execute().collect(); it.hasNext(); ) {
+            Row row1 = it.next();
+            DenseVector features1 = (DenseVector) row1.getField(onlineKMeans.getFeaturesCol());
+            Integer clusterId1 = (Integer) row1.getField(onlineKMeans.getPredictionCol());
+            Row row2 = it.next();
+            DenseVector features2 = (DenseVector) row2.getField(onlineKMeans.getFeaturesCol());
+            Integer clusterId2 = (Integer) row2.getField(onlineKMeans.getPredictionCol());
+            if (Objects.equals(clusterId1, clusterId2)) {
+                System.out.printf("%s and %s are now in the same cluster.\n", features1, features2);
+            } else {
+                System.out.printf(
+                        "%s and %s are now in different clusters.\n", features1, features2);
+            }
+        }
+    }
+}
 
-# output
-# +----+--------------------------------+-------------+
-# | op |                       features |  prediction |
-# +----+--------------------------------+-------------+
-# | +I |                     [9.0, 0.0] |           1 |
-# | +I |                     [0.0, 0.0] |           0 |
-# | +I |                     [9.0, 0.6] |           1 |
-# | +I |                     [0.3, 3.0] |           0 |
-# | +I |                     [0.0, 0.3] |           0 |
-# | +I |                     [9.6, 0.0] |           1 |
-# +----+--------------------------------+-------------+
 ```
+
 {{< /tab>}}
+
 {{< /tabs>}}
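The Online K-Means update rule described in kmeans.md above (weighted average of the original and the estimated centroids, with the original weight discounted by the decay factor) can be sketched in a few lines of plain Python. This is only an illustration of the arithmetic, not Flink ML's implementation. The helper name `update_centroid` and its arguments are hypothetical, and treating the "number of points" of the original centroid as an accumulated weight carried across batches is an assumption.

```python
def update_centroid(old_centroid, old_weight, batch_centroid, batch_count, decay_factor):
    """Hypothetical helper: blend one centroid with its mini-batch estimate.

    old_weight   -- points accumulated by the original centroid (assumed bookkeeping)
    batch_count  -- points of the current batch assigned to this centroid
    decay_factor -- 1.0 weights all batches equally, 0.0 keeps only the current batch
    """
    discounted = old_weight * decay_factor
    total = discounted + batch_count
    if total == 0:
        # No evidence at all: keep the centroid where it is.
        return list(old_centroid), 0.0
    new_centroid = [
        (o * discounted + b * batch_count) / total
        for o, b in zip(old_centroid, batch_centroid)
    ]
    return new_centroid, total


old, weight = [0.0, 0.0], 6.0      # original centroid, built from 6 points
estimate, count = [9.0, 3.0], 3    # centroid estimated on the current batch

print(update_centroid(old, weight, estimate, count, 1.0))  # ([3.0, 1.0], 9.0)
print(update_centroid(old, weight, estimate, count, 0.0))  # ([9.0, 3.0], 3.0)
```

With a decay factor of 1 the six earlier points still outweigh the three new ones; with a decay factor of 0 the new centroid equals the batch estimate, which matches the "determined entirely by recent data" case in the text.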
diff --git a/docs/content/docs/operators/clustering/_index.md b/docs/content/docs/operators/evaluation/_index.md
similarity index 93%
copy from docs/content/docs/operators/clustering/_index.md
copy to docs/content/docs/operators/evaluation/_index.md
index c7b5b91..355e557 100644
--- a/docs/content/docs/operators/clustering/_index.md
+++ b/docs/content/docs/operators/evaluation/_index.md
@@ -1,9 +1,9 @@
 ---
-title: Clustering
+title: Evaluation
 bookCollapseSection: true
-weight: 3
+weight: 1
 aliases:
-  - /operators/clustering/
+  - /operators/evaluation/
 ---
 <!--
 Licensed to the Apache Software Foundation (ASF) under one
diff --git a/docs/content/docs/operators/evaluation/binaryclassificationevaluator.md b/docs/content/docs/operators/evaluation/binaryclassificationevaluator.md
new file mode 100644
index 0000000..b63f9c1
--- /dev/null
+++ b/docs/content/docs/operators/evaluation/binaryclassificationevaluator.md
@@ -0,0 +1,198 @@
+---
+title: "Binary Classification Evaluator"
+weight: 1
+type: docs
+aliases:
+- /operators/evaluation/binaryclassificationevaluator.html
+---
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+## Binary Classification Evaluator
+
+Binary Classification Evaluator calculates the evaluation metrics for binary
+classification. The input data has `rawPrediction`, `label`, and an optional
+weight column. The `rawPrediction` can be of type double (binary 0/1 prediction,
+or probability of label 1) or of type vector (length-2 vector of raw
+predictions, scores, or label probabilities). The output may contain different
+metrics defined by the parameter `metricsNames`.
+### Input Columns
+
+| Param name       | Type          | Default           | Description               |
+| :--------------- | :------------ | :---------------- | :------------------------ |
+| labelCol         | Number        | `"label"`         | The label of this entry   |
+| rawPredictionCol | Vector/Number | `"rawPrediction"` | The raw prediction result |
+| weightCol        | Number        | `null`            | The weight of this entry  |
+
+### Output Columns
+
+| Column name       | Type   | Description                                                  |
+| ----------------- | ------ | ------------------------------------------------------------ |
+| "areaUnderROC"    | Double | the area under the receiver operating characteristic (ROC) curve |
+| "areaUnderPR"     | Double | the area under the precision-recall curve                    |
+| "areaUnderLorenz" | Double | the area under the Lorenz curve                              |
+| "ks"              | Double | Kolmogorov-Smirnov, measures the ability of the model to separate positive and negative samples |
+
+### Parameters
+
+| Key              | Default                                                      | Type         | Required | Description                  |
+| ---------------- | ------------------------------------------------------------ | ------------ | -------- | ---------------------------- |
+| labelCol         | `"label"`                                                    | String       | no       | Label column name.           |
+| weightCol        | `null`                                                       | String       | no       | Weight column name.          |
+| rawPredictionCol | `"rawPrediction"`                                            | String       | no       | Raw prediction column name.  |
+| metricsNames     | `[BinaryClassificationEvaluatorParams.AREA_UNDER_ROC, BinaryClassificationEvaluatorParams.AREA_UNDER_PR]` | String Array | no       | Names of the output metrics. |
+
+### Examples
+
+{{< tabs examples >}}
+
+{{< tab "Java">}}
+
+```java
+import org.apache.flink.ml.evaluation.binaryclassification.BinaryClassificationEvaluator;
+import org.apache.flink.ml.evaluation.binaryclassification.BinaryClassificationEvaluatorParams;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+
+/**
+ * Simple program that creates a BinaryClassificationEvaluator instance and uses it for evaluation.
+ */
+public class BinaryClassificationEvaluatorExample {
+    public static void main(String[] args) {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+        // Generates input data.
+        DataStream<Row> inputStream =
+                env.fromElements(
+                        Row.of(1.0, Vectors.dense(0.1, 0.9)),
+                        Row.of(1.0, Vectors.dense(0.2, 0.8)),
+                        Row.of(1.0, Vectors.dense(0.3, 0.7)),
+                        Row.of(0.0, Vectors.dense(0.25, 0.75)),
+                        Row.of(0.0, Vectors.dense(0.4, 0.6)),
+                        Row.of(1.0, Vectors.dense(0.35, 0.65)),
+                        Row.of(1.0, Vectors.dense(0.45, 0.55)),
+                        Row.of(0.0, Vectors.dense(0.6, 0.4)),
+                        Row.of(0.0, Vectors.dense(0.7, 0.3)),
+                        Row.of(1.0, Vectors.dense(0.65, 0.35)),
+                        Row.of(0.0, Vectors.dense(0.8, 0.2)),
+                        Row.of(1.0, Vectors.dense(0.9, 0.1)));
+        Table inputTable = tEnv.fromDataStream(inputStream).as("label", "rawPrediction");
+
+        // Creates a BinaryClassificationEvaluator object and initializes its parameters.
+        BinaryClassificationEvaluator evaluator =
+                new BinaryClassificationEvaluator()
+                        .setMetricsNames(
+                                BinaryClassificationEvaluatorParams.AREA_UNDER_PR,
+                                BinaryClassificationEvaluatorParams.KS,
+                                BinaryClassificationEvaluatorParams.AREA_UNDER_ROC);
+
+        // Uses the BinaryClassificationEvaluator object for evaluations.
+        Table outputTable = evaluator.transform(inputTable)[0];
+
+        // Extracts and displays the results.
+        Row evaluationResult = outputTable.execute().collect().next();
+        System.out.printf(
+                "Area under the precision-recall curve: %s\n",
+                evaluationResult.getField(BinaryClassificationEvaluatorParams.AREA_UNDER_PR));
+        System.out.printf(
+                "Area under the receiver operating characteristic curve: %s\n",
+                evaluationResult.getField(BinaryClassificationEvaluatorParams.AREA_UNDER_ROC));
+        System.out.printf(
+                "Kolmogorov-Smirnov value: %s\n",
+                evaluationResult.getField(BinaryClassificationEvaluatorParams.KS));
+    }
+}
+
+```
+
+{{< /tab>}}
+
+{{< tab "Python">}}
+
+```python
+# Simple program that creates a BinaryClassificationEvaluator instance and uses
+# it for evaluation.
+#
+# Before executing this program, please make sure you have followed Flink ML's
+# quick start guideline to set up Flink ML and Flink environment. The guideline
+# can be found at
+#
+# https://nightlies.apache.org/flink/flink-ml-docs-master/docs/try-flink-ml/quick-start/
+
+from pyflink.common import Types
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.ml.core.linalg import Vectors, DenseVectorTypeInfo
+from pyflink.ml.lib.evaluation.binaryclassificationevaluator import BinaryClassificationEvaluator
+from pyflink.table import StreamTableEnvironment
+
+# create a new StreamExecutionEnvironment
+env = StreamExecutionEnvironment.get_execution_environment()
+
+# create a StreamTableEnvironment
+t_env = StreamTableEnvironment.create(env)
+
+# generate input data
+input_table = t_env.from_data_stream(
+    env.from_collection([
+        (1.0, Vectors.dense(0.1, 0.9)),
+        (1.0, Vectors.dense(0.2, 0.8)),
+        (1.0, Vectors.dense(0.3, 0.7)),
+        (0.0, Vectors.dense(0.25, 0.75)),
+        (0.0, Vectors.dense(0.4, 0.6)),
+        (1.0, Vectors.dense(0.35, 0.65)),
+        (1.0, Vectors.dense(0.45, 0.55)),
+        (0.0, Vectors.dense(0.6, 0.4)),
+        (0.0, Vectors.dense(0.7, 0.3)),
+        (1.0, Vectors.dense(0.65, 0.35)),
+        (0.0, Vectors.dense(0.8, 0.2)),
+        (1.0, Vectors.dense(0.9, 0.1))
+    ],
+        type_info=Types.ROW_NAMED(
+            ['label', 'rawPrediction'],
+            [Types.DOUBLE(), DenseVectorTypeInfo()]))
+)
+
+# create a binary classification evaluator object and initialize its parameters
+evaluator = BinaryClassificationEvaluator() \
+    .set_metrics_names('areaUnderPR', 'ks', 'areaUnderROC')
+
+# use the binary classification evaluator model for evaluations
+output = evaluator.transform(input_table)[0]
+
+# extract and display the results
+field_names = output.get_schema().get_field_names()
+result = t_env.to_data_stream(output).execute_and_collect().next()
+print('Area under the precision-recall curve: '
+      + str(result[field_names.index('areaUnderPR')]))
+print('Area under the receiver operating characteristic curve: '
+      + str(result[field_names.index('areaUnderROC')]))
+print('Kolmogorov-Smirnov value: '
+      + str(result[field_names.index('ks')]))
+
+```
+
+{{< /tab>}}
+
+{{< /tabs>}}
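To make two of the metrics in binaryclassificationevaluator.md above concrete, here is a small plain-Python sketch that computes the area under the ROC curve and the Kolmogorov-Smirnov value for a list of labels and positive-class scores. It uses the textbook definitions (pairwise ranking for the ROC area, maximum gap between true-positive and false-positive rates for KS) and is not claimed to match Flink ML's internal algorithm or its handling of weights. The function names `area_under_roc` and `ks_statistic` are hypothetical.

```python
def area_under_roc(labels, scores):
    """Fraction of (positive, negative) pairs ranked correctly; ties count 0.5."""
    pos = [s for l, s in zip(labels, scores) if l == 1]
    neg = [s for l, s in zip(labels, scores) if l == 0]
    wins = sum(1.0 if p > n else 0.5 if p == n else 0.0 for p in pos for n in neg)
    return wins / (len(pos) * len(neg))


def ks_statistic(labels, scores):
    """Largest gap between true-positive rate and false-positive rate over all thresholds."""
    n_pos = sum(1 for l in labels if l == 1)
    n_neg = len(labels) - n_pos
    ranked = sorted(zip(scores, labels), reverse=True)
    tp = fp = 0
    best = 0.0
    for i, (score, label) in enumerate(ranked):
        if label == 1:
            tp += 1
        else:
            fp += 1
        # Only evaluate the gap once all entries sharing this score are counted.
        last_of_tie = i + 1 == len(ranked) or ranked[i + 1][0] != score
        if last_of_tie:
            best = max(best, abs(tp / n_pos - fp / n_neg))
    return best


labels = [1.0, 1.0, 0.0, 1.0, 0.0, 0.0]
scores = [0.9, 0.8, 0.7, 0.6, 0.4, 0.3]   # probability of label 1
print(area_under_roc(labels, scores))      # 8 of 9 pairs are ordered correctly
print(ks_statistic(labels, scores))        # largest TPR - FPR gap is 2/3
```

When `rawPrediction` is a length-2 vector as in the examples above, the score passed to such a function would be the second element, i.e. the value associated with label 1.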
diff --git a/docs/content/docs/operators/feature/bucketizer.md b/docs/content/docs/operators/feature/bucketizer.md
new file mode 100644
index 0000000..929c6fb
--- /dev/null
+++ b/docs/content/docs/operators/feature/bucketizer.md
@@ -0,0 +1,185 @@
+---
+title: "Bucketizer"
+weight: 1
+type: docs
+aliases:
+- /operators/feature/bucketizer.html
+---
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+## Bucketizer
+
+Bucketizer is an algorithm that maps multiple columns of continuous features to
+multiple columns of discrete features, i.e., bucket indices. The indices are in
+[0, numSplitsInThisColumn - 1].
+### Input Columns
+
+| Param name | Type   | Default | Description                          |
+| :--------- | :----- | :------ | :----------------------------------- |
+| inputCols  | Number | `null`  | Continuous features to be bucketized |
+
+### Output Columns
+
+| Param name | Type   | Default | Description                  |
+| :--------- | :----- | :------ | :--------------------------- |
+| outputCols | Double | `null`  | Discrete bucketized features |
+
+### Parameters
+
+| Key           | Default                          | Type        | Required | Description                                                  |
+| ------------- | -------------------------------- | ----------- | -------- | ------------------------------------------------------------ |
+| inputCols     | `null`                           | String[]    | yes      | Input column names.                                          |
+| outputCols    | `null`                           | String[]    | yes      | Output column names.                                         |
+| handleInvalid | `HasHandleInvalid.ERROR_INVALID` | String      | no       | Strategy to handle invalid entries.                          |
+| splitsArray   | `null`                           | Double\[][] | yes      | Array of split points for mapping continuous features into buckets. |
+
+### Examples
+
+{{< tabs examples >}}
+
+{{< tab "Java">}}
+
+```java
+import org.apache.flink.ml.common.param.HasHandleInvalid;
+import org.apache.flink.ml.feature.bucketizer.Bucketizer;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+
+import java.util.Arrays;
+
+/** Simple program that creates a Bucketizer instance and uses it for feature engineering. */
+public class BucketizerExample {
+    public static void main(String[] args) {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+        // Generates input data.
+        DataStream<Row> inputStream = env.fromElements(Row.of(-0.5, 0.0, 1.0, 0.0));
+        Table inputTable = tEnv.fromDataStream(inputStream).as("f1", "f2", "f3", "f4");
+
+        // Creates a Bucketizer object and initializes its parameters.
+        Double[][] splitsArray =
+                new Double[][] {
+                    new Double[] {-0.5, 0.0, 0.5},
+                    new Double[] {-1.0, 0.0, 2.0},
+                    new Double[] {Double.NEGATIVE_INFINITY, 10.0, Double.POSITIVE_INFINITY},
+                    new Double[] {Double.NEGATIVE_INFINITY, 1.5, Double.POSITIVE_INFINITY}
+                };
+        Bucketizer bucketizer =
+                new Bucketizer()
+                        .setInputCols("f1", "f2", "f3", "f4")
+                        .setOutputCols("o1", "o2", "o3", "o4")
+                        .setSplitsArray(splitsArray)
+                        .setHandleInvalid(HasHandleInvalid.SKIP_INVALID);
+
+        // Uses the Bucketizer object for feature transformations.
+        Table outputTable = bucketizer.transform(inputTable)[0];
+
+        // Extracts and displays the results.
+        for (CloseableIterator<Row> it = outputTable.execute().collect(); it.hasNext(); ) {
+            Row row = it.next();
+
+            double[] inputValues = new double[bucketizer.getInputCols().length];
+            double[] outputValues = new double[bucketizer.getInputCols().length];
+            for (int i = 0; i < inputValues.length; i++) {
+                inputValues[i] = (double) row.getField(bucketizer.getInputCols()[i]);
+                outputValues[i] = (double) row.getField(bucketizer.getOutputCols()[i]);
+            }
+
+            System.out.printf(
+                    "Input Values: %s\tOutput Values: %s\n",
+                    Arrays.toString(inputValues), Arrays.toString(outputValues));
+        }
+    }
+}
+
+```
+
+{{< /tab>}}
+
+{{< tab "Python">}}
+
+```python
+# Simple program that creates a Bucketizer instance and uses it for feature
+# engineering.
+#
+# Before executing this program, please make sure you have followed Flink ML's
+# quick start guideline to set up Flink ML and Flink environment. The guideline
+# can be found at
+#
+# https://nightlies.apache.org/flink/flink-ml-docs-master/docs/try-flink-ml/quick-start/
+
+from pyflink.common import Types
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.ml.lib.feature.bucketizer import Bucketizer
+from pyflink.table import StreamTableEnvironment
+
+# create a new StreamExecutionEnvironment
+env = StreamExecutionEnvironment.get_execution_environment()
+
+# create a StreamTableEnvironment
+t_env = StreamTableEnvironment.create(env)
+
+# generate input data
+input_data = t_env.from_data_stream(
+    env.from_collection([
+        (-0.5, 0.0, 1.0, 0.0),
+    ],
+        type_info=Types.ROW_NAMED(
+            ['f1', 'f2', 'f3', 'f4'],
+            [Types.DOUBLE(), Types.DOUBLE(), Types.DOUBLE(), Types.DOUBLE()])
+    ))
+
+# create a bucketizer object and initialize its parameters
+splits_array = [
+    [-0.5, 0.0, 0.5],
+    [-1.0, 0.0, 2.0],
+    [float('-inf'), 10.0, float('inf')],
+    [float('-inf'), 1.5, float('inf')],
+]
+
+bucketizer = Bucketizer() \
+    .set_input_cols('f1', 'f2', 'f3', 'f4') \
+    .set_output_cols('o1', 'o2', 'o3', 'o4') \
+    .set_splits_array(splits_array)
+
+# use the bucketizer model for feature engineering
+output = bucketizer.transform(input_data)[0]
+
+# extract and display the results
+field_names = output.get_schema().get_field_names()
+input_values = [None for _ in bucketizer.get_input_cols()]
+output_values = [None for _ in bucketizer.get_input_cols()]
+for result in t_env.to_data_stream(output).execute_and_collect():
+    for i in range(len(bucketizer.get_input_cols())):
+        input_values[i] = result[field_names.index(bucketizer.get_input_cols()[i])]
+        output_values[i] = result[field_names.index(bucketizer.get_output_cols()[i])]
+    print('Input Values: ' + str(input_values) + '\tOutput Values: ' + str(output_values))
+
+```
+
+{{< /tab>}}
+
+{{< /tabs>}}
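The mapping from a continuous value to a bucket index, as described in bucketizer.md above, can be illustrated with a short plain-Python sketch. It assumes half-open buckets `[splits[i], splits[i+1])` with the last bucket also including its upper bound, and it raises an error for values outside the splits (comparable to an "error" strategy). These bucket-boundary semantics are an assumption borrowed from common Bucketizer implementations and are not stated in the text above. The function name `bucket_index` is hypothetical.

```python
from bisect import bisect_right


def bucket_index(value, splits):
    """Return the index of the bucket that contains value.

    splits must be strictly increasing; n split points define n - 1 buckets.
    """
    is_nan = value != value
    if is_nan or value < splits[0] or value > splits[-1]:
        raise ValueError(f"{value} is outside the range covered by {splits}")
    if value == splits[-1]:
        # Assumed convention: the last bucket is closed on the right.
        return len(splits) - 2
    return bisect_right(splits, value) - 1


splits_array = [
    [-0.5, 0.0, 0.5],
    [-1.0, 0.0, 2.0],
    [float('-inf'), 10.0, float('inf')],
    [float('-inf'), 1.5, float('inf')],
]
row = [-0.5, 0.0, 1.0, 0.0]

# One bucket index per column, using that column's own split points.
print([bucket_index(v, s) for v, s in zip(row, splits_array)])  # [0, 1, 0, 0]
```

Using infinite outer split points, as the third and fourth columns do, is a simple way to make every finite input value valid.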
diff --git a/docs/content/docs/operators/feature/minmaxscaler.md b/docs/content/docs/operators/feature/minmaxscaler.md
new file mode 100644
index 0000000..d3a5b81
--- /dev/null
+++ b/docs/content/docs/operators/feature/minmaxscaler.md
@@ -0,0 +1,186 @@
+---
+title: "Min Max Scaler"
+weight: 1
+type: docs
+aliases:
+- /operators/feature/minmaxscaler.html
+---
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+## Min Max Scaler
+
+Min Max Scaler is an algorithm that rescales feature values to a common range
+[min, max], which is defined by the user.
+### Input Columns
+
+| Param name | Type   | Default   | Description           |
+| :--------- | :----- | :-------- | :-------------------- |
+| inputCol   | Vector | `"input"` | Features to be scaled |
+
+### Output Columns
+
+| Param name | Type   | Default    | Description     |
+| :--------- | :----- | :--------- | :-------------- |
+| outputCol  | Vector | `"output"` | Scaled features |
+
+### Parameters
+
+| Key       | Default    | Type   | Required | Description                              |
+| --------- | ---------- | ------ | -------- | ---------------------------------------- |
+| inputCol  | `"input"`  | String | no       | Input column name.                       |
+| outputCol | `"output"` | String | no       | Output column name.                      |
+| min       | `0.0`      | Double | no       | Lower bound of the output feature range. |
+| max       | `1.0`      | Double | no       | Upper bound of the output feature range. |
+
+### Examples
+
+{{< tabs examples >}}
+
+{{< tab "Java">}}
+
+```java
+import org.apache.flink.ml.feature.minmaxscaler.MinMaxScaler;
+import org.apache.flink.ml.feature.minmaxscaler.MinMaxScalerModel;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+
+/** Simple program that trains a MinMaxScaler model and uses it for feature engineering. */
+public class MinMaxScalerExample {
+    public static void main(String[] args) {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+        // Generates input training and prediction data.
+        DataStream<Row> trainStream =
+                env.fromElements(
+                        Row.of(Vectors.dense(0.0, 3.0)),
+                        Row.of(Vectors.dense(2.1, 0.0)),
+                        Row.of(Vectors.dense(4.1, 5.1)),
+                        Row.of(Vectors.dense(6.1, 8.1)),
+                        Row.of(Vectors.dense(200, 400)));
+        Table trainTable = tEnv.fromDataStream(trainStream).as("input");
+
+        DataStream<Row> predictStream =
+                env.fromElements(
+                        Row.of(Vectors.dense(150.0, 90.0)),
+                        Row.of(Vectors.dense(50.0, 40.0)),
+                        Row.of(Vectors.dense(100.0, 50.0)));
+        Table predictTable = tEnv.fromDataStream(predictStream).as("input");
+
+        // Creates a MinMaxScaler object and initializes its parameters.
+        MinMaxScaler minMaxScaler = new MinMaxScaler();
+
+        // Trains the MinMaxScaler Model.
+        MinMaxScalerModel minMaxScalerModel = minMaxScaler.fit(trainTable);
+
+        // Uses the MinMaxScaler Model for predictions.
+        Table outputTable = minMaxScalerModel.transform(predictTable)[0];
+
+        // Extracts and displays the results.
+        for (CloseableIterator<Row> it = outputTable.execute().collect(); it.hasNext(); ) {
+            Row row = it.next();
+            DenseVector inputValue = (DenseVector) row.getField(minMaxScaler.getInputCol());
+            DenseVector outputValue = (DenseVector) row.getField(minMaxScaler.getOutputCol());
+            System.out.printf("Input Value: %-15s\tOutput Value: %s\n", inputValue, outputValue);
+        }
+    }
+}
+
+```
+
+{{< /tab>}}
+
+{{< tab "Python">}}
+
+```python
+# Simple program that trains a MinMaxScaler model and uses it for feature
+# engineering.
+#
+# Before executing this program, please make sure you have followed Flink ML's
+# quick start guideline to set up Flink ML and Flink environment. The guideline
+# can be found at
+#
+# https://nightlies.apache.org/flink/flink-ml-docs-master/docs/try-flink-ml/quick-start/
+
+from pyflink.common import Types
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.ml.core.linalg import Vectors, DenseVectorTypeInfo
+from pyflink.ml.lib.feature.minmaxscaler import MinMaxScaler
+from pyflink.table import StreamTableEnvironment
+
+# create a new StreamExecutionEnvironment
+env = StreamExecutionEnvironment.get_execution_environment()
+
+# create a StreamTableEnvironment
+t_env = StreamTableEnvironment.create(env)
+
+# generate input training and prediction data
+train_data = t_env.from_data_stream(
+    env.from_collection([
+        (Vectors.dense(0.0, 3.0),),
+        (Vectors.dense(2.1, 0.0),),
+        (Vectors.dense(4.1, 5.1),),
+        (Vectors.dense(6.1, 8.1),),
+        (Vectors.dense(200, 400),),
+    ],
+        type_info=Types.ROW_NAMED(
+            ['input'],
+            [DenseVectorTypeInfo()])
+    ))
+
+predict_data = t_env.from_data_stream(
+    env.from_collection([
+        (Vectors.dense(150.0, 90.0),),
+        (Vectors.dense(50.0, 40.0),),
+        (Vectors.dense(100.0, 50.0),),
+    ],
+        type_info=Types.ROW_NAMED(
+            ['input'],
+            [DenseVectorTypeInfo()])
+    ))
+
+# create a min-max-scaler object and initialize its parameters
+min_max_scaler = MinMaxScaler()
+
+# train the min-max-scaler model
+model = min_max_scaler.fit(train_data)
+
+# use the min-max-scaler model for predictions
+output = model.transform(predict_data)[0]
+
+# extract and display the results
+field_names = output.get_schema().get_field_names()
+for result in t_env.to_data_stream(output).execute_and_collect():
+    input_value = result[field_names.index(min_max_scaler.get_input_col())]
+    output_value = result[field_names.index(min_max_scaler.get_output_col())]
+    print('Input Value: ' + str(input_value) + ' \tOutput Value: ' + str(output_value))
+
+```
+
+{{< /tab>}}
+
+{{< /tabs>}}
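The rescaling that minmaxscaler.md above describes can be sketched with the usual min-max formula, `(x - dataMin) / (dataMax - dataMin) * (max - min) + min`, applied per feature dimension. The plain-Python version below is a sketch under that assumption. The helper names `fit_min_max` and `rescale` are hypothetical, and mapping a constant feature to the midpoint of the output range is a common convention that the text above does not confirm for Flink ML.

```python
def fit_min_max(rows):
    """Collect the per-dimension minimum and maximum of the training vectors."""
    columns = list(zip(*rows))
    return [min(c) for c in columns], [max(c) for c in columns]


def rescale(row, data_min, data_max, lo=0.0, hi=1.0):
    """Map each feature from [data_min, data_max] to the output range [lo, hi]."""
    scaled = []
    for x, d_min, d_max in zip(row, data_min, data_max):
        if d_max == d_min:
            # Constant feature: assumed to land in the middle of the output range.
            scaled.append(0.5 * (lo + hi))
        else:
            scaled.append((x - d_min) / (d_max - d_min) * (hi - lo) + lo)
    return scaled


train = [[0.0, 3.0], [2.1, 0.0], [4.1, 5.1], [6.1, 8.1], [200.0, 400.0]]
data_min, data_max = fit_min_max(train)   # [0.0, 0.0] and [200.0, 400.0]

for row in [[150.0, 90.0], [50.0, 40.0], [100.0, 50.0]]:
    print(row, '->', rescale(row, data_min, data_max))
```

Because the training data contains one large vector (200, 400), the prediction inputs all fall well inside the learned range and map to values between 0 and 1.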
diff --git a/docs/content/docs/operators/feature/onehotencoder.md b/docs/content/docs/operators/feature/onehotencoder.md
index c0a733e..2c3b8c9 100644
--- a/docs/content/docs/operators/feature/onehotencoder.md
+++ b/docs/content/docs/operators/feature/onehotencoder.md
@@ -24,30 +24,30 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-# One Hot Encoder
+## One Hot Encoder
 
 One-hot encoding maps a categorical feature, represented as a label index, to a
 binary vector with at most a single one-value indicating the presence of a
 specific feature value from among the set of all feature values. This encoding
-allows algorithms which expect continuous features, such as Logistic Regression,
+allows algorithms that expect continuous features, such as Logistic Regression,
 to use categorical features.
 
-OneHotEncoder can transform multiple columns, returning an one-hot-encoded
-output vector column for each input column.
+OneHotEncoder can transform multiple columns, returning a one-hot-encoded output
+vector column for each input column.
 
-## Input Columns
+### Input Columns
 
 | Param name | Type    | Default | Description |
 | :--------- | :------ | :------ | :---------- |
 | inputCols  | Integer | `null`  | Label index |
 
-## Output Columns
+### Output Columns
 
 | Param name | Type   | Default | Description           |
 | :--------- | :----- | :------ | :-------------------- |
 | outputCols | Vector | `null`  | Encoded binary vector |
 
-## Parameters
+### Parameters
 
 | Key           | Default                          | Type    | Required | Description                                                  |
 | ------------- | -------------------------------- | ------- | -------- | ------------------------------------------------------------ |
@@ -56,80 +56,112 @@ output vector column for each input column.
 | handleInvalid | `HasHandleInvalid.ERROR_INVALID` | String  | No       | Strategy to handle invalid entries. Supported values: `HasHandleInvalid.ERROR_INVALID`, `HasHandleInvalid.SKIP_INVALID` |
 | dropLast      | `true`                           | Boolean | no       | Whether to drop the last category.                           |
 
-## Examples
+### Examples
 
-{{< tabs onehotencoder >}}
+{{< tabs examples >}}
 
 {{< tab "Java">}}
 ```java
 import org.apache.flink.ml.feature.onehotencoder.OneHotEncoder;
 import org.apache.flink.ml.feature.onehotencoder.OneHotEncoderModel;
+import org.apache.flink.ml.linalg.SparseVector;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+
+/** Simple program that trains a OneHotEncoder model and uses it for feature engineering. */
+public class OneHotEncoderExample {
+    public static void main(String[] args) {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+        // Generates input training and prediction data.
+        DataStream<Row> trainStream =
+                env.fromElements(Row.of(0.0), Row.of(1.0), Row.of(2.0), Row.of(0.0));
+        Table trainTable = tEnv.fromDataStream(trainStream).as("input");
+
+        DataStream<Row> predictStream = env.fromElements(Row.of(0.0), Row.of(1.0), Row.of(2.0));
+        Table predictTable = tEnv.fromDataStream(predictStream).as("input");
+
+        // Creates a OneHotEncoder object and initializes its parameters.
+        OneHotEncoder oneHotEncoder =
+                new OneHotEncoder().setInputCols("input").setOutputCols("output");
+
+        // Trains the OneHotEncoder Model.
+        OneHotEncoderModel model = oneHotEncoder.fit(trainTable);
+
+        // Uses the OneHotEncoder Model for predictions.
+        Table outputTable = model.transform(predictTable)[0];
+
+        // Extracts and displays the results.
+        for (CloseableIterator<Row> it = outputTable.execute().collect(); it.hasNext(); ) {
+            Row row = it.next();
+            Double inputValue = (Double) row.getField(oneHotEncoder.getInputCols()[0]);
+            SparseVector outputValue =
+                    (SparseVector) row.getField(oneHotEncoder.getOutputCols()[0]);
+            System.out.printf("Input Value: %s\tOutput Value: %s\n", inputValue, outputValue);
+        }
+    }
+}
 
-List<Row> trainData = Arrays.asList(Row.of(0.0), Row.of(1.0), Row.of(2.0), Row.of(0.0));
-Table trainTable = tEnv.fromDataStream(env.fromCollection(trainData)).as("input");
-
-List<Row> predictData = Arrays.asList(Row.of(0.0), Row.of(1.0), Row.of(2.0));
-Table predictTable = tEnv.fromDataStream(env.fromCollection(predictData)).as("input");
-
-OneHotEncoder estimator = new OneHotEncoder().setInputCols("input").setOutputCols("output");
-OneHotEncoderModel model = estimator.fit(trainTable);
-Table outputTable = model.transform(predictTable)[0];
-
-outputTable.execute().print();
 ```
 {{< /tab>}}
 
 {{< tab "Python">}}
 ```python
-from pyflink.common import Types, Row
+# Simple program that trains a OneHotEncoder model and uses it for feature
+# engineering.
+#
+# Before executing this program, please make sure you have followed Flink ML's
+# quick start guideline to set up Flink ML and Flink environment. The guideline
+# can be found at
+#
+# https://nightlies.apache.org/flink/flink-ml-docs-master/docs/try-flink-ml/quick-start/
+
+from pyflink.common import Row
 from pyflink.datastream import StreamExecutionEnvironment
-from pyflink.table import StreamTableEnvironment, Table, DataTypes
-
-from pyflink.ml.lib.feature.onehotencoder import OneHotEncoder, OneHotEncoderModel
+from pyflink.ml.lib.feature.onehotencoder import OneHotEncoder
+from pyflink.table import StreamTableEnvironment, DataTypes
 
 # create a new StreamExecutionEnvironment
 env = StreamExecutionEnvironment.get_execution_environment()
 
-# load flink ml jar
-env.add_jars("file:///{path}/statefun-flink-core-3.1.0.jar", "file:///{path}/flink-ml-uber-{version}.jar")
-
 # create a StreamTableEnvironment
 t_env = StreamTableEnvironment.create(env)
 
+# generate input training and prediction data
 train_table = t_env.from_elements(
     [Row(0.0), Row(1.0), Row(2.0), Row(0.0)],
     DataTypes.ROW([
-        DataTypes.FIELD("input", DataTypes.DOUBLE())
+        DataTypes.FIELD('input', DataTypes.DOUBLE())
     ]))
 
 predict_table = t_env.from_elements(
     [Row(0.0), Row(1.0), Row(2.0)],
     DataTypes.ROW([
-        DataTypes.FIELD("input", DataTypes.DOUBLE())
+        DataTypes.FIELD('input', DataTypes.DOUBLE())
     ]))
 
-estimator = OneHotEncoder().set_input_cols('input').set_output_cols('output')
-model = estimator.fit(train_table)
-output_table = model.transform(predict_table)[0]
+# create a one-hot-encoder object and initialize its parameters
+one_hot_encoder = OneHotEncoder().set_input_cols('input').set_output_cols('output')
+
+# train the one-hot-encoder model
+model = one_hot_encoder.fit(train_table)
 
-output_table.execute().print()
+# use the one-hot-encoder model for predictions
+output = model.transform(predict_table)[0]
 
-# output
-# +----+--------------------------------+--------------------------------+
-# | op |                          input |                         output |
-# +----+--------------------------------+--------------------------------+
-# | +I |                            0.0 |                (2, [0], [1.0]) |
-# | +I |                            1.0 |                (2, [1], [1.0]) |
-# | +I |                            2.0 |                    (2, [], []) |
-# +----+--------------------------------+--------------------------------+
+# extract and display the results
+field_names = output.get_schema().get_field_names()
+for result in t_env.to_data_stream(output).execute_and_collect():
+    input_value = result[field_names.index(one_hot_encoder.get_input_cols()[0])]
+    output_value = result[field_names.index(one_hot_encoder.get_output_cols()[0])]
+    print('Input Value: ' + str(input_value) + ' \tOutput Value: ' + str(output_value))
 
 ```
 {{< /tab>}}
-{{< /tabs>}}
-
-
-
-
-
-
 
+{{< /tabs>}}
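The effect of `dropLast` in the one-hot encoder documentation above can be sketched in plain Python. This is an illustration only, not the Flink ML implementation: the function name `one_hot` and the `(size, indices, values)` tuple that stands in for a sparse vector are hypothetical.

```python
# Pure-Python sketch of one-hot encoding with an optional dropped last category.

def one_hot(index, num_categories, drop_last=True):
    """Encode a label index as a sparse-vector-like (size, indices, values) tuple."""
    if index < 0 or index >= num_categories:
        raise ValueError("label index %d is outside [0, %d)" % (index, num_categories))
    size = num_categories - 1 if drop_last else num_categories
    if index < size:
        return (size, [index], [1.0])
    # The dropped last category is represented by the all-zero vector.
    return (size, [], [])


# Three categories, as in the training data of the example above.
for label in (0, 1, 2):
    print(label, one_hot(label, 3))
```

With three categories and `drop_last=True` the vectors have size 2 and the last category becomes the empty vector, which matches the printed output listed in the earlier version of the Python example.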
diff --git a/docs/content/docs/operators/feature/standardscaler.md b/docs/content/docs/operators/feature/standardscaler.md
new file mode 100644
index 0000000..a70344f
--- /dev/null
+++ b/docs/content/docs/operators/feature/standardscaler.md
@@ -0,0 +1,164 @@
+---
+title: "Standard Scaler"
+weight: 1
+type: docs
+aliases:
+- /operators/feature/standardscaler.html
+---
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+## Standard Scaler
+
+Standard Scaler is an algorithm that standardizes the input features by removing
+the mean and scaling each dimension to unit variance.
+### Input Columns
+
+| Param name | Type   | Default   | Description           |
+| :--------- | :----- | :-------- | :-------------------- |
+| inputCol   | Vector | `"input"` | features to be scaled |
+
+### Output Columns
+
+| Param name | Type   | Default    | Description     |
+| :--------- | :----- | :--------- | :-------------- |
+| outputCol  | Vector | `"output"` | scaled features |
+
+### Parameters
+
+| Key       | Default    | Type    | Required | Description                                              |
+| --------- | ---------- | ------- | -------- | -------------------------------------------------------- |
+| inputCol  | `"input"`  | String  | no       | Input column name.                                       |
+| outputCol | `"output"` | String  | no       | Output column name.                                      |
+| withMean  | `false`    | Boolean | no       | Whether to center the data with the mean before scaling. |
+| withStd   | `true`     | Boolean | no       | Whether to scale the data with the standard deviation.   |
+
+### Examples
+
+{{< tabs examples >}}
+
+{{< tab "Java">}}
+
+```java
+import org.apache.flink.ml.feature.standardscaler.StandardScaler;
+import org.apache.flink.ml.feature.standardscaler.StandardScalerModel;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+
+/** Simple program that trains a StandardScaler model and uses it for feature engineering. */
+public class StandardScalerExample {
+    public static void main(String[] args) {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+        // Generates input data.
+        DataStream<Row> inputStream =
+                env.fromElements(
+                        Row.of(Vectors.dense(-2.5, 9, 1)),
+                        Row.of(Vectors.dense(1.4, -5, 1)),
+                        Row.of(Vectors.dense(2, -1, -2)));
+        Table inputTable = tEnv.fromDataStream(inputStream).as("input");
+
+        // Creates a StandardScaler object and initializes its parameters.
+        StandardScaler standardScaler = new StandardScaler();
+
+        // Trains the StandardScaler Model.
+        StandardScalerModel model = standardScaler.fit(inputTable);
+
+        // Uses the StandardScaler Model for predictions.
+        Table outputTable = model.transform(inputTable)[0];
+
+        // Extracts and displays the results.
+        for (CloseableIterator<Row> it = outputTable.execute().collect(); it.hasNext(); ) {
+            Row row = it.next();
+            DenseVector inputValue = (DenseVector) row.getField(standardScaler.getInputCol());
+            DenseVector outputValue = (DenseVector) row.getField(standardScaler.getOutputCol());
+            System.out.printf("Input Value: %s\tOutput Value: %s\n", inputValue, outputValue);
+        }
+    }
+}
+
+```
+
+{{< /tab>}}
+
+{{< tab "Python">}}
+
+```python
+# Simple program that trains a StandardScaler model and uses it for feature
+# engineering.
+#
+# Before executing this program, please make sure you have followed Flink ML's
+# quick start guideline to set up Flink ML and Flink environment. The guideline
+# can be found at
+#
+# https://nightlies.apache.org/flink/flink-ml-docs-master/docs/try-flink-ml/quick-start/
+
+from pyflink.common import Types
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.ml.core.linalg import Vectors, DenseVectorTypeInfo
+from pyflink.ml.lib.feature.standardscaler import StandardScaler
+from pyflink.table import StreamTableEnvironment
+
+# create a new StreamExecutionEnvironment
+env = StreamExecutionEnvironment.get_execution_environment()
+
+# create a StreamTableEnvironment
+t_env = StreamTableEnvironment.create(env)
+
+# generate input data
+input_data = t_env.from_data_stream(
+    env.from_collection([
+        (Vectors.dense(-2.5, 9, 1),),
+        (Vectors.dense(1.4, -5, 1),),
+        (Vectors.dense(2, -1, -2),),
+    ],
+        type_info=Types.ROW_NAMED(
+            ['input'],
+            [DenseVectorTypeInfo()])
+    ))
+
+# create a standard-scaler object and initialize its parameters
+standard_scaler = StandardScaler()
+
+# train the standard-scaler model
+model = standard_scaler.fit(input_data)
+
+# use the standard-scaler model for predictions
+output = model.transform(input_data)[0]
+
+# extract and display the results
+field_names = output.get_schema().get_field_names()
+for result in t_env.to_data_stream(output).execute_and_collect():
+    input_value = result[field_names.index(standard_scaler.get_input_col())]
+    output_value = result[field_names.index(standard_scaler.get_output_col())]
+    print('Input Value: ' + str(input_value) + ' \tOutput Value: ' + str(output_value))
+
+```
+
+{{< /tab>}}
+
+{{< /tabs>}}
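The roles of `withMean` and `withStd` in the standard scaler documentation above can be sketched in plain Python. This is an illustration, not the Flink ML implementation: the helper names are hypothetical, the use of the sample standard deviation (dividing by n - 1) is an assumption, and so is returning 0.0 for a dimension with zero deviation.

```python
# Pure-Python sketch of standardization; not the Flink ML implementation.
import statistics


def fit_standard(rows):
    """Return the per-dimension means and sample standard deviations."""
    columns = list(zip(*rows))
    means = [statistics.mean(c) for c in columns]
    # Assumption: sample standard deviation (n - 1 in the denominator).
    stds = [statistics.stdev(c) for c in columns]
    return means, stds


def transform_standard(row, means, stds, with_mean=False, with_std=True):
    """Optionally subtract the mean, then optionally divide by the deviation."""
    out = []
    for x, m, s in zip(row, means, stds):
        if with_mean:
            x = x - m
        if with_std:
            # Assumption: a zero-deviation dimension is mapped to 0.0.
            x = x / s if s != 0 else 0.0
        out.append(x)
    return out


# Same input values as the examples above.
data = [(-2.5, 9.0, 1.0), (1.4, -5.0, 1.0), (2.0, -1.0, -2.0)]
means, stds = fit_standard(data)
for row in data:
    print(row, transform_standard(row, means, stds))
```

The defaults of `transform_standard` mirror the documented defaults (`withMean` false, `withStd` true), so the values are divided by the deviation without being centered first.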
diff --git a/docs/content/docs/operators/feature/stringindexer.md b/docs/content/docs/operators/feature/stringindexer.md
new file mode 100644
index 0000000..68a0aeb
--- /dev/null
+++ b/docs/content/docs/operators/feature/stringindexer.md
@@ -0,0 +1,388 @@
+---
+title: "String Indexer"
+weight: 1
+type: docs
+aliases:
+- /operators/feature/stringindexer.html
+---
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+## String Indexer
+
+String Indexer maps one or more columns (string/numerical value) of the input to
+one or more indexed output columns (integer value). The output indices of two
+data points are the same iff their corresponding input columns are the same. The
+indices are in [0, numDistinctValuesInThisColumn].
+
+IndexToStringModel transforms input index column(s) to string column(s) using
+the model data computed by StringIndexer. It is the reverse operation of
+StringIndexerModel.
+### Input Columns
+
+| Param name | Type          | Default | Description                            |
+| :--------- | :------------ | :------ | :------------------------------------- |
+| inputCols  | Number/String | `null`  | string/numerical values to be indexed. |
+
+### Output Columns
+
+| Param name | Type   | Default | Description                         |
+| :--------- | :----- | :------ | :---------------------------------- |
+| outputCols | Double | `null`  | Indices of string/numerical values. |
+
+### Parameters
+
+Below are the parameters required by `StringIndexerModel`.
+
+| Key           | Default                          | Type   | Required | Description                         |
+| ------------- | -------------------------------- | ------ | -------- | ----------------------------------- |
+| inputCols     | `null`                           | String | yes      | Input column names.                 |
+| outputCols    | `null`                           | String | yes      | Output column names.                |
+| handleInvalid | `HasHandleInvalid.ERROR_INVALID` | String | no       | Strategy to handle invalid entries. |
+
+`StringIndexer` takes the parameters above plus the one below.
+
+| Key             | Default                               | Type   | Required | Description                          |
+| --------------- | ------------------------------------- | ------ | -------- | ------------------------------------ |
+| stringOrderType | `StringIndexerParams.ARBITRARY_ORDER` | String | no       | How to order strings of each column. |
+
+### Examples
+
+{{< tabs examples >}}
+
+{{< tab "Java">}}
+
+```java
+import org.apache.flink.ml.feature.stringindexer.StringIndexer;
+import org.apache.flink.ml.feature.stringindexer.StringIndexerModel;
+import org.apache.flink.ml.feature.stringindexer.StringIndexerParams;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+
+import java.util.Arrays;
+
+/** Simple program that trains a StringIndexer model and uses it for feature engineering. */
+public class StringIndexerExample {
+    public static void main(String[] args) {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+        // Generates input training and prediction data.
+        DataStream<Row> trainStream =
+                env.fromElements(
+                        Row.of("a", 1.0),
+                        Row.of("b", 1.0),
+                        Row.of("b", 2.0),
+                        Row.of("c", 0.0),
+                        Row.of("d", 2.0),
+                        Row.of("a", 2.0),
+                        Row.of("b", 2.0),
+                        Row.of("b", -1.0),
+                        Row.of("a", -1.0),
+                        Row.of("c", -1.0));
+        Table trainTable = tEnv.fromDataStream(trainStream).as("inputCol1", "inputCol2");
+
+        DataStream<Row> predictStream =
+                env.fromElements(Row.of("a", 2.0), Row.of("b", 1.0), Row.of("c", 2.0));
+        Table predictTable = tEnv.fromDataStream(predictStream).as("inputCol1", "inputCol2");
+
+        // Creates a StringIndexer object and initializes its parameters.
+        StringIndexer stringIndexer =
+                new StringIndexer()
+                        .setStringOrderType(StringIndexerParams.ALPHABET_ASC_ORDER)
+                        .setInputCols("inputCol1", "inputCol2")
+                        .setOutputCols("outputCol1", "outputCol2");
+
+        // Trains the StringIndexer Model.
+        StringIndexerModel model = stringIndexer.fit(trainTable);
+
+        // Uses the StringIndexer Model for predictions.
+        Table outputTable = model.transform(predictTable)[0];
+
+        // Extracts and displays the results.
+        for (CloseableIterator<Row> it = outputTable.execute().collect(); it.hasNext(); ) {
+            Row row = it.next();
+
+            Object[] inputValues = new Object[stringIndexer.getInputCols().length];
+            double[] outputValues = new double[stringIndexer.getInputCols().length];
+            for (int i = 0; i < inputValues.length; i++) {
+                inputValues[i] = row.getField(stringIndexer.getInputCols()[i]);
+                outputValues[i] = (double) row.getField(stringIndexer.getOutputCols()[i]);
+            }
+
+            System.out.printf(
+                    "Input Values: %s \tOutput Values: %s\n",
+                    Arrays.toString(inputValues), Arrays.toString(outputValues));
+        }
+    }
+}
+
+```
+
+{{< /tab>}}
+
+{{< tab "Python">}}
+
+```python
+# Simple program that trains a StringIndexer model and uses it for feature
+# engineering.
+#
+# Before executing this program, please make sure you have followed Flink ML's
+# quick start guideline to set up Flink ML and Flink environment. The guideline
+# can be found at
+#
+# https://nightlies.apache.org/flink/flink-ml-docs-master/docs/try-flink-ml/quick-start/
+
+from pyflink.common import Types
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.ml.lib.feature.stringindexer import StringIndexer
+from pyflink.table import StreamTableEnvironment
+
+# create a new StreamExecutionEnvironment
+env = StreamExecutionEnvironment.get_execution_environment()
+
+# create a StreamTableEnvironment
+t_env = StreamTableEnvironment.create(env)
+
+# generate input training and prediction data
+train_table = t_env.from_data_stream(
+    env.from_collection([
+        ('a', 1.),
+        ('b', 1.),
+        ('b', 2.),
+        ('c', 0.),
+        ('d', 2.),
+        ('a', 2.),
+        ('b', 2.),
+        ('b', -1.),
+        ('a', -1.),
+        ('c', -1.),
+    ],
+        type_info=Types.ROW_NAMED(
+            ['input_col1', 'input_col2'],
+            [Types.STRING(), Types.DOUBLE()])
+    ))
+
+predict_table = t_env.from_data_stream(
+    env.from_collection([
+        ('a', 2.),
+        ('b', 1.),
+        ('c', 2.),
+    ],
+        type_info=Types.ROW_NAMED(
+            ['input_col1', 'input_col2'],
+            [Types.STRING(), Types.DOUBLE()])
+    ))
+
+# create a string-indexer object and initialize its parameters
+string_indexer = StringIndexer() \
+    .set_string_order_type('alphabetAsc') \
+    .set_input_cols('input_col1', 'input_col2') \
+    .set_output_cols('output_col1', 'output_col2')
+
+# train the string-indexer model
+model = string_indexer.fit(train_table)
+
+# use the string-indexer model for feature engineering
+output = model.transform(predict_table)[0]
+
+# extract and display the results
+field_names = output.get_schema().get_field_names()
+input_values = [None for _ in string_indexer.get_input_cols()]
+output_values = [None for _ in string_indexer.get_input_cols()]
+for result in t_env.to_data_stream(output).execute_and_collect():
+    for i in range(len(string_indexer.get_input_cols())):
+        input_values[i] = result[field_names.index(string_indexer.get_input_cols()[i])]
+        output_values[i] = result[field_names.index(string_indexer.get_output_cols()[i])]
+    print('Input Values: ' + str(input_values) + '\tOutput Values: ' + str(output_values))
+
+```
+
+{{< /tab>}}
+
+{{< /tabs>}}
+
+## Index To String
+
+`IndexToStringModel` transforms input index column(s) to string column(s) using
+the model data computed by StringIndexer. It is the reverse operation of
+StringIndexerModel.
+
+### Input Columns
+
+| Param name | Type    | Default | Description                          |
+| :--------- | :------ | :------ | :----------------------------------- |
+| inputCols  | Integer | `null`  | Indices to be transformed to string. |
+
+### Output Columns
+
+| Param name | Type   | Default | Description          |
+| :--------- | :----- | :------ | :------------------- |
+| outputCols | String | `null`  | Transformed strings. |
+
+### Parameters
+
+Below are the parameters required by `IndexToStringModel`.
+
+| Key        | Default | Type   | Required | Description          |
+| ---------- | ------- | ------ | -------- | -------------------- |
+| inputCols  | `null`  | String | yes      | Input column names.  |
+| outputCols | `null`  | String | yes      | Output column names. |
+
+### Examples
+
+{{< tabs index_to_string_examples >}}
+
+{{< tab "Java">}}
+
+```java
+import org.apache.flink.ml.feature.stringindexer.IndexToStringModel;
+import org.apache.flink.ml.feature.stringindexer.StringIndexerModelData;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+
+import java.util.Arrays;
+
+/**
+ * Simple program that creates an IndexToStringModel instance and uses it for feature
+ * engineering.
+ */
+public class IndexToStringModelExample {
+    public static void main(String[] args) {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+        // Creates model data for IndexToStringModel.
+        StringIndexerModelData modelData =
+                new StringIndexerModelData(
+                        new String[][] {{"a", "b", "c", "d"}, {"-1.0", "0.0", "1.0", "2.0"}});
+        Table modelTable = tEnv.fromDataStream(env.fromElements(modelData)).as("stringArrays");
+
+        // Generates input data.
+        DataStream<Row> predictStream = env.fromElements(Row.of(0, 3), Row.of(1, 2));
+        Table predictTable = tEnv.fromDataStream(predictStream).as("inputCol1", "inputCol2");
+
+        // Creates an indexToStringModel object and initializes its parameters.
+        IndexToStringModel indexToStringModel =
+                new IndexToStringModel()
+                        .setInputCols("inputCol1", "inputCol2")
+                        .setOutputCols("outputCol1", "outputCol2")
+                        .setModelData(modelTable);
+
+        // Uses the indexToStringModel object for feature transformations.
+        Table outputTable = indexToStringModel.transform(predictTable)[0];
+
+        // Extracts and displays the results.
+        for (CloseableIterator<Row> it = outputTable.execute().collect(); it.hasNext(); ) {
+            Row row = it.next();
+
+            int[] inputValues = new int[indexToStringModel.getInputCols().length];
+            String[] outputValues = new String[indexToStringModel.getInputCols().length];
+            for (int i = 0; i < inputValues.length; i++) {
+                inputValues[i] = (int) row.getField(indexToStringModel.getInputCols()[i]);
+                outputValues[i] = (String) row.getField(indexToStringModel.getOutputCols()[i]);
+            }
+
+            System.out.printf(
+                    "Input Values: %s \tOutput Values: %s\n",
+                    Arrays.toString(inputValues), Arrays.toString(outputValues));
+        }
+    }
+}
+
+```
+
+{{< /tab>}}
+
+{{< tab "Python">}}
+
+```python
+# Simple program that creates an IndexToStringModel instance and uses it
+# for feature engineering.
+#
+# Before executing this program, please make sure you have followed Flink ML's
+# quick start guideline to set up Flink ML and Flink environment. The guideline
+# can be found at
+#
+# https://nightlies.apache.org/flink/flink-ml-docs-master/docs/try-flink-ml/quick-start/
+
+from pyflink.common import Types
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.ml.lib.feature.stringindexer import IndexToStringModel
+from pyflink.table import StreamTableEnvironment
+
+# create a new StreamExecutionEnvironment
+env = StreamExecutionEnvironment.get_execution_environment()
+
+# create a StreamTableEnvironment
+t_env = StreamTableEnvironment.create(env)
+
+# generate input data
+predict_table = t_env.from_data_stream(
+    env.from_collection([
+        (0, 3),
+        (1, 2),
+    ],
+        type_info=Types.ROW_NAMED(
+            ['input_col1', 'input_col2'],
+            [Types.INT(), Types.INT()])
+    ))
+
+# create an index-to-string model and initialize its parameters and model data
+model_data_table = t_env.from_data_stream(
+    env.from_collection([
+        ([['a', 'b', 'c', 'd'], ['-1.0', '0.0', '1.0', '2.0']],),
+    ],
+        type_info=Types.ROW_NAMED(
+            ['stringArrays'],
+            [Types.OBJECT_ARRAY(Types.OBJECT_ARRAY(Types.STRING()))])
+    ))
+
+model = IndexToStringModel() \
+    .set_input_cols('input_col1', 'input_col2') \
+    .set_output_cols('output_col1', 'output_col2') \
+    .set_model_data(model_data_table)
+
+# use the index-to-string model for feature engineering
+output = model.transform(predict_table)[0]
+
+# extract and display the results
+field_names = output.get_schema().get_field_names()
+input_values = [None for _ in model.get_input_cols()]
+output_values = [None for _ in model.get_input_cols()]
+for result in t_env.to_data_stream(output).execute_and_collect():
+    for i in range(len(model.get_input_cols())):
+        input_values[i] = result[field_names.index(model.get_input_cols()[i])]
+        output_values[i] = result[field_names.index(model.get_output_cols()[i])]
+    print('Input Values: ' + str(input_values) + '\tOutput Values: ' + str(output_values))
+
+```
+
+{{< /tab>}}
+
+{{< /tabs>}}
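The mapping performed by StringIndexer with the `alphabetAsc` order, and its reversal by IndexToStringModel, can be sketched in plain Python. This is an illustration, not the Flink ML implementation: the helper names are hypothetical, numerical values are compared by their string form (as the string model data in the Java example suggests), and raising an error for an unseen value is this sketch's stand-in for the `ERROR_INVALID` strategy.

```python
# Pure-Python sketch of string indexing and its reverse; not the Flink ML code.

def fit_string_indexer(column_values):
    """Collect the distinct values as strings, sorted in ascending order."""
    return sorted({str(v) for v in column_values})


def index_of(vocab, value):
    """Map a value to its index; list.index raises ValueError for unseen values."""
    return float(vocab.index(str(value)))


def index_to_string(vocab, index):
    """Reverse operation: map an index back to the stored string."""
    return vocab[int(index)]


# The two training columns from the StringIndexer example above.
col1 = ['a', 'b', 'b', 'c', 'd', 'a', 'b', 'b', 'a', 'c']
col2 = [1.0, 1.0, 2.0, 0.0, 2.0, 2.0, 2.0, -1.0, -1.0, -1.0]

vocab1 = fit_string_indexer(col1)
vocab2 = fit_string_indexer(col2)
print(vocab1, vocab2)

for s, n in [('a', 2.0), ('b', 1.0), ('c', 2.0)]:
    print([s, n], [index_of(vocab1, s), index_of(vocab2, n)])

for i, j in [(0, 3), (1, 2)]:
    print([i, j], [index_to_string(vocab1, i), index_to_string(vocab2, j)])
```

The two sorted vocabularies play the role of the `stringArrays` model data that the IndexToStringModel examples construct by hand.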
diff --git a/docs/content/docs/operators/feature/vectorassembler.md b/docs/content/docs/operators/feature/vectorassembler.md
new file mode 100644
index 0000000..ab13213
--- /dev/null
+++ b/docs/content/docs/operators/feature/vectorassembler.md
@@ -0,0 +1,186 @@
+---
+title: "Vector Assembler"
+weight: 1
+type: docs
+aliases:
+- /operators/feature/vectorassembler.html
+---
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+## Vector Assembler
+
+Vector Assembler combines a given list of input columns into a vector column.
+Each input column must be either a vector or a numerical value.
+
+### Input Columns
+
+| Param name | Type          | Default | Description                      |
+| :--------- | :------------ | :------ | :------------------------------- |
+| inputCols  | Number/Vector | `null`  | Numbers/vectors to be assembled. |
+
+### Output Columns
+
+| Param name | Type   | Default    | Description       |
+| :--------- | :----- | :--------- | :---------------- |
+| outputCol  | Vector | `"output"` | Assembled vector. |
+
+### Parameters
+
+| Key           | Default                          | Type   | Required | Description                         |
+| ------------- | -------------------------------- | ------ | -------- | ----------------------------------- |
+| inputCols     | `null`                           | String | Yes      | Input column names.                 |
+| outputCol     | `"output"`                       | String | No       | Output column name.                 |
+| handleInvalid | `HasHandleInvalid.ERROR_INVALID` | String | No       | Strategy to handle invalid entries. |
+
+### Examples
+
+{{< tabs examples >}}
+
+{{< tab "Java">}}
+
+```java
+import org.apache.flink.ml.feature.vectorassembler.VectorAssembler;
+import org.apache.flink.ml.linalg.Vector;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+
+import java.util.Arrays;
+
+/** Simple program that creates a VectorAssembler instance and uses it for feature engineering. */
+public class VectorAssemblerExample {
+    public static void main(String[] args) {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+        // Generates input data.
+        DataStream<Row> inputStream =
+                env.fromElements(
+                        Row.of(
+                                Vectors.dense(2.1, 3.1),
+                                1.0,
+                                Vectors.sparse(5, new int[] {3}, new double[] {1.0})),
+                        Row.of(
+                                Vectors.dense(2.1, 3.1),
+                                1.0,
+                                Vectors.sparse(
+                                        5,
+                                        new int[] {4, 2, 3, 1},
+                                        new double[] {4.0, 2.0, 3.0, 1.0})));
+        Table inputTable = tEnv.fromDataStream(inputStream).as("vec", "num", "sparseVec");
+
+        // Creates a VectorAssembler object and initializes its parameters.
+        VectorAssembler vectorAssembler =
+                new VectorAssembler()
+                        .setInputCols("vec", "num", "sparseVec")
+                        .setOutputCol("assembledVec");
+
+        // Uses the VectorAssembler object for feature transformations.
+        Table outputTable = vectorAssembler.transform(inputTable)[0];
+
+        // Extracts and displays the results.
+        for (CloseableIterator<Row> it = outputTable.execute().collect(); it.hasNext(); ) {
+            Row row = it.next();
+
+            Object[] inputValues = new Object[vectorAssembler.getInputCols().length];
+            for (int i = 0; i < inputValues.length; i++) {
+                inputValues[i] = row.getField(vectorAssembler.getInputCols()[i]);
+            }
+
+            Vector outputValue = (Vector) row.getField(vectorAssembler.getOutputCol());
+
+            System.out.printf(
+                    "Input Values: %s \tOutput Value: %s\n",
+                    Arrays.toString(inputValues), outputValue);
+        }
+    }
+}
+
+```
+
+{{< /tab>}}
+
+{{< tab "Python">}}
+
+```python
+# Simple program that creates a VectorAssembler instance and uses it for feature
+# engineering.
+#
+# Before executing this program, please make sure you have followed Flink ML's
+# quick start guideline to set up Flink ML and Flink environment. The guideline
+# can be found at
+#
+# https://nightlies.apache.org/flink/flink-ml-docs-master/docs/try-flink-ml/quick-start/
+
+from pyflink.common import Types
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.ml.core.linalg import Vectors, DenseVectorTypeInfo, SparseVectorTypeInfo
+from pyflink.ml.lib.feature.vectorassembler import VectorAssembler
+from pyflink.table import StreamTableEnvironment
+
+# create a new StreamExecutionEnvironment
+env = StreamExecutionEnvironment.get_execution_environment()
+
+# create a StreamTableEnvironment
+t_env = StreamTableEnvironment.create(env)
+
+# generate input data
+input_data_table = t_env.from_data_stream(
+    env.from_collection([
+        (Vectors.dense(2.1, 3.1),
+         1.0,
+         Vectors.sparse(5, [3], [1.0])),
+        (Vectors.dense(2.1, 3.1),
+         1.0,
+         Vectors.sparse(5, [1, 2, 3, 4],
+                        [1.0, 2.0, 3.0, 4.0])),
+    ],
+        type_info=Types.ROW_NAMED(
+            ['vec', 'num', 'sparse_vec'],
+            [DenseVectorTypeInfo(), Types.DOUBLE(), SparseVectorTypeInfo()])))
+
+# create a vector assembler object and initialize its parameters
+vector_assembler = VectorAssembler() \
+    .set_input_cols('vec', 'num', 'sparse_vec') \
+    .set_output_col('assembled_vec') \
+    .set_handle_invalid('keep')
+
+# use the vector assembler for feature engineering
+output = vector_assembler.transform(input_data_table)[0]
+
+# extract and display the results
+field_names = output.get_schema().get_field_names()
+input_values = [None for _ in vector_assembler.get_input_cols()]
+for result in t_env.to_data_stream(output).execute_and_collect():
+    for i in range(len(vector_assembler.get_input_cols())):
+        input_values[i] = result[field_names.index(vector_assembler.get_input_cols()[i])]
+    output_value = result[field_names.index(vector_assembler.get_output_col())]
+    print('Input Values: ' + str(input_values) + '\tOutput Value: ' + str(output_value))
+
+```
+
+{{< /tab>}}
+
+{{< /tabs>}}
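
The Vector Assembler description above says that it combines numeric and vector columns into a single vector column. A minimal sketch of that concatenation in plain Python, using the two input rows from the examples, might look like the following. It is only an illustration and not Flink ML's implementation: the `(size, indices, values)` tuple standing in for a sparse vector and the helper names `to_dense` and `assemble` are assumptions made for this sketch, and invalid entries (the `handleInvalid` parameter) are not handled.

```python
from typing import List, Sequence, Tuple, Union

# Hypothetical stand-in for a sparse vector: (size, indices, values).
Sparse = Tuple[int, Sequence[int], Sequence[float]]
Value = Union[float, List[float], Sparse]


def to_dense(value: Value) -> List[float]:
    """Expand one input value into a plain list of floats."""
    if isinstance(value, tuple):  # sparse vector
        size, indices, values = value
        dense = [0.0] * size
        for index, entry in zip(indices, values):
            dense[index] = entry
        return dense
    if isinstance(value, list):  # dense vector
        return [float(x) for x in value]
    return [float(value)]  # single number


def assemble(row: Sequence[Value]) -> List[float]:
    """Concatenate the expanded values in column order."""
    assembled: List[float] = []
    for value in row:
        assembled.extend(to_dense(value))
    return assembled


# Columns "vec", "num" and "sparseVec" from the examples above.
first_row = assemble([[2.1, 3.1], 1.0, (5, [3], [1.0])])
second_row = assemble([[2.1, 3.1], 1.0, (5, [4, 2, 3, 1], [4.0, 2.0, 3.0, 1.0])])
print(first_row)   # [2.1, 3.1, 1.0, 0.0, 0.0, 0.0, 1.0, 0.0]
print(second_row)  # [2.1, 3.1, 1.0, 0.0, 1.0, 2.0, 3.0, 4.0]
```

Each assembled row has 2 + 1 + 5 = 8 entries, one per element of the inputs in column order. The actual operator may return a different vector representation for the same values.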
diff --git a/docs/content/docs/operators/clustering/_index.md b/docs/content/docs/operators/regression/_index.md
similarity index 93%
copy from docs/content/docs/operators/clustering/_index.md
copy to docs/content/docs/operators/regression/_index.md
index c7b5b91..f718afe 100644
--- a/docs/content/docs/operators/clustering/_index.md
+++ b/docs/content/docs/operators/regression/_index.md
@@ -1,9 +1,9 @@
 ---
-title: Clustering
+title: Regression
 bookCollapseSection: true
-weight: 3
+weight: 1
 aliases:
-  - /operators/clustering/
+  - /operators/regression/
 ---
 <!--
 Licensed to the Apache Software Foundation (ASF) under one
diff --git a/docs/content/docs/operators/regression/linearregression.md b/docs/content/docs/operators/regression/linearregression.md
new file mode 100644
index 0000000..0b2f418
--- /dev/null
+++ b/docs/content/docs/operators/regression/linearregression.md
@@ -0,0 +1,194 @@
+---
+title: "Linear Regression"
+type: docs
+aliases:
+- /operators/regression/linearregression.html
+---
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+## Linear Regression
+
+Linear Regression is a kind of regression analysis that models the relationship
+between a scalar response and one or more explanatory variables.
+
+### Input Columns
+
+| Param name  | Type   | Default      | Description      |
+| :---------- | :----- | :----------- | :--------------- |
+| featuresCol | Vector | `"features"` | Feature vector   |
+| labelCol    | Double | `"label"`    | Label to predict |
+| weightCol   | Double | `null`       | Weight of sample |
+
+### Output Columns
+
+| Param name    | Type   | Default        | Description     |
+| :------------ | :----- | :------------- | :-------------- |
+| predictionCol | Double | `"prediction"` | Predicted value |
+
+### Parameters
+
+Below are the parameters required by `LinearRegressionModel`.
+
+| Key           | Default        | Type   | Required | Description             |
+| ------------- | -------------- | ------ | -------- | ----------------------- |
+| featuresCol   | `"features"`   | String | no       | Features column name.   |
+| predictionCol | `"prediction"` | String | no       | Prediction column name. |
+
+`LinearRegression` accepts the parameters above as well as those below.
+
+| Key             | Default   | Type    | Required | Description                                     |
+| --------------- | --------- | ------- | -------- | ----------------------------------------------- |
+| labelCol        | `"label"` | String  | no       | Label column name.                              |
+| weightCol       | `null`    | String  | no       | Weight column name.                             |
+| maxIter         | `20`      | Integer | no       | Maximum number of iterations.                   |
+| reg             | `0.`      | Double  | no       | Regularization parameter.                       |
+| elasticNet      | `0.`      | Double  | no       | ElasticNet parameter.                           |
+| learningRate    | `0.1`     | Double  | no       | Learning rate of optimization method.           |
+| globalBatchSize | `32`      | Integer | no       | Global batch size of training algorithms.       |
+| tol             | `1e-6`    | Double  | no       | Convergence tolerance for iterative algorithms. |
+
+### Examples
+
+{{< tabs examples >}}
+
+{{< tab "Java">}}
+
+```java
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.ml.regression.linearregression.LinearRegression;
+import org.apache.flink.ml.regression.linearregression.LinearRegressionModel;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+
+/** Simple program that trains a LinearRegression model and uses it for regression. */
+public class LinearRegressionExample {
+    public static void main(String[] args) {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+        // Generates input data.
+        DataStream<Row> inputStream =
+                env.fromElements(
+                        Row.of(Vectors.dense(2, 1), 4.0, 1.0),
+                        Row.of(Vectors.dense(3, 2), 7.0, 1.0),
+                        Row.of(Vectors.dense(4, 3), 10.0, 1.0),
+                        Row.of(Vectors.dense(2, 4), 10.0, 1.0),
+                        Row.of(Vectors.dense(2, 2), 6.0, 1.0),
+                        Row.of(Vectors.dense(4, 3), 10.0, 1.0),
+                        Row.of(Vectors.dense(1, 2), 5.0, 1.0),
+                        Row.of(Vectors.dense(5, 3), 11.0, 1.0));
+        Table inputTable = tEnv.fromDataStream(inputStream).as("features", "label", "weight");
+
+        // Creates a LinearRegression object and initializes its parameters.
+        LinearRegression lr = new LinearRegression().setWeightCol("weight");
+
+        // Trains the LinearRegression Model.
+        LinearRegressionModel lrModel = lr.fit(inputTable);
+
+        // Uses the LinearRegression Model for predictions.
+        Table outputTable = lrModel.transform(inputTable)[0];
+
+        // Extracts and displays the results.
+        for (CloseableIterator<Row> it = outputTable.execute().collect(); it.hasNext(); ) {
+            Row row = it.next();
+            DenseVector features = (DenseVector) row.getField(lr.getFeaturesCol());
+            double expectedResult = (Double) row.getField(lr.getLabelCol());
+            double predictionResult = (Double) row.getField(lr.getPredictionCol());
+            System.out.printf(
+                    "Features: %s \tExpected Result: %s \tPrediction Result: %s\n",
+                    features, expectedResult, predictionResult);
+        }
+    }
+}
+
+```
+
+{{< /tab>}}
+
+{{< tab "Python">}}
+
+```python
+# Simple program that trains a LinearRegression model and uses it for
+# regression.
+#
+# Before executing this program, please make sure you have followed Flink ML's
+# quick start guideline to set up Flink ML and Flink environment. The guideline
+# can be found at
+#
+# https://nightlies.apache.org/flink/flink-ml-docs-master/docs/try-flink-ml/quick-start/
+
+from pyflink.common import Types
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.ml.core.linalg import Vectors, DenseVectorTypeInfo
+from pyflink.ml.lib.regression.linearregression import LinearRegression
+from pyflink.table import StreamTableEnvironment
+
+# create a new StreamExecutionEnvironment
+env = StreamExecutionEnvironment.get_execution_environment()
+
+# create a StreamTableEnvironment
+t_env = StreamTableEnvironment.create(env)
+
+# generate input data
+input_table = t_env.from_data_stream(
+    env.from_collection([
+        (Vectors.dense(2, 1), 4., 1.),
+        (Vectors.dense(3, 2), 7., 1.),
+        (Vectors.dense(4, 3), 10., 1.),
+        (Vectors.dense(2, 4), 10., 1.),
+        (Vectors.dense(2, 2), 6., 1.),
+        (Vectors.dense(4, 3), 10., 1.),
+        (Vectors.dense(1, 2), 5., 1.),
+        (Vectors.dense(5, 3), 11., 1.),
+    ],
+        type_info=Types.ROW_NAMED(
+            ['features', 'label', 'weight'],
+            [DenseVectorTypeInfo(), Types.DOUBLE(), Types.DOUBLE()])
+    ))
+
+# create a linear regression object and initialize its parameters
+linear_regression = LinearRegression().set_weight_col('weight')
+
+# train the linear regression model
+model = linear_regression.fit(input_table)
+
+# use the linear regression model for predictions
+output = model.transform(input_table)[0]
+
+# extract and display the results
+field_names = output.get_schema().get_field_names()
+for result in t_env.to_data_stream(output).execute_and_collect():
+    features = result[field_names.index(linear_regression.get_features_col())]
+    expected_result = result[field_names.index(linear_regression.get_label_col())]
+    prediction_result = result[field_names.index(linear_regression.get_prediction_col())]
+    print('Features: ' + str(features) + ' \tExpected Result: ' + str(expected_result)
+          + ' \tPrediction Result: ' + str(prediction_result))
+
+```
+
+{{< /tab>}}
+
+{{< /tabs>}}
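
In the Linear Regression example data above, every label equals `x1 + 2 * x2` (for instance `2 + 2 * 1 = 4`), so a well-fitted model should end up with coefficients close to `[1, 2]`. The sketch below checks this with plain full-batch gradient descent on the weighted squared error. It is an illustration under stated assumptions, not Flink ML's optimizer: it has no intercept, no regularization and no mini-batches, it runs 500 iterations instead of the default `maxIter` of `20`, and the function names `predict` and `fit` are hypothetical.

```python
from typing import List, Sequence, Tuple

Row = Tuple[List[float], float, float]  # (features, label, weight)

# Training rows copied from the examples above.
DATA: List[Row] = [
    ([2.0, 1.0], 4.0, 1.0),
    ([3.0, 2.0], 7.0, 1.0),
    ([4.0, 3.0], 10.0, 1.0),
    ([2.0, 4.0], 10.0, 1.0),
    ([2.0, 2.0], 6.0, 1.0),
    ([4.0, 3.0], 10.0, 1.0),
    ([1.0, 2.0], 5.0, 1.0),
    ([5.0, 3.0], 11.0, 1.0),
]


def predict(coefficients: Sequence[float], features: Sequence[float]) -> float:
    """Linear prediction: dot product of coefficients and features."""
    return sum(c * x for c, x in zip(coefficients, features))


def fit(data: Sequence[Row], learning_rate: float = 0.1, max_iter: int = 500) -> List[float]:
    """Full-batch gradient descent on the weighted mean squared error."""
    dim = len(data[0][0])
    coefficients = [0.0] * dim
    total_weight = sum(weight for _, _, weight in data)
    for _ in range(max_iter):
        gradient = [0.0] * dim
        for features, label, weight in data:
            error = predict(coefficients, features) - label
            for j, x in enumerate(features):
                gradient[j] += weight * error * x
        coefficients = [
            c - learning_rate * g / total_weight
            for c, g in zip(coefficients, gradient)
        ]
    return coefficients


coefficients = fit(DATA)
print([round(c, 4) for c in coefficients])  # approximately [1.0, 2.0]
print(round(predict(coefficients, [5.0, 3.0]), 4))  # approximately 11.0
```

With only 20 iterations and a batch size of 32, predictions printed by the real examples can differ noticeably from the labels; the sketch uses more iterations so that the exact relationship in the data becomes visible.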
diff --git a/flink-ml-examples/src/main/java/org/apache/flink/ml/examples/feature/IndexToStringModelExample.java b/flink-ml-examples/src/main/java/org/apache/flink/ml/examples/feature/IndexToStringModelExample.java
index 2b035d1..9836333 100644
--- a/flink-ml-examples/src/main/java/org/apache/flink/ml/examples/feature/IndexToStringModelExample.java
+++ b/flink-ml-examples/src/main/java/org/apache/flink/ml/examples/feature/IndexToStringModelExample.java
@@ -30,7 +30,7 @@ import org.apache.flink.util.CloseableIterator;
 import java.util.Arrays;
 
 /**
- * Simple program that creates a IndexToStringModelExample instance and uses it for feature
+ * Simple program that creates an IndexToStringModelExample instance and uses it for feature
  * engineering.
  */
 public class IndexToStringModelExample {
diff --git a/flink-ml-python/pyflink/examples/ml/classification/knn_example.py b/flink-ml-python/pyflink/examples/ml/classification/knn_example.py
index 1c4b0fb..2d6c5a3 100644
--- a/flink-ml-python/pyflink/examples/ml/classification/knn_example.py
+++ b/flink-ml-python/pyflink/examples/ml/classification/knn_example.py
@@ -19,7 +19,7 @@
 # Simple program that trains a Knn model and uses it for classification.
 #
 # Before executing this program, please make sure you have followed Flink ML's
-# quick start guideline to setup Flink ML and Flink environment. The guideline
+# quick start guideline to set up Flink ML and Flink environment. The guideline
 # can be found at
 #
 # https://nightlies.apache.org/flink/flink-ml-docs-master/docs/try-flink-ml/quick-start/
diff --git a/flink-ml-python/pyflink/examples/ml/classification/linearsvc_example.py b/flink-ml-python/pyflink/examples/ml/classification/linearsvc_example.py
index eb19aef..d48a0fd 100644
--- a/flink-ml-python/pyflink/examples/ml/classification/linearsvc_example.py
+++ b/flink-ml-python/pyflink/examples/ml/classification/linearsvc_example.py
@@ -19,7 +19,7 @@
 # Simple program that trains a LinearSVC model and uses it for classification.
 #
 # Before executing this program, please make sure you have followed Flink ML's
-# quick start guideline to setup Flink ML and Flink environment. The guideline
+# quick start guideline to set up Flink ML and Flink environment. The guideline
 # can be found at
 #
 # https://nightlies.apache.org/flink/flink-ml-docs-master/docs/try-flink-ml/quick-start/
diff --git a/flink-ml-python/pyflink/examples/ml/classification/logisticregression_example.py b/flink-ml-python/pyflink/examples/ml/classification/logisticregression_example.py
index 1b31971..45a54e3 100644
--- a/flink-ml-python/pyflink/examples/ml/classification/logisticregression_example.py
+++ b/flink-ml-python/pyflink/examples/ml/classification/logisticregression_example.py
@@ -20,7 +20,7 @@
 # classification.
 #
 # Before executing this program, please make sure you have followed Flink ML's
-# quick start guideline to setup Flink ML and Flink environment. The guideline
+# quick start guideline to set up Flink ML and Flink environment. The guideline
 # can be found at
 #
 # https://nightlies.apache.org/flink/flink-ml-docs-master/docs/try-flink-ml/quick-start/
diff --git a/flink-ml-python/pyflink/examples/ml/classification/naivebayes_example.py b/flink-ml-python/pyflink/examples/ml/classification/naivebayes_example.py
index 20b940d..ebf9b80 100644
--- a/flink-ml-python/pyflink/examples/ml/classification/naivebayes_example.py
+++ b/flink-ml-python/pyflink/examples/ml/classification/naivebayes_example.py
@@ -19,7 +19,7 @@
 # Simple program that trains a NaiveBayes model and uses it for classification.
 #
 # Before executing this program, please make sure you have followed Flink ML's
-# quick start guideline to setup Flink ML and Flink environment. The guideline
+# quick start guideline to set up Flink ML and Flink environment. The guideline
 # can be found at
 #
 # https://nightlies.apache.org/flink/flink-ml-docs-master/docs/try-flink-ml/quick-start/
diff --git a/flink-ml-python/pyflink/examples/ml/clustering/kmeans_example.py b/flink-ml-python/pyflink/examples/ml/clustering/kmeans_example.py
index 72c5d10..857863d 100644
--- a/flink-ml-python/pyflink/examples/ml/clustering/kmeans_example.py
+++ b/flink-ml-python/pyflink/examples/ml/clustering/kmeans_example.py
@@ -19,7 +19,7 @@
 # Simple program that trains a KMeans model and uses it for clustering.
 #
 # Before executing this program, please make sure you have followed Flink ML's
-# quick start guideline to setup Flink ML and Flink environment. The guideline
+# quick start guideline to set up Flink ML and Flink environment. The guideline
 # can be found at
 #
 # https://nightlies.apache.org/flink/flink-ml-docs-master/docs/try-flink-ml/quick-start/
diff --git a/flink-ml-python/pyflink/examples/ml/evaluation/binaryclassificationevaluator_example.py b/flink-ml-python/pyflink/examples/ml/evaluation/binaryclassificationevaluator_example.py
index 72ca449..99ebf53 100644
--- a/flink-ml-python/pyflink/examples/ml/evaluation/binaryclassificationevaluator_example.py
+++ b/flink-ml-python/pyflink/examples/ml/evaluation/binaryclassificationevaluator_example.py
@@ -20,7 +20,7 @@
 # it for evaluation.
 #
 # Before executing this program, please make sure you have followed Flink ML's
-# quick start guideline to setup Flink ML and Flink environment. The guideline
+# quick start guideline to set up Flink ML and Flink environment. The guideline
 # can be found at
 #
 # https://nightlies.apache.org/flink/flink-ml-docs-master/docs/try-flink-ml/quick-start/
diff --git a/flink-ml-python/pyflink/examples/ml/feature/bucketizer_example.py b/flink-ml-python/pyflink/examples/ml/feature/bucketizer_example.py
index c0d3b81..f475b23 100644
--- a/flink-ml-python/pyflink/examples/ml/feature/bucketizer_example.py
+++ b/flink-ml-python/pyflink/examples/ml/feature/bucketizer_example.py
@@ -20,7 +20,7 @@
 # engineering.
 #
 # Before executing this program, please make sure you have followed Flink ML's
-# quick start guideline to setup Flink ML and Flink environment. The guideline
+# quick start guideline to set up Flink ML and Flink environment. The guideline
 # can be found at
 #
 # https://nightlies.apache.org/flink/flink-ml-docs-master/docs/try-flink-ml/quick-start/
diff --git a/flink-ml-python/pyflink/examples/ml/feature/indextostringmodel_example.py b/flink-ml-python/pyflink/examples/ml/feature/indextostringmodel_example.py
index 72f8989..9351d71 100644
--- a/flink-ml-python/pyflink/examples/ml/feature/indextostringmodel_example.py
+++ b/flink-ml-python/pyflink/examples/ml/feature/indextostringmodel_example.py
@@ -16,11 +16,11 @@
 # limitations under the License.
 ################################################################################
 
-# Simple program that creates a IndexToStringModelExample instance and uses it
+# Simple program that creates an IndexToStringModelExample instance and uses it
 # for feature engineering.
 #
 # Before executing this program, please make sure you have followed Flink ML's
-# quick start guideline to setup Flink ML and Flink environment. The guideline
+# quick start guideline to set up Flink ML and Flink environment. The guideline
 # can be found at
 #
 # https://nightlies.apache.org/flink/flink-ml-docs-master/docs/try-flink-ml/quick-start/
diff --git a/flink-ml-python/pyflink/examples/ml/feature/minmaxscaler_example.py b/flink-ml-python/pyflink/examples/ml/feature/minmaxscaler_example.py
index 207381d..5635140 100644
--- a/flink-ml-python/pyflink/examples/ml/feature/minmaxscaler_example.py
+++ b/flink-ml-python/pyflink/examples/ml/feature/minmaxscaler_example.py
@@ -20,7 +20,7 @@
 # engineering.
 #
 # Before executing this program, please make sure you have followed Flink ML's
-# quick start guideline to setup Flink ML and Flink environment. The guideline
+# quick start guideline to set up Flink ML and Flink environment. The guideline
 # can be found at
 #
 # https://nightlies.apache.org/flink/flink-ml-docs-master/docs/try-flink-ml/quick-start/
diff --git a/flink-ml-python/pyflink/examples/ml/feature/onehotencoder_example.py b/flink-ml-python/pyflink/examples/ml/feature/onehotencoder_example.py
index 8d87ba3..4dd8987 100644
--- a/flink-ml-python/pyflink/examples/ml/feature/onehotencoder_example.py
+++ b/flink-ml-python/pyflink/examples/ml/feature/onehotencoder_example.py
@@ -20,7 +20,7 @@
 # engineering.
 #
 # Before executing this program, please make sure you have followed Flink ML's
-# quick start guideline to setup Flink ML and Flink environment. The guideline
+# quick start guideline to set up Flink ML and Flink environment. The guideline
 # can be found at
 #
 # https://nightlies.apache.org/flink/flink-ml-docs-master/docs/try-flink-ml/quick-start/
diff --git a/flink-ml-python/pyflink/examples/ml/feature/standardscaler_example.py b/flink-ml-python/pyflink/examples/ml/feature/standardscaler_example.py
index b0f726e..5d0d40b 100644
--- a/flink-ml-python/pyflink/examples/ml/feature/standardscaler_example.py
+++ b/flink-ml-python/pyflink/examples/ml/feature/standardscaler_example.py
@@ -20,7 +20,7 @@
 # engineering.
 #
 # Before executing this program, please make sure you have followed Flink ML's
-# quick start guideline to setup Flink ML and Flink environment. The guideline
+# quick start guideline to set up Flink ML and Flink environment. The guideline
 # can be found at
 #
 # https://nightlies.apache.org/flink/flink-ml-docs-master/docs/try-flink-ml/quick-start/
diff --git a/flink-ml-python/pyflink/examples/ml/feature/stringindexer_example.py b/flink-ml-python/pyflink/examples/ml/feature/stringindexer_example.py
index 2f08b90..f6d411d 100644
--- a/flink-ml-python/pyflink/examples/ml/feature/stringindexer_example.py
+++ b/flink-ml-python/pyflink/examples/ml/feature/stringindexer_example.py
@@ -20,7 +20,7 @@
 # engineering.
 #
 # Before executing this program, please make sure you have followed Flink ML's
-# quick start guideline to setup Flink ML and Flink environment. The guideline
+# quick start guideline to set up Flink ML and Flink environment. The guideline
 # can be found at
 #
 # https://nightlies.apache.org/flink/flink-ml-docs-master/docs/try-flink-ml/quick-start/
diff --git a/flink-ml-python/pyflink/examples/ml/feature/vectorassembler_example.py b/flink-ml-python/pyflink/examples/ml/feature/vectorassembler_example.py
index db9ac44..8f45b17 100644
--- a/flink-ml-python/pyflink/examples/ml/feature/vectorassembler_example.py
+++ b/flink-ml-python/pyflink/examples/ml/feature/vectorassembler_example.py
@@ -20,7 +20,7 @@
 # engineering.
 #
 # Before executing this program, please make sure you have followed Flink ML's
-# quick start guideline to setup Flink ML and Flink environment. The guideline
+# quick start guideline to set up Flink ML and Flink environment. The guideline
 # can be found at
 #
 # https://nightlies.apache.org/flink/flink-ml-docs-master/docs/try-flink-ml/quick-start/
diff --git a/flink-ml-python/pyflink/examples/ml/regression/linearregression_example.py b/flink-ml-python/pyflink/examples/ml/regression/linearregression_example.py
index c14d92a..cfd5b6e 100644
--- a/flink-ml-python/pyflink/examples/ml/regression/linearregression_example.py
+++ b/flink-ml-python/pyflink/examples/ml/regression/linearregression_example.py
@@ -20,7 +20,7 @@
 # regression.
 #
 # Before executing this program, please make sure you have followed Flink ML's
-# quick start guideline to setup Flink ML and Flink environment. The guideline
+# quick start guideline to set up Flink ML and Flink environment. The guideline
 # can be found at
 #
 # https://nightlies.apache.org/flink/flink-ml-docs-master/docs/try-flink-ml/quick-start/