Posted to commits@flink.apache.org by li...@apache.org on 2022/06/29 02:34:31 UTC

[flink-ml] branch master updated: [FLINK-27715] Add pyflink examples

This is an automated email from the ASF dual-hosted git repository.

lindong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-ml.git


The following commit(s) were added to refs/heads/master by this push:
     new 91e7464  [FLINK-27715] Add pyflink examples
91e7464 is described below

commit 91e74648bf47d1027ef3362c61456b90fcf89535
Author: yunfengzhou-hub <yu...@outlook.com>
AuthorDate: Wed Jun 29 10:34:26 2022 +0800

    [FLINK-27715] Add pyflink examples
    
    This closes #121.
---
 .github/workflows/python-checks.yml                |   4 +-
 .../feature/stringindexer/IndexToStringModel.java  |   9 +-
 .../pyflink/examples/ml/classification/__init__.py |  17 +++
 .../examples/ml/classification/knn_example.py      |  93 +++++++++++++
 .../ml/classification/linearsvc_example.py         |  76 +++++++++++
 .../classification/logisticregression_example.py   |  77 +++++++++++
 .../ml/classification/naivebayes_example.py        |  80 +++++++++++
 .../pyflink/examples/ml/clustering/__init__.py     |  17 +++
 .../examples/ml/clustering/kmeans_example.py       |  67 +++++++++
 .../pyflink/examples/ml/evaluation/__init__.py     |  17 +++
 .../binaryclassificationevaluator_example.py       |  76 +++++++++++
 .../pyflink/examples/ml/feature/__init__.py        |  17 +++
 .../examples/ml/feature/bucketizer_example.py      |  73 ++++++++++
 .../ml/feature/indextostringmodel_example.py       |  76 +++++++++++
 .../examples/ml/feature/minmaxscaler_example.py    |  79 +++++++++++
 .../examples/ml/feature/onehotencoder_example.py   |  66 +++++++++
 .../examples/ml/feature/standardscaler_example.py  |  66 +++++++++
 .../examples/ml/feature/stringindexer_example.py   |  89 ++++++++++++
 .../examples/ml/feature/vectorassembler_example.py |  71 ++++++++++
 .../pyflink/examples/ml/regression/__init__.py     |  17 +++
 .../ml/regression/linearregression_example.py      |  73 ++++++++++
 .../pyflink/examples/ml/tests/__init__.py          |  30 ++++
 .../pyflink/examples/ml/tests/test_examples.py     |  37 +++++
 flink-ml-python/pyflink/ml/__init__.py             |   1 +
 flink-ml-python/pyflink/ml/core/linalg.py          |   2 +-
 .../tests/test_logisticregression.py               |  17 ---
 .../pyflink/ml/lib/feature/stringindexer.py        |   8 +-
 .../lib/feature/tests/test_indextostringmodel.py   |  77 +++++++++++
 .../pyflink/ml/lib/regression/__init__.py          |  17 +++
 .../pyflink/ml/lib/regression/common.py            |  74 ++++++++++
 .../pyflink/ml/lib/regression/linearregression.py  |  97 +++++++++++++
 .../pyflink/ml/lib/regression/tests/__init__.py    |  30 ++++
 .../lib/regression/tests/test_linearregression.py  | 151 +++++++++++++++++++++
 .../ml/lib/tests/test_ml_lib_completeness.py       |  16 ++-
 34 files changed, 1685 insertions(+), 32 deletions(-)
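For readers skimming the diff: the new examples all follow the same skeleton. Build a bounded Table from an in-memory collection, fit an estimator (or configure a transformer), apply the resulting model, and collect the output rows. The sketch below condenses kmeans_example.py (added in full further down) to that skeleton; the other examples differ mainly in the estimator class and the input schema.

    from pyflink.common import Types
    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.ml.core.linalg import Vectors, DenseVectorTypeInfo
    from pyflink.ml.lib.clustering.kmeans import KMeans
    from pyflink.table import StreamTableEnvironment

    # set up the stream and table environments
    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(env)

    # build a bounded input Table from an in-memory collection
    input_data = t_env.from_data_stream(
        env.from_collection([
            (Vectors.dense([0.0, 0.0]),),
            (Vectors.dense([9.0, 0.0]),),
        ],
            type_info=Types.ROW_NAMED(['features'], [DenseVectorTypeInfo()])))

    # fit the estimator, apply the fitted model, and print the output rows
    model = KMeans().set_k(2).set_seed(1).fit(input_data)
    output = model.transform(input_data)[0]
    for row in t_env.to_data_stream(output).execute_and_collect():
        print(row)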

diff --git a/.github/workflows/python-checks.yml b/.github/workflows/python-checks.yml
index d53f4e4..2c59761 100644
--- a/.github/workflows/python-checks.yml
+++ b/.github/workflows/python-checks.yml
@@ -41,6 +41,8 @@ jobs:
         run: python -m mypy --config=setup.cfg
       - name: Test the source code
         working-directory: flink-ml-python
-        run: pytest
+        run: |
+          pytest pyflink/ml
+          pytest pyflink/examples
 
 
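The workflow change above wires the examples into CI through a separate "pytest pyflink/examples" run, backed by the new pyflink/examples/ml/tests/test_examples.py (whose body is not shown in this mail). Purely as an illustration of how such a test can drive an example script, and assuming the examples are importable under pyflink.examples.ml, a pytest-discoverable case might look like this:

    # Hypothetical sketch only; not the contents of test_examples.py.
    import runpy
    import unittest


    class ExamplesTest(unittest.TestCase):

        def test_kmeans_example(self):
            # Run the example as if invoked with "python kmeans_example.py";
            # the case passes as long as the script finishes without raising.
            runpy.run_module(
                'pyflink.examples.ml.clustering.kmeans_example',
                run_name='__main__')


    if __name__ == '__main__':
        unittest.main()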
diff --git a/flink-ml-lib/src/main/java/org/apache/flink/ml/feature/stringindexer/IndexToStringModel.java b/flink-ml-lib/src/main/java/org/apache/flink/ml/feature/stringindexer/IndexToStringModel.java
index 4090193..f3a1720 100644
--- a/flink-ml-lib/src/main/java/org/apache/flink/ml/feature/stringindexer/IndexToStringModel.java
+++ b/flink-ml-lib/src/main/java/org/apache/flink/ml/feature/stringindexer/IndexToStringModel.java
@@ -147,15 +147,8 @@ public class IndexToStringModel
             }
 
             Row outputStrings = new Row(inputCols.length);
-            int stringId;
             for (int i = 0; i < inputCols.length; i++) {
-                try {
-                    stringId = (Integer) input.getField(inputCols[i]);
-                } catch (Exception e) {
-                    throw new RuntimeException(
-                            "The input contains non-integer value: "
-                                    + input.getField(inputCols[i] + "."));
-                }
+                int stringId = (Integer) input.getField(inputCols[i]);
                 if (stringId < stringArrays[i].length && stringId >= 0) {
                     outputStrings.setField(i, stringArrays[i][stringId]);
                 } else {
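With the try/catch gone, the model maps each configured input column by casting the field to an Integer and indexing into that column's string array; a non-integer field now surfaces as a plain ClassCastException (or a NullPointerException for a null field) rather than the previous wrapped RuntimeException, whose message also concatenated the "." into the field name by mistake. A minimal Python sketch of that per-column lookup, for illustration only (the names here are made up and are not part of the library API):

    def index_to_string(string_arrays, column, string_id):
        # look the integer index up in the column's string array
        strings = string_arrays[column]
        if 0 <= string_id < len(strings):
            return strings[string_id]
        # indices outside the array are rejected here for illustration
        raise ValueError('index %d is out of range for column %d'
                         % (string_id, column))

    print(index_to_string([['a', 'b', 'c', 'd']], 0, 2))  # prints c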
diff --git a/flink-ml-python/pyflink/examples/ml/classification/__init__.py b/flink-ml-python/pyflink/examples/ml/classification/__init__.py
new file mode 100644
index 0000000..65b48d4
--- /dev/null
+++ b/flink-ml-python/pyflink/examples/ml/classification/__init__.py
@@ -0,0 +1,17 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
diff --git a/flink-ml-python/pyflink/examples/ml/classification/knn_example.py b/flink-ml-python/pyflink/examples/ml/classification/knn_example.py
new file mode 100644
index 0000000..1c4b0fb
--- /dev/null
+++ b/flink-ml-python/pyflink/examples/ml/classification/knn_example.py
@@ -0,0 +1,93 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Simple program that trains a KNN model and uses it for classification.
+#
+# Before executing this program, please make sure you have followed Flink ML's
+# quick start guideline to setup Flink ML and Flink environment. The guideline
+# can be found at
+#
+# https://nightlies.apache.org/flink/flink-ml-docs-master/docs/try-flink-ml/quick-start/
+
+from pyflink.common import Types
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.ml.core.linalg import Vectors, DenseVectorTypeInfo
+from pyflink.ml.lib.classification.knn import KNN
+from pyflink.table import StreamTableEnvironment
+
+# create a new StreamExecutionEnvironment
+env = StreamExecutionEnvironment.get_execution_environment()
+
+# create a StreamTableEnvironment
+t_env = StreamTableEnvironment.create(env)
+
+# generate input training and prediction data
+train_data = t_env.from_data_stream(
+    env.from_collection([
+        (Vectors.dense([2.0, 3.0]), 1.0),
+        (Vectors.dense([2.1, 3.1]), 1.0),
+        (Vectors.dense([200.1, 300.1]), 2.0),
+        (Vectors.dense([200.2, 300.2]), 2.0),
+        (Vectors.dense([200.3, 300.3]), 2.0),
+        (Vectors.dense([200.4, 300.4]), 2.0),
+        (Vectors.dense([200.4, 300.4]), 2.0),
+        (Vectors.dense([200.6, 300.6]), 2.0),
+        (Vectors.dense([2.1, 3.1]), 1.0),
+        (Vectors.dense([2.1, 3.1]), 1.0),
+        (Vectors.dense([2.1, 3.1]), 1.0),
+        (Vectors.dense([2.1, 3.1]), 1.0),
+        (Vectors.dense([2.3, 3.2]), 1.0),
+        (Vectors.dense([2.3, 3.2]), 1.0),
+        (Vectors.dense([2.8, 3.2]), 3.0),
+        (Vectors.dense([300., 3.2]), 4.0),
+        (Vectors.dense([2.2, 3.2]), 1.0),
+        (Vectors.dense([2.4, 3.2]), 5.0),
+        (Vectors.dense([2.5, 3.2]), 5.0),
+        (Vectors.dense([2.5, 3.2]), 5.0),
+        (Vectors.dense([2.1, 3.1]), 1.0)
+    ],
+        type_info=Types.ROW_NAMED(
+            ['features', 'label'],
+            [DenseVectorTypeInfo(), Types.DOUBLE()])))
+
+predict_data = t_env.from_data_stream(
+    env.from_collection([
+        (Vectors.dense([4.0, 4.1]), 5.0),
+        (Vectors.dense([300, 42]), 2.0),
+    ],
+        type_info=Types.ROW_NAMED(
+            ['features', 'label'],
+            [DenseVectorTypeInfo(), Types.DOUBLE()])))
+
+# create a knn object and initialize its parameters
+knn = KNN().set_k(4)
+
+# train the knn model
+model = knn.fit(train_data)
+
+# use the knn model for predictions
+output = model.transform(predict_data)[0]
+
+# extract and display the results
+field_names = output.get_schema().get_field_names()
+for result in t_env.to_data_stream(output).execute_and_collect():
+    features = result[field_names.index(knn.get_features_col())]
+    expected_result = result[field_names.index(knn.get_label_col())]
+    actual_result = result[field_names.index(knn.get_prediction_col())]
+    print('Features: ' + str(features) + ' \tExpected Result: ' + str(expected_result)
+          + ' \tActual Result: ' + str(actual_result))
diff --git a/flink-ml-python/pyflink/examples/ml/classification/linearsvc_example.py b/flink-ml-python/pyflink/examples/ml/classification/linearsvc_example.py
new file mode 100644
index 0000000..eb19aef
--- /dev/null
+++ b/flink-ml-python/pyflink/examples/ml/classification/linearsvc_example.py
@@ -0,0 +1,76 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Simple program that trains a LinearSVC model and uses it for classification.
+#
+# Before executing this program, please make sure you have followed Flink ML's
+# quick start guideline to setup Flink ML and Flink environment. The guideline
+# can be found at
+#
+# https://nightlies.apache.org/flink/flink-ml-docs-master/docs/try-flink-ml/quick-start/
+
+from pyflink.common import Types
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.ml.core.linalg import Vectors, DenseVectorTypeInfo
+from pyflink.ml.lib.classification.linearsvc import LinearSVC
+from pyflink.table import StreamTableEnvironment
+
+# create a new StreamExecutionEnvironment
+env = StreamExecutionEnvironment.get_execution_environment()
+
+# create a StreamTableEnvironment
+t_env = StreamTableEnvironment.create(env)
+
+# generate input data
+input_table = t_env.from_data_stream(
+    env.from_collection([
+        (Vectors.dense([1, 2, 3, 4]), 0., 1.),
+        (Vectors.dense([2, 2, 3, 4]), 0., 2.),
+        (Vectors.dense([3, 2, 3, 4]), 0., 3.),
+        (Vectors.dense([4, 2, 3, 4]), 0., 4.),
+        (Vectors.dense([5, 2, 3, 4]), 0., 5.),
+        (Vectors.dense([11, 2, 3, 4]), 1., 1.),
+        (Vectors.dense([12, 2, 3, 4]), 1., 2.),
+        (Vectors.dense([13, 2, 3, 4]), 1., 3.),
+        (Vectors.dense([14, 2, 3, 4]), 1., 4.),
+        (Vectors.dense([15, 2, 3, 4]), 1., 5.),
+    ],
+        type_info=Types.ROW_NAMED(
+            ['features', 'label', 'weight'],
+            [DenseVectorTypeInfo(), Types.DOUBLE(), Types.DOUBLE()])
+    ))
+
+# create a linear svc object and initialize its parameters
+linear_svc = LinearSVC().set_weight_col('weight')
+
+# train the linear svc model
+model = linear_svc.fit(input_table)
+
+# use the linear svc model for predictions
+output = model.transform(input_table)[0]
+
+# extract and display the results
+field_names = output.get_schema().get_field_names()
+for result in t_env.to_data_stream(output).execute_and_collect():
+    features = result[field_names.index(linear_svc.get_features_col())]
+    expected_result = result[field_names.index(linear_svc.get_label_col())]
+    prediction_result = result[field_names.index(linear_svc.get_prediction_col())]
+    raw_prediction_result = result[field_names.index(linear_svc.get_raw_prediction_col())]
+    print('Features: ' + str(features) + ' \tExpected Result: ' + str(expected_result)
+          + ' \tPrediction Result: ' + str(prediction_result)
+          + ' \tRaw Prediction Result: ' + str(raw_prediction_result))
diff --git a/flink-ml-python/pyflink/examples/ml/classification/logisticregression_example.py b/flink-ml-python/pyflink/examples/ml/classification/logisticregression_example.py
new file mode 100644
index 0000000..1b31971
--- /dev/null
+++ b/flink-ml-python/pyflink/examples/ml/classification/logisticregression_example.py
@@ -0,0 +1,77 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Simple program that trains a LogisticRegression model and uses it for
+# classification.
+#
+# Before executing this program, please make sure you have followed Flink ML's
+# quick start guideline to setup Flink ML and Flink environment. The guideline
+# can be found at
+#
+# https://nightlies.apache.org/flink/flink-ml-docs-master/docs/try-flink-ml/quick-start/
+
+from pyflink.common import Types
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.ml.core.linalg import Vectors, DenseVectorTypeInfo
+from pyflink.ml.lib.classification.logisticregression import LogisticRegression
+from pyflink.table import StreamTableEnvironment
+
+# create a new StreamExecutionEnvironment
+env = StreamExecutionEnvironment.get_execution_environment()
+
+# create a StreamTableEnvironment
+t_env = StreamTableEnvironment.create(env)
+
+# generate input data
+input_data = t_env.from_data_stream(
+    env.from_collection([
+        (Vectors.dense([1, 2, 3, 4]), 0., 1.),
+        (Vectors.dense([2, 2, 3, 4]), 0., 2.),
+        (Vectors.dense([3, 2, 3, 4]), 0., 3.),
+        (Vectors.dense([4, 2, 3, 4]), 0., 4.),
+        (Vectors.dense([5, 2, 3, 4]), 0., 5.),
+        (Vectors.dense([11, 2, 3, 4]), 1., 1.),
+        (Vectors.dense([12, 2, 3, 4]), 1., 2.),
+        (Vectors.dense([13, 2, 3, 4]), 1., 3.),
+        (Vectors.dense([14, 2, 3, 4]), 1., 4.),
+        (Vectors.dense([15, 2, 3, 4]), 1., 5.),
+    ],
+        type_info=Types.ROW_NAMED(
+            ['features', 'label', 'weight'],
+            [DenseVectorTypeInfo(), Types.DOUBLE(), Types.DOUBLE()])
+    ))
+
+# create a logistic regression object and initialize its parameters
+logistic_regression = LogisticRegression().set_weight_col('weight')
+
+# train the logistic regression model
+model = logistic_regression.fit(input_data)
+
+# use the logistic regression model for predictions
+output = model.transform(input_data)[0]
+
+# extract and display the results
+field_names = output.get_schema().get_field_names()
+for result in t_env.to_data_stream(output).execute_and_collect():
+    features = result[field_names.index(logistic_regression.get_features_col())]
+    expected_result = result[field_names.index(logistic_regression.get_label_col())]
+    prediction_result = result[field_names.index(logistic_regression.get_prediction_col())]
+    raw_prediction_result = result[field_names.index(logistic_regression.get_raw_prediction_col())]
+    print('Features: ' + str(features) + ' \tExpected Result: ' + str(expected_result)
+          + ' \tPrediction Result: ' + str(prediction_result)
+          + ' \tRaw Prediction Result: ' + str(raw_prediction_result))
diff --git a/flink-ml-python/pyflink/examples/ml/classification/naivebayes_example.py b/flink-ml-python/pyflink/examples/ml/classification/naivebayes_example.py
new file mode 100644
index 0000000..20b940d
--- /dev/null
+++ b/flink-ml-python/pyflink/examples/ml/classification/naivebayes_example.py
@@ -0,0 +1,80 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Simple program that trains a NaiveBayes model and uses it for classification.
+#
+# Before executing this program, please make sure you have followed Flink ML's
+# quick start guideline to setup Flink ML and Flink environment. The guideline
+# can be found at
+#
+# https://nightlies.apache.org/flink/flink-ml-docs-master/docs/try-flink-ml/quick-start/
+
+from pyflink.common import Types
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.ml.core.linalg import Vectors, DenseVectorTypeInfo
+from pyflink.ml.lib.classification.naivebayes import NaiveBayes
+from pyflink.table import StreamTableEnvironment
+
+# create a new StreamExecutionEnvironment
+env = StreamExecutionEnvironment.get_execution_environment()
+
+# create a StreamTableEnvironment
+t_env = StreamTableEnvironment.create(env)
+
+# generate input training and prediction data
+train_table = t_env.from_data_stream(
+    env.from_collection([
+        (Vectors.dense([0, 0.]), 11.),
+        (Vectors.dense([1, 0]), 10.),
+        (Vectors.dense([1, 1.]), 10.),
+    ],
+        type_info=Types.ROW_NAMED(
+            ['features', 'label'],
+            [DenseVectorTypeInfo(), Types.DOUBLE()])))
+
+predict_table = t_env.from_data_stream(
+    env.from_collection([
+        (Vectors.dense([0, 1.]),),
+        (Vectors.dense([0, 0.]),),
+        (Vectors.dense([1, 0]),),
+        (Vectors.dense([1, 1.]),),
+    ],
+        type_info=Types.ROW_NAMED(
+            ['features'],
+            [DenseVectorTypeInfo()])))
+
+# create a naive bayes object and initialize its parameters
+naive_bayes = NaiveBayes() \
+    .set_smoothing(1.0) \
+    .set_features_col('features') \
+    .set_label_col('label') \
+    .set_prediction_col('prediction') \
+    .set_model_type('multinomial')
+
+# train the naive bayes model
+model = naive_bayes.fit(train_table)
+
+# use the naive bayes model for predictions
+output = model.transform(predict_table)[0]
+
+# extract and display the results
+field_names = output.get_schema().get_field_names()
+for result in t_env.to_data_stream(output).execute_and_collect():
+    features = result[field_names.index(naive_bayes.get_features_col())]
+    prediction_result = result[field_names.index(naive_bayes.get_prediction_col())]
+    print('Features: ' + str(features) + ' \tPrediction Result: ' + str(prediction_result))
diff --git a/flink-ml-python/pyflink/examples/ml/clustering/__init__.py b/flink-ml-python/pyflink/examples/ml/clustering/__init__.py
new file mode 100644
index 0000000..65b48d4
--- /dev/null
+++ b/flink-ml-python/pyflink/examples/ml/clustering/__init__.py
@@ -0,0 +1,17 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
diff --git a/flink-ml-python/pyflink/examples/ml/clustering/kmeans_example.py b/flink-ml-python/pyflink/examples/ml/clustering/kmeans_example.py
new file mode 100644
index 0000000..72c5d10
--- /dev/null
+++ b/flink-ml-python/pyflink/examples/ml/clustering/kmeans_example.py
@@ -0,0 +1,67 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Simple program that trains a KMeans model and uses it for clustering.
+#
+# Before executing this program, please make sure you have followed Flink ML's
+# quick start guideline to setup Flink ML and Flink environment. The guideline
+# can be found at
+#
+# https://nightlies.apache.org/flink/flink-ml-docs-master/docs/try-flink-ml/quick-start/
+
+from pyflink.common import Types
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.ml.core.linalg import Vectors, DenseVectorTypeInfo
+from pyflink.ml.lib.clustering.kmeans import KMeans
+from pyflink.table import StreamTableEnvironment
+
+# create a new StreamExecutionEnvironment
+env = StreamExecutionEnvironment.get_execution_environment()
+
+# create a StreamTableEnvironment
+t_env = StreamTableEnvironment.create(env)
+
+# generate input data
+input_data = t_env.from_data_stream(
+    env.from_collection([
+        (Vectors.dense([0.0, 0.0]),),
+        (Vectors.dense([0.0, 0.3]),),
+        (Vectors.dense([0.3, 3.0]),),
+        (Vectors.dense([9.0, 0.0]),),
+        (Vectors.dense([9.0, 0.6]),),
+        (Vectors.dense([9.6, 0.0]),),
+    ],
+        type_info=Types.ROW_NAMED(
+            ['features'],
+            [DenseVectorTypeInfo()])))
+
+# create a kmeans object and initialize its parameters
+kmeans = KMeans().set_k(2).set_seed(1)
+
+# train the kmeans model
+model = kmeans.fit(input_data)
+
+# use the kmeans model for predictions
+output = model.transform(input_data)[0]
+
+# extract and display the results
+field_names = output.get_schema().get_field_names()
+for result in t_env.to_data_stream(output).execute_and_collect():
+    features = result[field_names.index(kmeans.get_features_col())]
+    cluster_id = result[field_names.index(kmeans.get_prediction_col())]
+    print('Features: ' + str(features) + ' \tCluster Id: ' + str(cluster_id))
diff --git a/flink-ml-python/pyflink/examples/ml/evaluation/__init__.py b/flink-ml-python/pyflink/examples/ml/evaluation/__init__.py
new file mode 100644
index 0000000..65b48d4
--- /dev/null
+++ b/flink-ml-python/pyflink/examples/ml/evaluation/__init__.py
@@ -0,0 +1,17 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
diff --git a/flink-ml-python/pyflink/examples/ml/evaluation/binaryclassificationevaluator_example.py b/flink-ml-python/pyflink/examples/ml/evaluation/binaryclassificationevaluator_example.py
new file mode 100644
index 0000000..72ca449
--- /dev/null
+++ b/flink-ml-python/pyflink/examples/ml/evaluation/binaryclassificationevaluator_example.py
@@ -0,0 +1,76 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Simple program that creates a BinaryClassificationEvaluator instance and uses
+# it for evaluation.
+#
+# Before executing this program, please make sure you have followed Flink ML's
+# quick start guideline to setup Flink ML and Flink environment. The guideline
+# can be found at
+#
+# https://nightlies.apache.org/flink/flink-ml-docs-master/docs/try-flink-ml/quick-start/
+
+from pyflink.common import Types
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.ml.core.linalg import Vectors, DenseVectorTypeInfo
+from pyflink.ml.lib.evaluation.binaryclassificationevaluator import BinaryClassificationEvaluator
+from pyflink.table import StreamTableEnvironment
+
+# create a new StreamExecutionEnvironment
+env = StreamExecutionEnvironment.get_execution_environment()
+
+# create a StreamTableEnvironment
+t_env = StreamTableEnvironment.create(env)
+
+# generate input data
+input_table = t_env.from_data_stream(
+    env.from_collection([
+        (1.0, Vectors.dense(0.1, 0.9)),
+        (1.0, Vectors.dense(0.2, 0.8)),
+        (1.0, Vectors.dense(0.3, 0.7)),
+        (0.0, Vectors.dense(0.25, 0.75)),
+        (0.0, Vectors.dense(0.4, 0.6)),
+        (1.0, Vectors.dense(0.35, 0.65)),
+        (1.0, Vectors.dense(0.45, 0.55)),
+        (0.0, Vectors.dense(0.6, 0.4)),
+        (0.0, Vectors.dense(0.7, 0.3)),
+        (1.0, Vectors.dense(0.65, 0.35)),
+        (0.0, Vectors.dense(0.8, 0.2)),
+        (1.0, Vectors.dense(0.9, 0.1))
+    ],
+        type_info=Types.ROW_NAMED(
+            ['label', 'rawPrediction'],
+            [Types.DOUBLE(), DenseVectorTypeInfo()]))
+)
+
+# create a binary classification evaluator object and initialize its parameters
+evaluator = BinaryClassificationEvaluator() \
+    .set_metrics_names('areaUnderPR', 'ks', 'areaUnderROC')
+
+# use the binary classification evaluator model for evaluations
+output = evaluator.transform(input_table)[0]
+
+# extract and display the results
+field_names = output.get_schema().get_field_names()
+result = t_env.to_data_stream(output).execute_and_collect().next()
+print('Area under the precision-recall curve: '
+      + str(result[field_names.index('areaUnderPR')]))
+print('Area under the receiver operating characteristic curve: '
+      + str(result[field_names.index('areaUnderROC')]))
+print('Kolmogorov-Smirnov value: '
+      + str(result[field_names.index('ks')]))
diff --git a/flink-ml-python/pyflink/examples/ml/feature/__init__.py b/flink-ml-python/pyflink/examples/ml/feature/__init__.py
new file mode 100644
index 0000000..65b48d4
--- /dev/null
+++ b/flink-ml-python/pyflink/examples/ml/feature/__init__.py
@@ -0,0 +1,17 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
diff --git a/flink-ml-python/pyflink/examples/ml/feature/bucketizer_example.py b/flink-ml-python/pyflink/examples/ml/feature/bucketizer_example.py
new file mode 100644
index 0000000..c0d3b81
--- /dev/null
+++ b/flink-ml-python/pyflink/examples/ml/feature/bucketizer_example.py
@@ -0,0 +1,73 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Simple program that creates a Bucketizer instance and uses it for feature
+# engineering.
+#
+# Before executing this program, please make sure you have followed Flink ML's
+# quick start guideline to setup Flink ML and Flink environment. The guideline
+# can be found at
+#
+# https://nightlies.apache.org/flink/flink-ml-docs-master/docs/try-flink-ml/quick-start/
+
+from pyflink.common import Types
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.ml.lib.feature.bucketizer import Bucketizer
+from pyflink.table import StreamTableEnvironment
+
+# create a new StreamExecutionEnvironment
+env = StreamExecutionEnvironment.get_execution_environment()
+
+# create a StreamTableEnvironment
+t_env = StreamTableEnvironment.create(env)
+
+# generate input data
+input_data = t_env.from_data_stream(
+    env.from_collection([
+        (-0.5, 0.0, 1.0, 0.0),
+    ],
+        type_info=Types.ROW_NAMED(
+            ['f1', 'f2', 'f3', 'f4'],
+            [Types.DOUBLE(), Types.DOUBLE(), Types.DOUBLE(), Types.DOUBLE()])
+    ))
+
+# create a bucketizer object and initialize its parameters
+splits_array = [
+    [-0.5, 0.0, 0.5],
+    [-1.0, 0.0, 2.0],
+    [float('-inf'), 10.0, float('inf')],
+    [float('-inf'), 1.5, float('inf')],
+]
+
+bucketizer = Bucketizer() \
+    .set_input_cols('f1', 'f2', 'f3', 'f4') \
+    .set_output_cols('o1', 'o2', 'o3', 'o4') \
+    .set_splits_array(splits_array)
+
+# use the bucketizer model for feature engineering
+output = bucketizer.transform(input_data)[0]
+
+# extract and display the results
+field_names = output.get_schema().get_field_names()
+input_values = [None for _ in bucketizer.get_input_cols()]
+output_values = [None for _ in bucketizer.get_input_cols()]
+for result in t_env.to_data_stream(output).execute_and_collect():
+    for i in range(len(bucketizer.get_input_cols())):
+        input_values[i] = result[field_names.index(bucketizer.get_input_cols()[i])]
+        output_values[i] = result[field_names.index(bucketizer.get_output_cols()[i])]
+    print('Input Values: ' + str(input_values) + '\tOutput Values: ' + str(output_values))
diff --git a/flink-ml-python/pyflink/examples/ml/feature/indextostringmodel_example.py b/flink-ml-python/pyflink/examples/ml/feature/indextostringmodel_example.py
new file mode 100644
index 0000000..72f8989
--- /dev/null
+++ b/flink-ml-python/pyflink/examples/ml/feature/indextostringmodel_example.py
@@ -0,0 +1,76 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Simple program that creates an IndexToStringModel instance and uses it
+# for feature engineering.
+#
+# Before executing this program, please make sure you have followed Flink ML's
+# quick start guideline to setup Flink ML and Flink environment. The guideline
+# can be found at
+#
+# https://nightlies.apache.org/flink/flink-ml-docs-master/docs/try-flink-ml/quick-start/
+
+from pyflink.common import Types
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.ml.lib.feature.stringindexer import IndexToStringModel
+from pyflink.table import StreamTableEnvironment
+
+# create a new StreamExecutionEnvironment
+env = StreamExecutionEnvironment.get_execution_environment()
+
+# create a StreamTableEnvironment
+t_env = StreamTableEnvironment.create(env)
+
+# generate input data
+predict_table = t_env.from_data_stream(
+    env.from_collection([
+        (0, 3),
+        (1, 2),
+    ],
+        type_info=Types.ROW_NAMED(
+            ['input_col1', 'input_col2'],
+            [Types.INT(), Types.INT()])
+    ))
+
+# create an index-to-string model and initialize its parameters and model data
+model_data_table = t_env.from_data_stream(
+    env.from_collection([
+        ([['a', 'b', 'c', 'd'], [-1., 0., 1., 2.]],),
+    ],
+        type_info=Types.ROW_NAMED(
+            ['stringArrays'],
+            [Types.OBJECT_ARRAY(Types.OBJECT_ARRAY(Types.STRING()))])
+    ))
+
+model = IndexToStringModel() \
+    .set_input_cols('input_col1', 'input_col2') \
+    .set_output_cols('output_col1', 'output_col2') \
+    .set_model_data(model_data_table)
+
+# use the index-to-string model for feature engineering
+output = model.transform(predict_table)[0]
+
+# extract and display the results
+field_names = output.get_schema().get_field_names()
+input_values = [None for _ in model.get_input_cols()]
+output_values = [None for _ in model.get_input_cols()]
+for result in t_env.to_data_stream(output).execute_and_collect():
+    for i in range(len(model.get_input_cols())):
+        input_values[i] = result[field_names.index(model.get_input_cols()[i])]
+        output_values[i] = result[field_names.index(model.get_output_cols()[i])]
+    print('Input Values: ' + str(input_values) + '\tOutput Values: ' + str(output_values))
diff --git a/flink-ml-python/pyflink/examples/ml/feature/minmaxscaler_example.py b/flink-ml-python/pyflink/examples/ml/feature/minmaxscaler_example.py
new file mode 100644
index 0000000..207381d
--- /dev/null
+++ b/flink-ml-python/pyflink/examples/ml/feature/minmaxscaler_example.py
@@ -0,0 +1,79 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Simple program that trains a MinMaxScaler model and uses it for feature
+# engineering.
+#
+# Before executing this program, please make sure you have followed Flink ML's
+# quick start guideline to setup Flink ML and Flink environment. The guideline
+# can be found at
+#
+# https://nightlies.apache.org/flink/flink-ml-docs-master/docs/try-flink-ml/quick-start/
+
+from pyflink.common import Types
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.ml.core.linalg import Vectors, DenseVectorTypeInfo
+from pyflink.ml.lib.feature.minmaxscaler import MinMaxScaler
+from pyflink.table import StreamTableEnvironment
+
+# create a new StreamExecutionEnvironment
+env = StreamExecutionEnvironment.get_execution_environment()
+
+# create a StreamTableEnvironment
+t_env = StreamTableEnvironment.create(env)
+
+# generate input training and prediction data
+train_data = t_env.from_data_stream(
+    env.from_collection([
+        (Vectors.dense(0.0, 3.0),),
+        (Vectors.dense(2.1, 0.0),),
+        (Vectors.dense(4.1, 5.1),),
+        (Vectors.dense(6.1, 8.1),),
+        (Vectors.dense(200, 400),),
+    ],
+        type_info=Types.ROW_NAMED(
+            ['input'],
+            [DenseVectorTypeInfo()])
+    ))
+
+predict_data = t_env.from_data_stream(
+    env.from_collection([
+        (Vectors.dense(150.0, 90.0),),
+        (Vectors.dense(50.0, 40.0),),
+        (Vectors.dense(100.0, 50.0),),
+    ],
+        type_info=Types.ROW_NAMED(
+            ['input'],
+            [DenseVectorTypeInfo()])
+    ))
+
+# create a min-max-scaler object and initialize its parameters
+min_max_scaler = MinMaxScaler()
+
+# train the min-max-scaler model
+model = min_max_scaler.fit(train_data)
+
+# use the min-max-scaler model for predictions
+output = model.transform(predict_data)[0]
+
+# extract and display the results
+field_names = output.get_schema().get_field_names()
+for result in t_env.to_data_stream(output).execute_and_collect():
+    input_value = result[field_names.index(min_max_scaler.get_input_col())]
+    output_value = result[field_names.index(min_max_scaler.get_output_col())]
+    print('Input Value: ' + str(input_value) + ' \tOutput Value: ' + str(output_value))
diff --git a/flink-ml-python/pyflink/examples/ml/feature/onehotencoder_example.py b/flink-ml-python/pyflink/examples/ml/feature/onehotencoder_example.py
new file mode 100644
index 0000000..8d87ba3
--- /dev/null
+++ b/flink-ml-python/pyflink/examples/ml/feature/onehotencoder_example.py
@@ -0,0 +1,66 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Simple program that trains a OneHotEncoder model and uses it for feature
+# engineering.
+#
+# Before executing this program, please make sure you have followed Flink ML's
+# quick start guideline to setup Flink ML and Flink environment. The guideline
+# can be found at
+#
+# https://nightlies.apache.org/flink/flink-ml-docs-master/docs/try-flink-ml/quick-start/
+
+from pyflink.common import Row
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.ml.lib.feature.onehotencoder import OneHotEncoder
+from pyflink.table import StreamTableEnvironment, DataTypes
+
+# create a new StreamExecutionEnvironment
+env = StreamExecutionEnvironment.get_execution_environment()
+
+# create a StreamTableEnvironment
+t_env = StreamTableEnvironment.create(env)
+
+# generate input training and prediction data
+train_table = t_env.from_elements(
+    [Row(0.0), Row(1.0), Row(2.0), Row(0.0)],
+    DataTypes.ROW([
+        DataTypes.FIELD('input', DataTypes.DOUBLE())
+    ]))
+
+predict_table = t_env.from_elements(
+    [Row(0.0), Row(1.0), Row(2.0)],
+    DataTypes.ROW([
+        DataTypes.FIELD('input', DataTypes.DOUBLE())
+    ]))
+
+# create a one-hot-encoder object and initialize its parameters
+one_hot_encoder = OneHotEncoder().set_input_cols('input').set_output_cols('output')
+
+# train the one-hot-encoder model
+model = one_hot_encoder.fit(train_table)
+
+# use the one-hot-encoder model for predictions
+output = model.transform(predict_table)[0]
+
+# extract and display the results
+field_names = output.get_schema().get_field_names()
+for result in t_env.to_data_stream(output).execute_and_collect():
+    input_value = result[field_names.index(one_hot_encoder.get_input_cols()[0])]
+    output_value = result[field_names.index(one_hot_encoder.get_output_cols()[0])]
+    print('Input Value: ' + str(input_value) + ' \tOutput Value: ' + str(output_value))
diff --git a/flink-ml-python/pyflink/examples/ml/feature/standardscaler_example.py b/flink-ml-python/pyflink/examples/ml/feature/standardscaler_example.py
new file mode 100644
index 0000000..b0f726e
--- /dev/null
+++ b/flink-ml-python/pyflink/examples/ml/feature/standardscaler_example.py
@@ -0,0 +1,66 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Simple program that trains a StandardScaler model and uses it for feature
+# engineering.
+#
+# Before executing this program, please make sure you have followed Flink ML's
+# quick start guideline to setup Flink ML and Flink environment. The guideline
+# can be found at
+#
+# https://nightlies.apache.org/flink/flink-ml-docs-master/docs/try-flink-ml/quick-start/
+
+from pyflink.common import Types
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.ml.core.linalg import Vectors, DenseVectorTypeInfo
+from pyflink.ml.lib.feature.standardscaler import StandardScaler
+from pyflink.table import StreamTableEnvironment
+
+# create a new StreamExecutionEnvironment
+env = StreamExecutionEnvironment.get_execution_environment()
+
+# create a StreamTableEnvironment
+t_env = StreamTableEnvironment.create(env)
+
+# generate input data
+input_data = t_env.from_data_stream(
+    env.from_collection([
+        (Vectors.dense(-2.5, 9, 1),),
+        (Vectors.dense(1.4, -5, 1),),
+        (Vectors.dense(2, -1, -2),),
+    ],
+        type_info=Types.ROW_NAMED(
+            ['input'],
+            [DenseVectorTypeInfo()])
+    ))
+
+# create a standard-scaler object and initialize its parameters
+standard_scaler = StandardScaler()
+
+# train the standard-scaler model
+model = standard_scaler.fit(input_data)
+
+# use the standard-scaler model for predictions
+output = model.transform(input_data)[0]
+
+# extract and display the results
+field_names = output.get_schema().get_field_names()
+for result in t_env.to_data_stream(output).execute_and_collect():
+    input_value = result[field_names.index(standard_scaler.get_input_col())]
+    output_value = result[field_names.index(standard_scaler.get_output_col())]
+    print('Input Value: ' + str(input_value) + ' \tOutput Value: ' + str(output_value))
diff --git a/flink-ml-python/pyflink/examples/ml/feature/stringindexer_example.py b/flink-ml-python/pyflink/examples/ml/feature/stringindexer_example.py
new file mode 100644
index 0000000..2f08b90
--- /dev/null
+++ b/flink-ml-python/pyflink/examples/ml/feature/stringindexer_example.py
@@ -0,0 +1,89 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Simple program that trains a StringIndexer model and uses it for feature
+# engineering.
+#
+# Before executing this program, please make sure you have followed Flink ML's
+# quick start guideline to setup Flink ML and Flink environment. The guideline
+# can be found at
+#
+# https://nightlies.apache.org/flink/flink-ml-docs-master/docs/try-flink-ml/quick-start/
+
+from pyflink.common import Types
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.ml.lib.feature.stringindexer import StringIndexer
+from pyflink.table import StreamTableEnvironment
+
+# create a new StreamExecutionEnvironment
+env = StreamExecutionEnvironment.get_execution_environment()
+
+# create a StreamTableEnvironment
+t_env = StreamTableEnvironment.create(env)
+
+# generate input training and prediction data
+train_table = t_env.from_data_stream(
+    env.from_collection([
+        ('a', 1.),
+        ('b', 1.),
+        ('b', 2.),
+        ('c', 0.),
+        ('d', 2.),
+        ('a', 2.),
+        ('b', 2.),
+        ('b', -1.),
+        ('a', -1.),
+        ('c', -1.),
+    ],
+        type_info=Types.ROW_NAMED(
+            ['input_col1', 'input_col2'],
+            [Types.STRING(), Types.DOUBLE()])
+    ))
+
+predict_table = t_env.from_data_stream(
+    env.from_collection([
+        ('a', 2.),
+        ('b', 1.),
+        ('c', 2.),
+    ],
+        type_info=Types.ROW_NAMED(
+            ['input_col1', 'input_col2'],
+            [Types.STRING(), Types.DOUBLE()])
+    ))
+
+# create a string-indexer object and initialize its parameters
+string_indexer = StringIndexer() \
+    .set_string_order_type('alphabetAsc') \
+    .set_input_cols('input_col1', 'input_col2') \
+    .set_output_cols('output_col1', 'output_col2')
+
+# train the string-indexer model
+model = string_indexer.fit(train_table)
+
+# use the string-indexer model for feature engineering
+output = model.transform(predict_table)[0]
+
+# extract and display the results
+field_names = output.get_schema().get_field_names()
+input_values = [None for _ in string_indexer.get_input_cols()]
+output_values = [None for _ in string_indexer.get_input_cols()]
+for result in t_env.to_data_stream(output).execute_and_collect():
+    for i in range(len(string_indexer.get_input_cols())):
+        input_values[i] = result[field_names.index(string_indexer.get_input_cols()[i])]
+        output_values[i] = result[field_names.index(string_indexer.get_output_cols()[i])]
+    print('Input Values: ' + str(input_values) + '\tOutput Values: ' + str(output_values))
diff --git a/flink-ml-python/pyflink/examples/ml/feature/vectorassembler_example.py b/flink-ml-python/pyflink/examples/ml/feature/vectorassembler_example.py
new file mode 100644
index 0000000..db9ac44
--- /dev/null
+++ b/flink-ml-python/pyflink/examples/ml/feature/vectorassembler_example.py
@@ -0,0 +1,71 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Simple program that creates a VectorAssembler instance and uses it for feature
+# engineering.
+#
+# Before executing this program, please make sure you have followed Flink ML's
+# quick start guideline to setup Flink ML and Flink environment. The guideline
+# can be found at
+#
+# https://nightlies.apache.org/flink/flink-ml-docs-master/docs/try-flink-ml/quick-start/
+
+from pyflink.common import Types
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.ml.core.linalg import Vectors, DenseVectorTypeInfo, SparseVectorTypeInfo
+from pyflink.ml.lib.feature.vectorassembler import VectorAssembler
+from pyflink.table import StreamTableEnvironment
+
+# create a new StreamExecutionEnvironment
+env = StreamExecutionEnvironment.get_execution_environment()
+
+# create a StreamTableEnvironment
+t_env = StreamTableEnvironment.create(env)
+
+# generate input data
+input_data_table = t_env.from_data_stream(
+    env.from_collection([
+        (Vectors.dense(2.1, 3.1),
+         1.0,
+         Vectors.sparse(5, [3], [1.0])),
+        (Vectors.dense(2.1, 3.1),
+         1.0,
+         Vectors.sparse(5, [1, 2, 3, 4],
+                        [1.0, 2.0, 3.0, 4.0])),
+    ],
+        type_info=Types.ROW_NAMED(
+            ['vec', 'num', 'sparse_vec'],
+            [DenseVectorTypeInfo(), Types.DOUBLE(), SparseVectorTypeInfo()])))
+
+# create a vector assembler object and initialize its parameters
+vector_assembler = VectorAssembler() \
+    .set_input_cols('vec', 'num', 'sparse_vec') \
+    .set_output_col('assembled_vec') \
+    .set_handle_invalid('keep')
+
+# use the vector assembler model for feature engineering
+output = vector_assembler.transform(input_data_table)[0]
+
+# extract and display the results
+field_names = output.get_schema().get_field_names()
+input_values = [None for _ in vector_assembler.get_input_cols()]
+for result in t_env.to_data_stream(output).execute_and_collect():
+    for i in range(len(vector_assembler.get_input_cols())):
+        input_values[i] = result[field_names.index(vector_assembler.get_input_cols()[i])]
+    output_value = result[field_names.index(vector_assembler.get_output_col())]
+    print('Input Values: ' + str(input_values) + '\tOutput Value: ' + str(output_value))
diff --git a/flink-ml-python/pyflink/examples/ml/regression/__init__.py b/flink-ml-python/pyflink/examples/ml/regression/__init__.py
new file mode 100644
index 0000000..65b48d4
--- /dev/null
+++ b/flink-ml-python/pyflink/examples/ml/regression/__init__.py
@@ -0,0 +1,17 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
diff --git a/flink-ml-python/pyflink/examples/ml/regression/linearregression_example.py b/flink-ml-python/pyflink/examples/ml/regression/linearregression_example.py
new file mode 100644
index 0000000..c14d92a
--- /dev/null
+++ b/flink-ml-python/pyflink/examples/ml/regression/linearregression_example.py
@@ -0,0 +1,73 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Simple program that trains a LinearRegression model and uses it for
+# regression.
+#
+# Before executing this program, please make sure you have followed Flink ML's
+# quick start guideline to setup Flink ML and Flink environment. The guideline
+# can be found at
+#
+# https://nightlies.apache.org/flink/flink-ml-docs-master/docs/try-flink-ml/quick-start/
+
+from pyflink.common import Types
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.ml.core.linalg import Vectors, DenseVectorTypeInfo
+from pyflink.ml.lib.regression.linearregression import LinearRegression
+from pyflink.table import StreamTableEnvironment
+
+# create a new StreamExecutionEnvironment
+env = StreamExecutionEnvironment.get_execution_environment()
+
+# create a StreamTableEnvironment
+t_env = StreamTableEnvironment.create(env)
+
+# generate input data
+input_table = t_env.from_data_stream(
+    env.from_collection([
+        (Vectors.dense(2, 1), 4., 1.),
+        (Vectors.dense(3, 2), 7., 1.),
+        (Vectors.dense(4, 3), 10., 1.),
+        (Vectors.dense(2, 4), 10., 1.),
+        (Vectors.dense(2, 2), 6., 1.),
+        (Vectors.dense(4, 3), 10., 1.),
+        (Vectors.dense(1, 2), 5., 1.),
+        (Vectors.dense(5, 3), 11., 1.),
+    ],
+        type_info=Types.ROW_NAMED(
+            ['features', 'label', 'weight'],
+            [DenseVectorTypeInfo(), Types.DOUBLE(), Types.DOUBLE()])
+    ))
+
+# create a linear regression object and initialize its parameters
+linear_regression = LinearRegression().set_weight_col('weight')
+
+# train the linear regression model
+model = linear_regression.fit(input_table)
+
+# use the linear regression model for predictions
+output = model.transform(input_table)[0]
+
+# extract and display the results
+field_names = output.get_schema().get_field_names()
+for result in t_env.to_data_stream(output).execute_and_collect():
+    features = result[field_names.index(linear_regression.get_features_col())]
+    expected_result = result[field_names.index(linear_regression.get_label_col())]
+    prediction_result = result[field_names.index(linear_regression.get_prediction_col())]
+    print('Features: ' + str(features) + ' \tExpected Result: ' + str(expected_result)
+          + ' \tPrediction Result: ' + str(prediction_result))
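
The synthetic rows above follow label = features[0] + 2 * features[1], which is the
relationship the estimator is expected to approximate. A quick plain-Python check of
that claim (no Flink required):

    rows = [((2, 1), 4.), ((3, 2), 7.), ((4, 3), 10.), ((2, 4), 10.),
            ((2, 2), 6.), ((4, 3), 10.), ((1, 2), 5.), ((5, 3), 11.)]
    # every label equals features[0] + 2 * features[1]
    assert all(x0 + 2 * x1 == label for (x0, x1), label in rows)
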
diff --git a/flink-ml-python/pyflink/examples/ml/tests/__init__.py b/flink-ml-python/pyflink/examples/ml/tests/__init__.py
new file mode 100644
index 0000000..d91d483
--- /dev/null
+++ b/flink-ml-python/pyflink/examples/ml/tests/__init__.py
@@ -0,0 +1,30 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import os
+import sys
+from pathlib import Path
+
+# Because this project and the `pyflink` project it depends on share the same directory
+# structure, we need to manually add the `flink-ml-python` path to `sys.path` in this
+# project's tests to change the package search order.
+flink_ml_python_dir = Path(__file__).parents[4]
+sys.path.append(str(flink_ml_python_dir))
+
+import pyflink
+
+pyflink.__path__.insert(0, os.path.join(flink_ml_python_dir, 'pyflink'))
diff --git a/flink-ml-python/pyflink/examples/ml/tests/test_examples.py b/flink-ml-python/pyflink/examples/ml/tests/test_examples.py
new file mode 100644
index 0000000..afe6deb
--- /dev/null
+++ b/flink-ml-python/pyflink/examples/ml/tests/test_examples.py
@@ -0,0 +1,37 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import importlib
+import pkgutil
+
+from pyflink.ml.tests.test_utils import PyFlinkMLTestCase
+
+
+class ExamplesTest(PyFlinkMLTestCase):
+    def test_examples(self):
+        self.execute_all_in_module('pyflink.examples.ml.classification')
+        self.execute_all_in_module('pyflink.examples.ml.clustering')
+        self.execute_all_in_module('pyflink.examples.ml.evaluation')
+        self.execute_all_in_module('pyflink.examples.ml.feature')
+        self.execute_all_in_module('pyflink.examples.ml.regression')
+
+    def execute_all_in_module(self, module):
+        module = importlib.import_module(module)
+        for importer, sub_modname, ispkg in pkgutil.iter_modules(module.__path__):
+            # importing an example module means executing the example.
+            importlib.import_module(module.__name__ + "." + sub_modname)
diff --git a/flink-ml-python/pyflink/ml/__init__.py b/flink-ml-python/pyflink/ml/__init__.py
index 5bcf4b1..68e3a97 100644
--- a/flink-ml-python/pyflink/ml/__init__.py
+++ b/flink-ml-python/pyflink/ml/__init__.py
@@ -21,6 +21,7 @@ from pyflink.util import java_utils
 from pyflink.util.java_utils import to_jarray, load_java_class
 
 
+# TODO: Remove custom jar loader after FLINK-15635 and FLINK-28002 are fixed and released.
 def add_jars_to_context_class_loader(jar_urls):
     """
     Add jars to Python gateway server for local compilation and local execution (i.e. minicluster).
diff --git a/flink-ml-python/pyflink/ml/core/linalg.py b/flink-ml-python/pyflink/ml/core/linalg.py
index fc0cd95..196a8f4 100644
--- a/flink-ml-python/pyflink/ml/core/linalg.py
+++ b/flink-ml-python/pyflink/ml/core/linalg.py
@@ -615,7 +615,7 @@ class SparseVector(Vector):
     def __str__(self):
         inds = "[" + ",".join([str(i) for i in self._indices]) + "]"
         vals = "[" + ",".join([str(v) for v in self._values]) + "]"
-        return "(" + ",".join((str(self.size), inds, vals)) + ")"
+        return "(" + ",".join((str(self.size()), inds, vals)) + ")"
 
     def __repr__(self):
         inds = self._indices
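
The one-character change above matters because size is a method on SparseVector:
formatting self.size embeds the bound-method repr in the output, while self.size()
embeds the actual dimension. A standalone sketch of the difference (an illustration,
not the pyflink class):

    class Sketch:
        def size(self):
            return 5

        def broken(self):
            # str(self.size) renders the bound method, e.g. '<bound method Sketch.size of ...>'
            return "(" + ",".join((str(self.size), "[3]", "[1.0]")) + ")"

        def fixed(self):
            # str(self.size()) renders the dimension, giving the intended '(5,[3],[1.0])'
            return "(" + ",".join((str(self.size()), "[3]", "[1.0]")) + ")"

    print(Sketch().fixed())  # (5,[3],[1.0])
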
diff --git a/flink-ml-python/pyflink/ml/lib/classification/tests/test_logisticregression.py b/flink-ml-python/pyflink/ml/lib/classification/tests/test_logisticregression.py
index e460155..6e46796 100644
--- a/flink-ml-python/pyflink/ml/lib/classification/tests/test_logisticregression.py
+++ b/flink-ml-python/pyflink/ml/lib/classification/tests/test_logisticregression.py
@@ -85,23 +85,6 @@ class LogisticRegressionTest(PyFlinkMLTestCase):
         self.assertEqual(regression.prediction_col, 'test_prediction_col')
         self.assertEqual(regression.raw_prediction_col, 'test_raw_prediction_col')
 
-    def test_feature_prediction_param(self):
-        temp_table = self.binomial_data_table.alias("test_features", "test_label", "test_weight")
-        regression = LogisticRegression() \
-            .set_features_col("test_features") \
-            .set_label_col("test_label") \
-            .set_weight_col("test_weight") \
-            .set_prediction_col("test_prediction_col") \
-            .set_raw_prediction_col("test_raw_prediction_col")
-        output = regression.fit(self.binomial_data_table).transform(temp_table)[0]
-
-        self.assertEqual(output.get_schema().get_field_names(),
-                         ['test_features',
-                          'test_label',
-                          'test_weight',
-                          'test_prediction_col',
-                          'test_raw_prediction_col'])
-
     def test_output_schema(self):
         temp_table = self.binomial_data_table.alias("test_features", "test_label", "test_weight")
         regression = LogisticRegression() \
diff --git a/flink-ml-python/pyflink/ml/lib/feature/stringindexer.py b/flink-ml-python/pyflink/ml/lib/feature/stringindexer.py
index 448f469..a42b6ed 100644
--- a/flink-ml-python/pyflink/ml/lib/feature/stringindexer.py
+++ b/flink-ml-python/pyflink/ml/lib/feature/stringindexer.py
@@ -23,9 +23,13 @@ from pyflink.ml.lib.feature.common import JavaFeatureModel, JavaFeatureEstimator
 from pyflink.ml.lib.param import HasInputCols, HasOutputCols, HasHandleInvalid
 
 
-class _IndexToStringModelParams(JavaWithParams):
+class _IndexToStringModelParams(
+    JavaWithParams,
+    HasInputCols,
+    HasOutputCols
+):
     """
-    Params for :class:`IndexToString`.
+    Params for :class:`IndexToStringModel`.
     """
 
     def __init__(self, java_params):
diff --git a/flink-ml-python/pyflink/ml/lib/feature/tests/test_indextostringmodel.py b/flink-ml-python/pyflink/ml/lib/feature/tests/test_indextostringmodel.py
new file mode 100644
index 0000000..42df909
--- /dev/null
+++ b/flink-ml-python/pyflink/ml/lib/feature/tests/test_indextostringmodel.py
@@ -0,0 +1,77 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from pyflink.common import Types, Row
+
+from pyflink.ml.lib.feature.stringindexer import IndexToStringModel
+from pyflink.ml.tests.test_utils import PyFlinkMLTestCase
+
+
+class IndexToStringModelTest(PyFlinkMLTestCase):
+    def setUp(self):
+        super(IndexToStringModelTest, self).setUp()
+        self.model_data_table = self.t_env.from_data_stream(
+            self.env.from_collection([
+                ([['a', 'b', 'c', 'd'], [-1., 0., 1., 2.]],),
+            ],
+                type_info=Types.ROW_NAMED(
+                    ['stringArrays'],
+                    [Types.OBJECT_ARRAY(Types.OBJECT_ARRAY(Types.STRING()))])
+            ))
+
+        self.predict_table = self.t_env.from_data_stream(
+            self.env.from_collection([
+                (0, 3),
+                (1, 2),
+            ],
+                type_info=Types.ROW_NAMED(
+                    ['input_col1', 'input_col2'],
+                    [Types.INT(), Types.INT()])
+            ))
+
+        self.expected_prediction = [
+            Row(0, 3, 'a', '2.0'),
+            Row(1, 2, 'b', '1.0'),
+        ]
+
+    def test_output_schema(self):
+        model = IndexToStringModel() \
+            .set_input_cols('input_col1', 'input_col2') \
+            .set_output_cols('output_col1', 'output_col2') \
+            .set_model_data(self.model_data_table)
+
+        output = model.transform(self.predict_table)[0]
+
+        self.assertEqual(
+            ['input_col1', 'input_col2', 'output_col1', 'output_col2'],
+            output.get_schema().get_field_names())
+
+    def test_fit_and_predict(self):
+        model = IndexToStringModel() \
+            .set_input_cols('input_col1', 'input_col2') \
+            .set_output_cols('output_col1', 'output_col2') \
+            .set_model_data(self.model_data_table)
+
+        output = model.transform(self.predict_table)[0]
+
+        predicted_results = [result for result in
+                             self.t_env.to_data_stream(output).execute_and_collect()]
+
+        predicted_results.sort(key=lambda x: x[0])
+
+        self.assertEqual(predicted_results, self.expected_prediction)
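
As the expected rows suggest, the single model-data row pairs each input column with
one array by position: input_col1 is decoded against ['a', 'b', 'c', 'd'] and
input_col2 against [-1., 0., 1., 2.], with the looked-up value rendered as a string.
A plain-Python sketch of that lookup (it mirrors the test's expectation, not the
model's implementation):

    string_arrays = [['a', 'b', 'c', 'd'], [-1., 0., 1., 2.]]
    for row in [(0, 3), (1, 2)]:
        decoded = tuple(str(string_arrays[col][idx]) for col, idx in enumerate(row))
        print(row, '->', decoded)  # (0, 3) -> ('a', '2.0'), (1, 2) -> ('b', '1.0')
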
diff --git a/flink-ml-python/pyflink/ml/lib/regression/__init__.py b/flink-ml-python/pyflink/ml/lib/regression/__init__.py
new file mode 100644
index 0000000..65b48d4
--- /dev/null
+++ b/flink-ml-python/pyflink/ml/lib/regression/__init__.py
@@ -0,0 +1,17 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
diff --git a/flink-ml-python/pyflink/ml/lib/regression/common.py b/flink-ml-python/pyflink/ml/lib/regression/common.py
new file mode 100644
index 0000000..028afd6
--- /dev/null
+++ b/flink-ml-python/pyflink/ml/lib/regression/common.py
@@ -0,0 +1,74 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from abc import ABC, abstractmethod
+
+from pyflink.ml.core.wrapper import JavaModel, JavaEstimator
+
+JAVA_REGRESSION_PACKAGE_NAME = "org.apache.flink.ml.regression"
+
+
+class JavaRegressionModel(JavaModel, ABC):
+    """
+    Wrapper class for a Java Regression Model.
+    """
+
+    def __init__(self, java_model):
+        super(JavaRegressionModel, self).__init__(java_model)
+
+    @classmethod
+    def _java_stage_path(cls) -> str:
+        return ".".join(
+            [JAVA_REGRESSION_PACKAGE_NAME,
+             cls._java_model_package_name(),
+             cls._java_model_class_name()])
+
+    @classmethod
+    @abstractmethod
+    def _java_model_package_name(cls) -> str:
+        pass
+
+    @classmethod
+    @abstractmethod
+    def _java_model_class_name(cls) -> str:
+        pass
+
+
+class JavaRegressionEstimator(JavaEstimator, ABC):
+    """
+    Wrapper class for a Java Regression Estimator.
+    """
+
+    def __init__(self):
+        super(JavaRegressionEstimator, self).__init__()
+
+    @classmethod
+    def _java_stage_path(cls):
+        return ".".join(
+            [JAVA_REGRESSION_PACKAGE_NAME,
+             cls._java_estimator_package_name(),
+             cls._java_estimator_class_name()])
+
+    @classmethod
+    @abstractmethod
+    def _java_estimator_package_name(cls) -> str:
+        pass
+
+    @classmethod
+    @abstractmethod
+    def _java_estimator_class_name(cls) -> str:
+        pass
diff --git a/flink-ml-python/pyflink/ml/lib/regression/linearregression.py b/flink-ml-python/pyflink/ml/lib/regression/linearregression.py
new file mode 100644
index 0000000..f19348b
--- /dev/null
+++ b/flink-ml-python/pyflink/ml/lib/regression/linearregression.py
@@ -0,0 +1,97 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from abc import ABC
+
+from pyflink.ml.core.wrapper import JavaWithParams
+from pyflink.ml.lib.regression.common import (JavaRegressionModel, JavaRegressionEstimator)
+from pyflink.ml.lib.param import (HasWeightCol, HasMaxIter, HasReg, HasLearningRate,
+                                  HasGlobalBatchSize, HasTol, HasFeaturesCol,
+                                  HasPredictionCol, HasLabelCol, HasElasticNet)
+
+
+class _LinearRegressionModelParams(
+    JavaWithParams,
+    HasFeaturesCol,
+    HasPredictionCol,
+    ABC
+):
+    """
+    Params for :class:`LinearRegressionModel`.
+    """
+
+    def __init__(self, java_params):
+        super(_LinearRegressionModelParams, self).__init__(java_params)
+
+
+class _LinearRegressionParams(
+    _LinearRegressionModelParams,
+    HasLabelCol,
+    HasWeightCol,
+    HasMaxIter,
+    HasReg,
+    HasElasticNet,
+    HasLearningRate,
+    HasGlobalBatchSize,
+    HasTol
+):
+    """
+    Params for :class:`LinearRegression`.
+    """
+
+    def __init__(self, java_params):
+        super(_LinearRegressionParams, self).__init__(java_params)
+
+
+class LinearRegressionModel(JavaRegressionModel, _LinearRegressionModelParams):
+    """
+    A Model which predicts data using the model data computed by :class:`LinearRegression`.
+    """
+
+    def __init__(self, java_model=None):
+        super(LinearRegressionModel, self).__init__(java_model)
+
+    @classmethod
+    def _java_model_package_name(cls) -> str:
+        return "linearregression"
+
+    @classmethod
+    def _java_model_class_name(cls) -> str:
+        return "LinearRegressionModel"
+
+
+class LinearRegression(JavaRegressionEstimator, _LinearRegressionParams):
+    """
+    An Estimator which implements the linear regression algorithm.
+
+    See https://en.wikipedia.org/wiki/Linear_regression.
+    """
+
+    def __init__(self):
+        super(LinearRegression, self).__init__()
+
+    @classmethod
+    def _create_model(cls, java_model) -> LinearRegressionModel:
+        return LinearRegressionModel(java_model)
+
+    @classmethod
+    def _java_estimator_package_name(cls) -> str:
+        return "linearregression"
+
+    @classmethod
+    def _java_estimator_class_name(cls) -> str:
+        return "LinearRegression"
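
For orientation, each wrapper above only supplies a package name and class name;
_java_stage_path in regression/common.py joins them onto JAVA_REGRESSION_PACKAGE_NAME
to locate the backing Java class. A standalone sketch mirroring that join (it repeats
the logic rather than importing the module):

    JAVA_REGRESSION_PACKAGE_NAME = "org.apache.flink.ml.regression"

    def java_stage_path(package_name, class_name):
        return ".".join([JAVA_REGRESSION_PACKAGE_NAME, package_name, class_name])

    # org.apache.flink.ml.regression.linearregression.LinearRegression
    print(java_stage_path("linearregression", "LinearRegression"))
    # org.apache.flink.ml.regression.linearregression.LinearRegressionModel
    print(java_stage_path("linearregression", "LinearRegressionModel"))
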
diff --git a/flink-ml-python/pyflink/ml/lib/regression/tests/__init__.py b/flink-ml-python/pyflink/ml/lib/regression/tests/__init__.py
new file mode 100644
index 0000000..6698191
--- /dev/null
+++ b/flink-ml-python/pyflink/ml/lib/regression/tests/__init__.py
@@ -0,0 +1,30 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import os
+import sys
+from pathlib import Path
+
+# Because this project and the `pyflink` project it depends on share the same directory
+# structure, we need to manually add the `flink-ml-python` path to `sys.path` in this
+# project's tests to change the package search order.
+flink_ml_python_dir = Path(__file__).parents[5]
+sys.path.append(str(flink_ml_python_dir))
+
+import pyflink
+
+pyflink.__path__.insert(0, os.path.join(flink_ml_python_dir, 'pyflink'))
diff --git a/flink-ml-python/pyflink/ml/lib/regression/tests/test_linearregression.py b/flink-ml-python/pyflink/ml/lib/regression/tests/test_linearregression.py
new file mode 100644
index 0000000..4af1df3
--- /dev/null
+++ b/flink-ml-python/pyflink/ml/lib/regression/tests/test_linearregression.py
@@ -0,0 +1,151 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import os
+
+from pyflink.common import Types
+from pyflink.table import Table
+
+from pyflink.ml.core.linalg import Vectors, DenseVectorTypeInfo
+from pyflink.ml.lib.regression.linearregression import LinearRegression, \
+    LinearRegressionModel
+from pyflink.ml.tests.test_utils import PyFlinkMLTestCase
+
+
+class LinearRegressionTest(PyFlinkMLTestCase):
+    def setUp(self):
+        super(LinearRegressionTest, self).setUp()
+        self.input_data_table = self.t_env.from_data_stream(
+            self.env.from_collection([
+                (Vectors.dense(2, 1), 4., 1.),
+                (Vectors.dense(3, 2), 7., 1.),
+                (Vectors.dense(4, 3), 10., 1.),
+                (Vectors.dense(2, 4), 10., 1.),
+                (Vectors.dense(2, 2), 6., 1.),
+                (Vectors.dense(4, 3), 10., 1.),
+                (Vectors.dense(1, 2), 5., 1.),
+                (Vectors.dense(5, 3), 11., 1.),
+            ],
+                type_info=Types.ROW_NAMED(
+                    ['features', 'label', 'weight'],
+                    [DenseVectorTypeInfo(), Types.DOUBLE(), Types.DOUBLE()])
+            ))
+
+    def test_param(self):
+        regression = LinearRegression()
+        self.assertEqual(regression.label_col, 'label')
+        self.assertIsNone(regression.weight_col)
+        self.assertEqual(regression.max_iter, 20)
+        self.assertAlmostEqual(regression.reg, 0, delta=1e-7)
+        self.assertAlmostEqual(regression.learning_rate, 0.1, delta=1e-7)
+        self.assertEqual(regression.global_batch_size, 32)
+        self.assertAlmostEqual(regression.tol, 1e-6, delta=1e-7)
+        self.assertEqual(regression.features_col, 'features')
+        self.assertEqual(regression.prediction_col, 'prediction')
+
+        regression.set_features_col("test_features") \
+            .set_label_col("test_label") \
+            .set_weight_col("test_weight") \
+            .set_max_iter(1000) \
+            .set_tol(0.001) \
+            .set_learning_rate(0.5) \
+            .set_global_batch_size(1000) \
+            .set_reg(0.1) \
+            .set_prediction_col("test_prediction_col") \
+
+        self.assertEqual(regression.features_col, 'test_features')
+        self.assertEqual(regression.label_col, 'test_label')
+        self.assertEqual(regression.weight_col, 'test_weight')
+        self.assertEqual(regression.max_iter, 1000)
+        self.assertAlmostEqual(regression.reg, 0.1, delta=1e-7)
+        self.assertAlmostEqual(regression.learning_rate, 0.5, delta=1e-7)
+        self.assertEqual(regression.global_batch_size, 1000)
+        self.assertAlmostEqual(regression.tol, 0.001, delta=1e-7)
+        self.assertEqual(regression.prediction_col, 'test_prediction_col')
+
+    def test_output_schema(self):
+        temp_table = self.input_data_table.alias("test_features", "test_label", "test_weight")
+        regression = LinearRegression() \
+            .set_features_col('test_features') \
+            .set_label_col('test_label') \
+            .set_weight_col('test_weight') \
+            .set_prediction_col('test_prediction_col')
+        output = regression.fit(self.input_data_table).transform(temp_table)[0]
+        self.assertEqual(
+            ['test_features',
+             'test_label',
+             'test_weight',
+             'test_prediction_col'],
+            output.get_schema().get_field_names())
+
+    def test_fit_and_predict(self):
+        regression = LinearRegression().set_weight_col('weight')
+        output = regression.fit(self.input_data_table).transform(self.input_data_table)[0]
+        field_names = output.get_schema().get_field_names()
+        self.verify_predict_result(
+            output,
+            field_names.index(regression.get_label_col()),
+            field_names.index(regression.get_prediction_col()))
+
+    def test_save_load_and_predict(self):
+        regression = LinearRegression().set_weight_col('weight')
+        path = os.path.join(self.temp_dir, 'test_save_load_and_predict_linear_regression')
+        regression.save(path)
+        regression = LinearRegression.load(self.t_env, path)  # type: LinearRegression
+        model = regression.fit(self.input_data_table)
+        self.assertEqual(
+            model.get_model_data()[0].get_schema().get_field_names(),
+            ['coefficient'])
+        output = model.transform(self.input_data_table)[0]
+        field_names = output.get_schema().get_field_names()
+        self.verify_predict_result(
+            output,
+            field_names.index(regression.get_label_col()),
+            field_names.index(regression.get_prediction_col()))
+
+    def test_get_model_data(self):
+        regression = LinearRegression().set_weight_col('weight')
+        model = regression.fit(self.input_data_table)
+        model_data = self.t_env.to_data_stream(
+            model.get_model_data()[0]).execute_and_collect().next()
+        self.assertIsNotNone(model_data[0])
+        data = model_data[0].values.tolist()
+        expected = [1.141, 1.829]
+        self.assertEqual(len(data), len(expected))
+        for a, b in zip(data, expected):
+            self.assertAlmostEqual(a, b, delta=0.1)
+
+    def test_set_model_data(self):
+        regression = LinearRegression().set_weight_col('weight')
+        model = regression.fit(self.input_data_table)
+
+        new_model = LinearRegressionModel()
+        new_model.set_model_data(*model.get_model_data())
+        output = new_model.transform(self.input_data_table)[0]
+        field_names = output.get_schema().get_field_names()
+        self.verify_predict_result(
+            output,
+            field_names.index(regression.get_label_col()),
+            field_names.index(regression.get_prediction_col()))
+
+    def verify_predict_result(
+            self, output: Table, label_index, prediction_index):
+        with self.t_env.to_data_stream(output).execute_and_collect() as results:
+            for result in results:
+                label = result[label_index]  # type: float
+                prediction = result[prediction_index]  # type: float
+                self.assertAlmostEqual(label, prediction, delta=0.1 * label)
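
The expected coefficient [1.141, 1.829] in test_get_model_data is consistent with the
10% tolerance that verify_predict_result applies, assuming the prediction is the plain
dot product of the coefficient vector and the features (no intercept column appears in
the model-data schema the tests check). A quick offline check:

    coefficient = [1.141, 1.829]
    rows = [((2, 1), 4.), ((3, 2), 7.), ((4, 3), 10.), ((2, 4), 10.),
            ((2, 2), 6.), ((4, 3), 10.), ((1, 2), 5.), ((5, 3), 11.)]
    for (x0, x1), label in rows:
        prediction = coefficient[0] * x0 + coefficient[1] * x1
        assert abs(label - prediction) <= 0.1 * label
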
diff --git a/flink-ml-python/pyflink/ml/lib/tests/test_ml_lib_completeness.py b/flink-ml-python/pyflink/ml/lib/tests/test_ml_lib_completeness.py
index ff17298..7bd894d 100644
--- a/flink-ml-python/pyflink/ml/lib/tests/test_ml_lib_completeness.py
+++ b/flink-ml-python/pyflink/ml/lib/tests/test_ml_lib_completeness.py
@@ -22,9 +22,8 @@ import pkgutil
 import unittest
 from abc import abstractmethod
 
-from pyflink.ml.tests.test_utils import PyFlinkMLTestCase
-
 from pyflink.java_gateway import get_gateway
+from pyflink.ml.tests.test_utils import PyFlinkMLTestCase
 
 
 class CompletenessTest(object):
@@ -70,7 +69,8 @@ class MLLibTest(PyFlinkMLTestCase):
                     'JavaClassificationEstimator', 'JavaClassificationModel',
                     'JavaClusteringEstimator', 'JavaClusteringModel',
                     'JavaEvaluationAlgoOperator', 'JavaFeatureTransformer',
-                    'JavaFeatureEstimator', 'JavaFeatureModel')]
+                    'JavaFeatureEstimator', 'JavaFeatureModel',
+                    'JavaRegressionEstimator', 'JavaRegressionModel')]
 
     @abstractmethod
     def module_name(self):
@@ -120,6 +120,16 @@ class FeatureCompletenessTest(CompletenessTest, MLLibTest):
         return feature
 
 
+class RegressionCompletenessTest(CompletenessTest, MLLibTest):
+
+    def module_name(self):
+        return "regression"
+
+    def module(self):
+        from pyflink.ml.lib import regression
+        return regression
+
+
 if __name__ == "__main__":
     try:
         import xmlrunner