Posted to commits@flink.apache.org by li...@apache.org on 2022/11/22 04:12:15 UTC

[flink-ml] branch master updated: [FLINK-30099] Add test case for algorithms' python APIs

This is an automated email from the ASF dual-hosted git repository.

lindong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-ml.git


The following commit(s) were added to refs/heads/master by this push:
     new 4939933  [FLINK-30099] Add test case for algorithms' python APIs
4939933 is described below

commit 4939933a00eff0a4dccf3b9d0fa06595cdfd06e6
Author: yunfengzhou-hub <yu...@outlook.com>
AuthorDate: Tue Nov 22 12:12:10 2022 +0800

    [FLINK-30099] Add test case for algorithms' python APIs
    
    This closes #178.
---
 .../ml/lib/classification/tests/test_knn.py        | 15 +++-
 .../ml/lib/classification/tests/test_linearsvc.py  | 49 ++++++++++++-
 .../ml/lib/classification/tests/test_naivebayes.py | 21 +++++-
 .../pyflink/ml/lib/clustering/tests/test_kmeans.py | 33 ++++++++-
 .../pyflink/ml/lib/feature/tests/test_idf.py       | 23 +++++-
 .../pyflink/ml/lib/feature/tests/test_imputer.py   | 30 +++++++-
 .../lib/feature/tests/test_indextostringmodel.py   | 28 ++++++++
 .../ml/lib/feature/tests/test_kbinsdiscretizer.py  | 22 +++++-
 .../ml/lib/feature/tests/test_maxabsscaler.py      | 66 +++++++++++++----
 .../ml/lib/feature/tests/test_minmaxscaler.py      | 82 ++++++++++++++++------
 .../ml/lib/feature/tests/test_onehotencoder.py     | 41 ++++++++++-
 .../ml/lib/feature/tests/test_robustscaler.py      | 34 ++++++++-
 .../ml/lib/feature/tests/test_standardscaler.py    | 46 +++++++++++-
 .../ml/lib/feature/tests/test_stringindexer.py     | 54 +++++++++++++-
 .../tests/test_variancethresholdselector.py        | 32 ++++++++-
 .../ml/lib/feature/tests/test_vectorindexer.py     | 27 ++++++-
 flink-ml-python/pyflink/ml/tests/test_utils.py     | 15 ++++
 17 files changed, 566 insertions(+), 52 deletions(-)
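
The new tests below share a common shape: fit an estimator, export the fitted model's model-data table with get_model_data(), load that table into a freshly constructed model with set_model_data(), then check that the rebuilt model predicts correctly. A minimal standalone sketch of that round trip, using KMeans (illustrative only and not part of this commit; the toy data, environment setup, and printed output are assumptions, while the API calls mirror the tests in the diff):

    from pyflink.common import Types
    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.table import StreamTableEnvironment

    from pyflink.ml.core.linalg import Vectors, DenseVectorTypeInfo
    from pyflink.ml.lib.clustering.kmeans import KMeans, KMeansModel

    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(env)

    # Toy input with a single dense-vector 'features' column (values are illustrative).
    data_table = t_env.from_data_stream(
        env.from_collection(
            [(Vectors.dense(0.0, 0.0),), (Vectors.dense(0.3, 3.0),),
             (Vectors.dense(9.0, 0.0),), (Vectors.dense(9.6, 0.0),)],
            type_info=Types.ROW_NAMED(['features'], [DenseVectorTypeInfo()])))

    # Fit one model and export its model data as a Table.
    model_a = KMeans().set_k(2).fit(data_table)
    model_data = model_a.get_model_data()[0]

    # Load the exported model data into a brand-new model instance.
    model_b = KMeansModel().set_model_data(model_data)

    # model_b should now produce the same cluster assignments as model_a.
    output = model_b.transform(data_table)[0]
    for row in t_env.to_data_stream(output).execute_and_collect():
        print(row)

In the committed tests, the rebuilt model is additionally synchronized with the fitted model's parameters through the new update_existing_params() helper added to test_utils.py at the end of this diff.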

diff --git a/flink-ml-python/pyflink/ml/lib/classification/tests/test_knn.py b/flink-ml-python/pyflink/ml/lib/classification/tests/test_knn.py
index 77ad3fa..d140678 100644
--- a/flink-ml-python/pyflink/ml/lib/classification/tests/test_knn.py
+++ b/flink-ml-python/pyflink/ml/lib/classification/tests/test_knn.py
@@ -21,7 +21,7 @@ from pyflink.common import Types
 from pyflink.table import Table
 
 from pyflink.ml.core.linalg import Vectors, DenseVectorTypeInfo, DenseMatrix, DenseVector
-from pyflink.ml.lib.classification.knn import KNN
+from pyflink.ml.lib.classification.knn import KNN, KNNModel
 from pyflink.ml.tests.test_utils import PyFlinkMLTestCase
 
 
@@ -157,6 +157,19 @@ class KNNTest(PyFlinkMLTestCase):
         self.assertEqual(packed_features.num_cols(), labels.size())
         self.assertEqual(feature_norm_squares.size(), labels.size())
 
+    def test_set_model_data(self):
+        knn = KNN()
+        model_a = knn.fit(self.train_data)
+        model_data = model_a.get_model_data()[0]
+
+        model_b = KNNModel().set_model_data(model_data)
+        output = model_b.transform(self.predict_data)[0]
+        field_names = output.get_schema().get_field_names()
+        self.verify_predict_result(
+            output,
+            field_names.index(knn.label_col),
+            field_names.index(knn.prediction_col))
+
     def verify_predict_result(
             self, output: Table, label_index, prediction_index):
         with self.t_env.to_data_stream(output).execute_and_collect() as results:
diff --git a/flink-ml-python/pyflink/ml/lib/classification/tests/test_linearsvc.py b/flink-ml-python/pyflink/ml/lib/classification/tests/test_linearsvc.py
index ada27e7..2cb6a4b 100644
--- a/flink-ml-python/pyflink/ml/lib/classification/tests/test_linearsvc.py
+++ b/flink-ml-python/pyflink/ml/lib/classification/tests/test_linearsvc.py
@@ -20,8 +20,8 @@ from pyflink.common import Types
 from pyflink.table import Table
 
 from pyflink.ml.core.linalg import Vectors, DenseVectorTypeInfo
-from pyflink.ml.lib.classification.linearsvc import LinearSVC
-from pyflink.ml.tests.test_utils import PyFlinkMLTestCase
+from pyflink.ml.lib.classification.linearsvc import LinearSVC, LinearSVCModel
+from pyflink.ml.tests.test_utils import PyFlinkMLTestCase, update_existing_params
 
 
 class LinearSVCTest(PyFlinkMLTestCase):
@@ -44,6 +44,8 @@ class LinearSVCTest(PyFlinkMLTestCase):
                     ['features', 'label', 'weight'],
                     [DenseVectorTypeInfo(), Types.DOUBLE(), Types.DOUBLE()])))
 
+        self.eps = 0.1
+
     def test_param(self):
         linear_svc = LinearSVC()
         self.assertEqual('features', linear_svc.features_col)
@@ -113,6 +115,49 @@ class LinearSVCTest(PyFlinkMLTestCase):
             linear_svc.prediction_col,
             linear_svc.raw_prediction_col)
 
+    def test_get_model_data(self):
+        linear_svc = LinearSVC().set_weight_col('weight')
+        model_data = linear_svc.fit(self.train_data).get_model_data()[0]
+        expected_field_names = ['coefficient']
+        self.assertEqual(expected_field_names, model_data.get_schema().get_field_names())
+
+        model_rows = [result for result in
+                      self.t_env.to_data_stream(model_data).execute_and_collect()]
+        self.assertEqual(1, len(model_rows))
+        self.assertListAlmostEqual(
+            [0.470, -0.273, -0.410, -0.546],
+            model_rows[0][expected_field_names.index('coefficient')].to_array(),
+            delta=self.eps)
+
+    def test_set_model_data(self):
+        linear_svc = LinearSVC().set_weight_col('weight')
+        model_a = linear_svc.fit(self.train_data)
+        model_data = model_a.get_model_data()[0]
+
+        model_b = LinearSVCModel().set_model_data(model_data)
+        update_existing_params(model_b, model_a)
+
+        output = model_b.transform(self.train_data)[0]
+        self.verify_prediction_result(
+            output,
+            output.get_schema().get_field_names(),
+            linear_svc.features_col,
+            linear_svc.prediction_col,
+            linear_svc.raw_prediction_col)
+
+    def test_save_load_and_predict(self):
+        linear_svc = LinearSVC().set_weight_col('weight')
+        reloaded_linear_svc = self.save_and_reload(linear_svc)
+        model = reloaded_linear_svc.fit(self.train_data)
+        reloaded_model = self.save_and_reload(model)
+        output = reloaded_model.transform(self.train_data)[0]
+        self.verify_prediction_result(
+            output,
+            output.get_schema().get_field_names(),
+            linear_svc.features_col,
+            linear_svc.prediction_col,
+            linear_svc.raw_prediction_col)
+
     def verify_prediction_result(self,
                                  output: Table,
                                  field_names,
diff --git a/flink-ml-python/pyflink/ml/lib/classification/tests/test_naivebayes.py b/flink-ml-python/pyflink/ml/lib/classification/tests/test_naivebayes.py
index 35fc816..16f0c8c 100644
--- a/flink-ml-python/pyflink/ml/lib/classification/tests/test_naivebayes.py
+++ b/flink-ml-python/pyflink/ml/lib/classification/tests/test_naivebayes.py
@@ -22,7 +22,7 @@ from pyflink.table import Table
 
 from pyflink.ml.core.linalg import Vectors, DenseVectorTypeInfo
 from pyflink.ml.lib.classification.naivebayes import NaiveBayes, NaiveBayesModel
-from pyflink.ml.tests.test_utils import PyFlinkMLTestCase
+from pyflink.ml.tests.test_utils import PyFlinkMLTestCase, update_existing_params
 
 
 class NaiveBayesTest(PyFlinkMLTestCase):
@@ -112,6 +112,25 @@ class NaiveBayesTest(PyFlinkMLTestCase):
         actual_output = self.execute_and_collect(output_table)
         self.assertEqual(self.expected_output, actual_output)
 
+    def test_get_model_data(self):
+        model = self.estimator.fit(self.train_data)
+        model_data = model.get_model_data()[0]
+        expected_field_names = ['theta', 'piArray', 'labels']
+        self.assertEqual(expected_field_names, model_data.get_schema().get_field_names())
+
+        # TODO: Add test to collect and verify the model data results after FLINK-30124 is resolved.
+
+    def test_set_model_data(self):
+        model_a = self.estimator.fit(self.train_data)
+        model_data = model_a.get_model_data()[0]
+
+        model_b = NaiveBayesModel().set_model_data(model_data)
+        update_existing_params(model_b, model_a)
+
+        output_table = model_b.transform(self.predict_data)[0]
+        actual_output = self.execute_and_collect(output_table)
+        self.assertEqual(self.expected_output, actual_output)
+
     def execute_and_collect(self, output: Table):
         res = {}
         with self.t_env.to_data_stream(output).execute_and_collect() as results:
diff --git a/flink-ml-python/pyflink/ml/lib/clustering/tests/test_kmeans.py b/flink-ml-python/pyflink/ml/lib/clustering/tests/test_kmeans.py
index f4738c5..c63b26b 100644
--- a/flink-ml-python/pyflink/ml/lib/clustering/tests/test_kmeans.py
+++ b/flink-ml-python/pyflink/ml/lib/clustering/tests/test_kmeans.py
@@ -22,7 +22,7 @@ from typing import List, Dict, Set
 
 from pyflink.ml.core.linalg import Vectors, DenseVectorTypeInfo, DenseVector
 from pyflink.ml.lib.clustering.kmeans import KMeans, KMeansModel, OnlineKMeans
-from pyflink.ml.tests.test_utils import PyFlinkMLTestCase
+from pyflink.ml.tests.test_utils import PyFlinkMLTestCase, update_existing_params
 
 
 def group_features_by_prediction(
@@ -146,6 +146,37 @@ class KMeansTest(PyFlinkMLTestCase):
                         actual_groups[0] == self.expected_groups[1]
                         and actual_groups[1] == self.expected_groups[0])
 
+    def test_get_model_data(self):
+        kmeans = KMeans().set_max_iter(2).set_k(2)
+        model = kmeans.fit(self.data_table)
+        model_data = model.get_model_data()[0]
+        expected_field_names = ['centroids', 'weights']
+        self.assertEqual(expected_field_names, model_data.get_schema().get_field_names())
+
+        # TODO: Add test to collect and verify the model data results after FLINK-30122 is resolved.
+
+    def test_set_model_data(self):
+        kmeans = KMeans().set_max_iter(2).set_k(2)
+        model_a = kmeans.fit(self.data_table)
+        model_data = model_a.get_model_data()[0]
+
+        model_b = KMeansModel().set_model_data(model_data)
+        update_existing_params(model_b, model_a)
+
+        output = model_b.transform(self.data_table)[0]
+        self.assertEqual(['features', 'prediction'], output.get_schema().get_field_names())
+        results = [result for result in self.t_env.to_data_stream(output).execute_and_collect()]
+        field_names = output.get_schema().get_field_names()
+        actual_groups = group_features_by_prediction(
+            results,
+            field_names.index(kmeans.features_col),
+            field_names.index(kmeans.prediction_col))
+
+        self.assertTrue(actual_groups[0] == self.expected_groups[0]
+                        and actual_groups[1] == self.expected_groups[1] or
+                        actual_groups[0] == self.expected_groups[1]
+                        and actual_groups[1] == self.expected_groups[0])
+
     def test_save_load_and_predict(self):
         kmeans = KMeans().set_max_iter(2).set_k(2)
         model = kmeans.fit(self.data_table)
diff --git a/flink-ml-python/pyflink/ml/lib/feature/tests/test_idf.py b/flink-ml-python/pyflink/ml/lib/feature/tests/test_idf.py
index d192ef3..11f1887 100644
--- a/flink-ml-python/pyflink/ml/lib/feature/tests/test_idf.py
+++ b/flink-ml-python/pyflink/ml/lib/feature/tests/test_idf.py
@@ -22,7 +22,7 @@ from pyflink.common import Types
 
 from pyflink.ml.core.linalg import Vectors, DenseVectorTypeInfo
 from pyflink.ml.lib.feature.idf import IDF, IDFModel
-from pyflink.ml.tests.test_utils import PyFlinkMLTestCase
+from pyflink.ml.tests.test_utils import PyFlinkMLTestCase, update_existing_params
 
 
 class IDFTest(PyFlinkMLTestCase):
@@ -112,6 +112,27 @@ class IDFTest(PyFlinkMLTestCase):
         output = idf.fit(self.input_data).transform(self.input_data)[0]
         self.verify_prediction_result(self.expected_output_min_doc_freq_as_two, output)
 
+    def test_get_model_data(self):
+        idf = IDF()
+        model = idf.fit(self.input_data)
+        model_data = model.get_model_data()[0]
+        expected_field_names = ['idf', 'docFreq', 'numDocs']
+        self.assertEqual(expected_field_names, model_data.get_schema().get_field_names())
+
+        # TODO: Add test to collect and verify the model data results after Flink dependency
+        #  is upgraded to 1.15.3, 1.16.0 or a higher version. Related ticket: FLINK-29477
+
+    def test_set_model_data(self):
+        idf = IDF()
+        model_a = idf.fit(self.input_data)
+        model_data = model_a.get_model_data()[0]
+
+        model_b = IDFModel().set_model_data(model_data)
+        update_existing_params(model_b, model_a)
+
+        output = model_b.transform(self.input_data)[0]
+        self.verify_prediction_result(self.expected_output, output)
+
     def test_save_load_predict(self):
         idf = IDF()
         estimator_path = os.path.join(self.temp_dir, 'test_save_load_predict_idf')
diff --git a/flink-ml-python/pyflink/ml/lib/feature/tests/test_imputer.py b/flink-ml-python/pyflink/ml/lib/feature/tests/test_imputer.py
index 3addeee..8f045ac 100644
--- a/flink-ml-python/pyflink/ml/lib/feature/tests/test_imputer.py
+++ b/flink-ml-python/pyflink/ml/lib/feature/tests/test_imputer.py
@@ -20,8 +20,8 @@ from typing import List
 import numpy as np
 from pyflink.table import Table
 from pyflink.common import Types, Row
-from pyflink.ml.tests.test_utils import PyFlinkMLTestCase
-from pyflink.ml.lib.feature.imputer import Imputer
+from pyflink.ml.tests.test_utils import PyFlinkMLTestCase, update_existing_params
+from pyflink.ml.lib.feature.imputer import Imputer, ImputerModel
 
 
 class ImputerTest(PyFlinkMLTestCase):
@@ -107,6 +107,32 @@ class ImputerTest(PyFlinkMLTestCase):
             self.verify_output_result(
                 output, imputer.get_output_cols(), field_names, expected_output)
 
+    def test_get_model_data(self):
+        imputer = Imputer().\
+            set_input_cols('f1', 'f2', 'f3').\
+            set_output_cols('o1', 'o2', 'o3')
+        model = imputer.fit(self.train_table)
+        model_data = model.get_model_data()[0]
+        expected_field_names = ['surrogates']
+        self.assertEqual(expected_field_names, model_data.get_schema().get_field_names())
+
+        # TODO: Add test to collect and verify the model data results after FLINK-30124 is resolved.
+
+    def test_set_model_data(self):
+        imputer = Imputer().\
+            set_input_cols('f1', 'f2', 'f3').\
+            set_output_cols('o1', 'o2', 'o3')
+        model_a = imputer.fit(self.train_table)
+        model_data = model_a.get_model_data()[0]
+
+        model_b = ImputerModel().set_model_data(model_data)
+        update_existing_params(model_b, model_a)
+
+        output = model_b.transform(self.train_table)[0]
+        field_names = output.get_schema().get_field_names()
+        self.verify_output_result(
+            output, imputer.get_output_cols(), field_names, self.expected_mean_strategy_output)
+
     def test_save_load_predict(self):
         imputer = Imputer(). \
             set_input_cols('f1', 'f2', 'f3'). \
diff --git a/flink-ml-python/pyflink/ml/lib/feature/tests/test_indextostringmodel.py b/flink-ml-python/pyflink/ml/lib/feature/tests/test_indextostringmodel.py
index 42df909..41ef7bd 100644
--- a/flink-ml-python/pyflink/ml/lib/feature/tests/test_indextostringmodel.py
+++ b/flink-ml-python/pyflink/ml/lib/feature/tests/test_indextostringmodel.py
@@ -75,3 +75,31 @@ class IndexToStringModelTest(PyFlinkMLTestCase):
         predicted_results.sort(key=lambda x: x[0])
 
         self.assertEqual(predicted_results, self.expected_prediction)
+
+    def test_get_model_data(self):
+        model = IndexToStringModel() \
+            .set_input_cols('input_col1', 'input_col2') \
+            .set_output_cols('output_col1', 'output_col2') \
+            .set_model_data(self.model_data_table)
+        model_data = model.get_model_data()[0]
+        expected_field_names = ['stringArrays']
+        self.assertEqual(expected_field_names, model_data.get_schema().get_field_names())
+
+        # TODO: Add test to collect and verify the model data results after FLINK-30122 is resolved.
+
+    def test_save_load_and_predict(self):
+        model = IndexToStringModel() \
+            .set_input_cols('input_col1', 'input_col2') \
+            .set_output_cols('output_col1', 'output_col2') \
+            .set_model_data(self.model_data_table)
+
+        reloaded_model = self.save_and_reload(model)
+
+        output = reloaded_model.transform(self.predict_table)[0]
+
+        predicted_results = [result for result in
+                             self.t_env.to_data_stream(output).execute_and_collect()]
+
+        predicted_results.sort(key=lambda x: x[0])
+
+        self.assertEqual(predicted_results, self.expected_prediction)
diff --git a/flink-ml-python/pyflink/ml/lib/feature/tests/test_kbinsdiscretizer.py b/flink-ml-python/pyflink/ml/lib/feature/tests/test_kbinsdiscretizer.py
index 2dacb68..70bc9a5 100644
--- a/flink-ml-python/pyflink/ml/lib/feature/tests/test_kbinsdiscretizer.py
+++ b/flink-ml-python/pyflink/ml/lib/feature/tests/test_kbinsdiscretizer.py
@@ -22,7 +22,7 @@ from pyflink.common import Types
 
 from pyflink.ml.core.linalg import Vectors, DenseVectorTypeInfo
 from pyflink.ml.lib.feature.kbinsdiscretizer import KBinsDiscretizer, KBinsDiscretizerModel
-from pyflink.ml.tests.test_utils import PyFlinkMLTestCase
+from pyflink.ml.tests.test_utils import PyFlinkMLTestCase, update_existing_params
 
 
 class KBinsDiscretizerTest(PyFlinkMLTestCase):
@@ -156,6 +156,26 @@ class KBinsDiscretizerTest(PyFlinkMLTestCase):
         output = k_bins_discretizer.fit(self.train_table).transform(self.predict_table)[0]
         self.verify_prediction_result(self.kmeans_output, output)
 
+    def test_get_model_data(self):
+        k_bins_discretizer = KBinsDiscretizer().set_num_bins(3).set_strategy('uniform')
+        model = k_bins_discretizer.fit(self.train_table)
+        model_data = model.get_model_data()[0]
+        expected_field_names = ['binEdges']
+        self.assertEqual(expected_field_names, model_data.get_schema().get_field_names())
+
+        # TODO: Add test to collect and verify the model data results after FLINK-30122 is resolved.
+
+    def test_set_model_data(self):
+        k_bins_discretizer = KBinsDiscretizer().set_num_bins(3).set_strategy('uniform')
+        model_a = k_bins_discretizer.fit(self.train_table)
+        model_data = model_a.get_model_data()[0]
+
+        model_b = KBinsDiscretizerModel().set_model_data(model_data)
+        update_existing_params(model_b, model_a)
+
+        output = model_b.transform(self.predict_table)[0]
+        self.verify_prediction_result(self.uniform_output, output)
+
     def test_save_load_predict(self):
         k_bins_discretizer = KBinsDiscretizer().set_num_bins(3)
         estimator_path = os.path.join(self.temp_dir, 'test_save_load_predict_kbinsdiscretizer')
diff --git a/flink-ml-python/pyflink/ml/lib/feature/tests/test_maxabsscaler.py b/flink-ml-python/pyflink/ml/lib/feature/tests/test_maxabsscaler.py
index 925ae4a..97d974d 100644
--- a/flink-ml-python/pyflink/ml/lib/feature/tests/test_maxabsscaler.py
+++ b/flink-ml-python/pyflink/ml/lib/feature/tests/test_maxabsscaler.py
@@ -21,8 +21,8 @@ from pyflink.common import Types
 from pyflink.table import Table
 
 from pyflink.ml.core.linalg import Vectors, DenseVectorTypeInfo, DenseVector
-from pyflink.ml.lib.feature.maxabsscaler import MaxAbsScaler
-from pyflink.ml.tests.test_utils import PyFlinkMLTestCase
+from pyflink.ml.lib.feature.maxabsscaler import MaxAbsScaler, MaxAbsScalerModel
+from pyflink.ml.tests.test_utils import PyFlinkMLTestCase, update_existing_params
 
 
 class MaxAbsScalerTest(PyFlinkMLTestCase):
@@ -55,32 +55,72 @@ class MaxAbsScalerTest(PyFlinkMLTestCase):
             Vectors.dense(0.75, 0.225)]
 
     def test_param(self):
-        max_abs_scalar = MaxAbsScaler()
-        self.assertEqual("input", max_abs_scalar.input_col)
-        self.assertEqual("output", max_abs_scalar.output_col)
-        max_abs_scalar.set_input_col('test_input') \
+        max_abs_scaler = MaxAbsScaler()
+        self.assertEqual("input", max_abs_scaler.input_col)
+        self.assertEqual("output", max_abs_scaler.output_col)
+        max_abs_scaler.set_input_col('test_input') \
             .set_output_col('test_output')
-        self.assertEqual('test_input', max_abs_scalar.input_col)
-        self.assertEqual('test_output', max_abs_scalar.output_col)
+        self.assertEqual('test_input', max_abs_scaler.input_col)
+        self.assertEqual('test_output', max_abs_scaler.output_col)
 
     def test_output_schema(self):
-        max_abs_scalar = MaxAbsScaler() \
+        max_abs_scaler = MaxAbsScaler() \
             .set_input_col('test_input') \
             .set_output_col('test_output')
 
-        model = max_abs_scalar.fit(self.train_data.alias('test_input'))
+        model = max_abs_scaler.fit(self.train_data.alias('test_input'))
         output = model.transform(self.predict_data.alias('test_input'))[0]
         self.assertEqual(
             ['test_input', 'test_output'],
             output.get_schema().get_field_names())
 
     def test_fit_and_predict(self):
-        max_abs_scalar = MaxAbsScaler()
-        model = max_abs_scalar.fit(self.train_data)
+        max_abs_scaler = MaxAbsScaler()
+        model = max_abs_scaler.fit(self.train_data)
         output = model.transform(self.predict_data)[0]
         self.verify_output_result(
             output,
-            max_abs_scalar.get_output_col(),
+            max_abs_scaler.get_output_col(),
+            output.get_schema().get_field_names(),
+            self.expected_data)
+
+    def test_get_model_data(self):
+        max_abs_scaler = MaxAbsScaler()
+        model = max_abs_scaler.fit(self.train_data)
+        model_data = model.get_model_data()[0]
+        expected_field_names = ['maxVector']
+        self.assertEqual(expected_field_names, model_data.get_schema().get_field_names())
+
+        model_rows = [result for result in
+                      self.t_env.to_data_stream(model_data).execute_and_collect()]
+        self.assertEqual(1, len(model_rows))
+        self.assertListAlmostEqual([200.0, 400.0],
+                                   model_rows[0][expected_field_names.index('maxVector')])
+
+    def test_set_model_data(self):
+        max_abs_scaler = MaxAbsScaler()
+        model_a = max_abs_scaler.fit(self.train_data)
+        model_data = model_a.get_model_data()[0]
+
+        model_b = MaxAbsScalerModel().set_model_data(model_data)
+        update_existing_params(model_b, model_a)
+
+        output = model_b.transform(self.predict_data)[0]
+        self.verify_output_result(
+            output,
+            max_abs_scaler.get_output_col(),
+            output.get_schema().get_field_names(),
+            self.expected_data)
+
+    def test_save_load_and_predict(self):
+        max_abs_scaler = MaxAbsScaler()
+        reloaded_max_abs_scaler = self.save_and_reload(max_abs_scaler)
+        model = reloaded_max_abs_scaler.fit(self.train_data)
+        reloaded_model = self.save_and_reload(model)
+        output = reloaded_model.transform(self.predict_data)[0]
+        self.verify_output_result(
+            output,
+            max_abs_scaler.get_output_col(),
             output.get_schema().get_field_names(),
             self.expected_data)
 
diff --git a/flink-ml-python/pyflink/ml/lib/feature/tests/test_minmaxscaler.py b/flink-ml-python/pyflink/ml/lib/feature/tests/test_minmaxscaler.py
index 230ce7b..3a699ac 100644
--- a/flink-ml-python/pyflink/ml/lib/feature/tests/test_minmaxscaler.py
+++ b/flink-ml-python/pyflink/ml/lib/feature/tests/test_minmaxscaler.py
@@ -21,8 +21,8 @@ from pyflink.common import Types
 from pyflink.table import Table
 
 from pyflink.ml.core.linalg import Vectors, DenseVectorTypeInfo, DenseVector
-from pyflink.ml.lib.feature.minmaxscaler import MinMaxScaler
-from pyflink.ml.tests.test_utils import PyFlinkMLTestCase
+from pyflink.ml.lib.feature.minmaxscaler import MinMaxScaler, MinMaxScalerModel
+from pyflink.ml.tests.test_utils import PyFlinkMLTestCase, update_existing_params
 
 
 class MinMaxScalerTest(PyFlinkMLTestCase):
@@ -55,28 +55,28 @@ class MinMaxScalerTest(PyFlinkMLTestCase):
             Vectors.dense(0.75, 0.225)]
 
     def test_param(self):
-        min_max_scalar = MinMaxScaler()
-        self.assertEqual("input", min_max_scalar.input_col)
-        self.assertEqual("output", min_max_scalar.output_col)
-        self.assertEqual(0.0, min_max_scalar.min)
-        self.assertEqual(1.0, min_max_scalar.max)
-        min_max_scalar.set_input_col('test_input') \
+        min_max_scaler = MinMaxScaler()
+        self.assertEqual("input", min_max_scaler.input_col)
+        self.assertEqual("output", min_max_scaler.output_col)
+        self.assertEqual(0.0, min_max_scaler.min)
+        self.assertEqual(1.0, min_max_scaler.max)
+        min_max_scaler.set_input_col('test_input') \
             .set_output_col('test_output') \
             .set_min(1.0) \
             .set_max(4.0)
-        self.assertEqual('test_input', min_max_scalar.input_col)
-        self.assertEqual(1.0, min_max_scalar.min)
-        self.assertEqual(4.0, min_max_scalar.max)
-        self.assertEqual('test_output', min_max_scalar.output_col)
+        self.assertEqual('test_input', min_max_scaler.input_col)
+        self.assertEqual(1.0, min_max_scaler.min)
+        self.assertEqual(4.0, min_max_scaler.max)
+        self.assertEqual('test_output', min_max_scaler.output_col)
 
     def test_output_schema(self):
-        min_max_scalar = MinMaxScaler() \
+        min_max_scaler = MinMaxScaler() \
             .set_input_col('test_input') \
             .set_output_col('test_output') \
             .set_min(1.0) \
             .set_max(4.0)
 
-        model = min_max_scalar.fit(self.train_data.alias('test_input'))
+        model = min_max_scaler.fit(self.train_data.alias('test_input'))
         output = model.transform(self.predict_data.alias('test_input'))[0]
         self.assertEqual(
             ['test_input', 'test_output'],
@@ -99,25 +99,67 @@ class MinMaxScalerTest(PyFlinkMLTestCase):
                     ['input'],
                     [DenseVectorTypeInfo()])))
 
-        min_max_scalar = MinMaxScaler() \
+        min_max_scaler = MinMaxScaler() \
             .set_min(0.0) \
             .set_max(10.0)
 
-        model = min_max_scalar.fit(train_data)
+        model = min_max_scaler.fit(train_data)
         result = model.transform(predict_data)[0]
         self.verify_output_result(
             result,
-            min_max_scalar.get_output_col(),
+            min_max_scaler.get_output_col(),
             result.get_schema().get_field_names(),
             [Vectors.dense(5.0, 5.0)])
 
     def test_fit_and_predict(self):
-        min_max_scalar = MinMaxScaler()
-        model = min_max_scalar.fit(self.train_data)
+        min_max_scaler = MinMaxScaler()
+        model = min_max_scaler.fit(self.train_data)
         output = model.transform(self.predict_data)[0]
         self.verify_output_result(
             output,
-            min_max_scalar.get_output_col(),
+            min_max_scaler.get_output_col(),
+            output.get_schema().get_field_names(),
+            self.expected_data)
+
+    def test_get_model_data(self):
+        min_max_scaler = MinMaxScaler()
+        model = min_max_scaler.fit(self.train_data)
+        model_data = model.get_model_data()[0]
+        expected_field_names = ['minVector', 'maxVector']
+        self.assertEqual(expected_field_names, model_data.get_schema().get_field_names())
+
+        model_rows = [result for result in
+                      self.t_env.to_data_stream(model_data).execute_and_collect()]
+        self.assertEqual(1, len(model_rows))
+        self.assertListAlmostEqual(
+            [0.0, 0.0], model_rows[0][expected_field_names.index('minVector')])
+        self.assertListAlmostEqual(
+            [200.0, 400.0], model_rows[0][expected_field_names.index('maxVector')])
+
+    def test_set_model_data(self):
+        min_max_scaler = MinMaxScaler()
+        model_a = min_max_scaler.fit(self.train_data)
+        model_data = model_a.get_model_data()[0]
+
+        model_b = MinMaxScalerModel().set_model_data(model_data)
+        update_existing_params(model_b, model_a)
+
+        output = model_b.transform(self.predict_data)[0]
+        self.verify_output_result(
+            output,
+            min_max_scaler.get_output_col(),
+            output.get_schema().get_field_names(),
+            self.expected_data)
+
+    def test_save_load_and_predict(self):
+        min_max_scaler = MinMaxScaler()
+        reloaded_min_max_scaler = self.save_and_reload(min_max_scaler)
+        model = reloaded_min_max_scaler.fit(self.train_data)
+        reloaded_model = self.save_and_reload(model)
+        output = reloaded_model.transform(self.predict_data)[0]
+        self.verify_output_result(
+            output,
+            min_max_scaler.get_output_col(),
             output.get_schema().get_field_names(),
             self.expected_data)
 
diff --git a/flink-ml-python/pyflink/ml/lib/feature/tests/test_onehotencoder.py b/flink-ml-python/pyflink/ml/lib/feature/tests/test_onehotencoder.py
index 795200b..fe26e30 100644
--- a/flink-ml-python/pyflink/ml/lib/feature/tests/test_onehotencoder.py
+++ b/flink-ml-python/pyflink/ml/lib/feature/tests/test_onehotencoder.py
@@ -23,7 +23,7 @@ from pyflink.table import Table
 
 from pyflink.ml.core.linalg import Vectors, SparseVector
 from pyflink.ml.lib.feature.onehotencoder import OneHotEncoder, OneHotEncoderModel
-from pyflink.ml.tests.test_utils import PyFlinkMLTestCase
+from pyflink.ml.tests.test_utils import PyFlinkMLTestCase, update_existing_params
 
 
 class OneHotEncoderTest(PyFlinkMLTestCase):
@@ -106,6 +106,45 @@ class OneHotEncoderTest(PyFlinkMLTestCase):
             output_table.get_schema().get_field_names(),
             expected_data)
 
+    def test_get_model_data(self):
+        model = self.estimator.fit(self.train_data)
+        model_data = model.get_model_data()[0]
+        expected_field_names = ['f0', 'f1']
+        self.assertEqual(expected_field_names, model_data.get_schema().get_field_names())
+
+        model_rows = [result for result in
+                      self.t_env.to_data_stream(model_data).execute_and_collect()]
+        self.assertEqual(1, len(model_rows))
+        self.assertEqual(0, model_rows[0][expected_field_names.index('f0')])
+        self.assertEqual(2, model_rows[0][expected_field_names.index('f1')])
+
+    def test_set_model_data(self):
+        model_a = self.estimator.fit(self.train_data)
+        model_data = model_a.get_model_data()[0]
+
+        model_b = OneHotEncoderModel().set_model_data(model_data)
+        update_existing_params(model_b, model_a)
+
+        output = model_b.transform(self.predict_data)[0]
+        self.verify_output_result(
+            output,
+            model_b.input_cols,
+            model_b.output_cols,
+            output.get_schema().get_field_names(),
+            self.expected_data)
+
+    def test_save_load_and_predict(self):
+        reloaded_estimator = self.save_and_reload(self.estimator)
+        model = reloaded_estimator.fit(self.train_data)  # type: OneHotEncoderModel
+        reloaded_model = self.save_and_reload(model)
+        output_table = reloaded_model.transform(self.predict_data)[0]
+        self.verify_output_result(
+            output_table,
+            model.input_cols,
+            model.output_cols,
+            output_table.get_schema().get_field_names(),
+            self.expected_data)
+
     def verify_output_result(
             self,
             output: Table,
diff --git a/flink-ml-python/pyflink/ml/lib/feature/tests/test_robustscaler.py b/flink-ml-python/pyflink/ml/lib/feature/tests/test_robustscaler.py
index d423aee..5c51aac 100644
--- a/flink-ml-python/pyflink/ml/lib/feature/tests/test_robustscaler.py
+++ b/flink-ml-python/pyflink/ml/lib/feature/tests/test_robustscaler.py
@@ -18,11 +18,11 @@
 from typing import List
 
 from pyflink.common import Types
-from pyflink.ml.tests.test_utils import PyFlinkMLTestCase
+from pyflink.ml.tests.test_utils import PyFlinkMLTestCase, update_existing_params
 
 from pyflink.ml.core.linalg import Vectors, DenseVectorTypeInfo, DenseVector
 
-from pyflink.ml.lib.feature.robustscaler import RobustScaler
+from pyflink.ml.lib.feature.robustscaler import RobustScaler, RobustScalerModel
 from pyflink.table import Table
 
 
@@ -108,6 +108,36 @@ class RobustScalerTest(PyFlinkMLTestCase):
             output.get_schema().get_field_names(),
             self.expected_output)
 
+    def test_get_model_data(self):
+        robust_scaler = RobustScaler()
+        model = robust_scaler.fit(self.train_table)
+        model_data = model.get_model_data()[0]
+        expected_field_names = ['medians', 'ranges']
+        self.assertEqual(expected_field_names, model_data.get_schema().get_field_names())
+
+        model_rows = [result for result in
+                      self.t_env.to_data_stream(model_data).execute_and_collect()]
+        self.assertEqual(1, len(model_rows))
+        self.assertListAlmostEqual(
+            [4.0, -4.0], model_rows[0][expected_field_names.index('medians')])
+        self.assertListAlmostEqual(
+            [4.0, 4.0], model_rows[0][expected_field_names.index('ranges')])
+
+    def test_set_model_data(self):
+        robust_scaler = RobustScaler()
+        model_a = robust_scaler.fit(self.train_table)
+        model_data = model_a.get_model_data()[0]
+
+        model_b = RobustScalerModel().set_model_data(model_data)
+        update_existing_params(model_b, model_a)
+
+        output = model_b.transform(self.predict_table)[0]
+        self.verify_output_result(
+            output,
+            robust_scaler.get_output_col(),
+            output.get_schema().get_field_names(),
+            self.expected_output)
+
     def test_save_load_predict(self):
         robust_scaler = RobustScaler()
         reloaded_robust_scaler = self.save_and_reload(robust_scaler)
diff --git a/flink-ml-python/pyflink/ml/lib/feature/tests/test_standardscaler.py b/flink-ml-python/pyflink/ml/lib/feature/tests/test_standardscaler.py
index a4216af..469d839 100644
--- a/flink-ml-python/pyflink/ml/lib/feature/tests/test_standardscaler.py
+++ b/flink-ml-python/pyflink/ml/lib/feature/tests/test_standardscaler.py
@@ -21,8 +21,8 @@ from pyflink.table import Table
 from typing import List
 
 from pyflink.ml.core.linalg import Vectors, DenseVectorTypeInfo, DenseVector
-from pyflink.ml.lib.feature.standardscaler import StandardScaler
-from pyflink.ml.tests.test_utils import PyFlinkMLTestCase
+from pyflink.ml.lib.feature.standardscaler import StandardScaler, StandardScalerModel
+from pyflink.ml.tests.test_utils import PyFlinkMLTestCase, update_existing_params
 
 
 class StandardScalerTest(PyFlinkMLTestCase):
@@ -113,6 +113,48 @@ class StandardScalerTest(PyFlinkMLTestCase):
             standard_scaler.get_output_col(),
             self.expected_res_with_mean_and_std)
 
+    def test_get_model_data(self):
+        standard_scaler = StandardScaler()
+        model = standard_scaler.fit(self.dense_input)
+        model_data = model.get_model_data()[0]
+        expected_field_names = ['mean', 'std']
+        self.assertEqual(expected_field_names, model_data.get_schema().get_field_names())
+
+        model_rows = [result for result in
+                      self.t_env.to_data_stream(model_data).execute_and_collect()]
+        self.assertEqual(1, len(model_rows))
+        self.assertListAlmostEqual(
+            self.expected_mean, model_rows[0][expected_field_names.index('mean')])
+        self.assertListAlmostEqual(
+            self.expected_std, model_rows[0][expected_field_names.index('std')])
+
+    def test_set_model_data(self):
+        standard_scaler = StandardScaler()
+        model_a = standard_scaler.fit(self.dense_input)
+        model_data = model_a.get_model_data()[0]
+
+        model_b = StandardScalerModel().set_model_data(model_data)
+        update_existing_params(model_b, model_a)
+
+        output = model_b.transform(self.dense_input)[0]
+        self.verify_output_result(
+            output,
+            output.get_schema().get_field_names(),
+            standard_scaler.get_output_col(),
+            self.expected_res_with_std)
+
+    def test_save_load_and_predict(self):
+        standard_scaler = StandardScaler()
+        reloaded_standard_scaler = self.save_and_reload(standard_scaler)
+        model = reloaded_standard_scaler.fit(self.dense_input)
+        reloaded_model = self.save_and_reload(model)
+        output = reloaded_model.transform(self.dense_input)[0]
+        self.verify_output_result(
+            output,
+            output.get_schema().get_field_names(),
+            standard_scaler.get_output_col(),
+            self.expected_res_with_std)
+
     def verify_output_result(
             self,
             output: Table,
diff --git a/flink-ml-python/pyflink/ml/lib/feature/tests/test_stringindexer.py b/flink-ml-python/pyflink/ml/lib/feature/tests/test_stringindexer.py
index 460da9b..0f6204b 100644
--- a/flink-ml-python/pyflink/ml/lib/feature/tests/test_stringindexer.py
+++ b/flink-ml-python/pyflink/ml/lib/feature/tests/test_stringindexer.py
@@ -18,8 +18,8 @@
 
 from pyflink.common import Types, Row
 
-from pyflink.ml.lib.feature.stringindexer import StringIndexer
-from pyflink.ml.tests.test_utils import PyFlinkMLTestCase
+from pyflink.ml.lib.feature.stringindexer import StringIndexer, StringIndexerModel
+from pyflink.ml.tests.test_utils import PyFlinkMLTestCase, update_existing_params
 
 
 class StringIndexerTest(PyFlinkMLTestCase):
@@ -118,3 +118,53 @@ class StringIndexerTest(PyFlinkMLTestCase):
         predicted_results.sort(key=lambda x: x[0])
 
         self.assertEqual(predicted_results, self.expected_alphabetic_asc_predict_data)
+
+    def test_get_model_data(self):
+        string_indexer = StringIndexer() \
+            .set_input_cols('input_col1', 'input_col2') \
+            .set_output_cols('output_col1', 'output_col2') \
+            .set_string_order_type('alphabetAsc')
+        model = string_indexer.fit(self.train_table)
+        model_data = model.get_model_data()[0]
+        expected_field_names = ['stringArrays']
+        self.assertEqual(expected_field_names, model_data.get_schema().get_field_names())
+
+        # TODO: Add test to collect and verify the model data results after FLINK-30122 is resolved.
+
+    def test_set_model_data(self):
+        string_indexer = StringIndexer() \
+            .set_input_cols('input_col1', 'input_col2') \
+            .set_output_cols('output_col1', 'output_col2') \
+            .set_string_order_type('alphabetAsc') \
+            .set_handle_invalid('keep')
+        model_a = string_indexer.fit(self.train_table)
+        model_data = model_a.get_model_data()[0]
+
+        model_b = StringIndexerModel().set_model_data(model_data)
+        update_existing_params(model_b, model_a)
+
+        output = model_b.transform(self.predict_table)[0]
+
+        predicted_results = [result for result in
+                             self.t_env.to_data_stream(output).execute_and_collect()]
+
+        predicted_results.sort(key=lambda x: x[0])
+
+        self.assertEqual(predicted_results, self.expected_alphabetic_asc_predict_data)
+
+    def test_save_load_and_predict(self):
+        string_indexer = StringIndexer() \
+            .set_input_cols('input_col1', 'input_col2') \
+            .set_output_cols('output_col1', 'output_col2') \
+            .set_string_order_type('alphabetAsc') \
+            .set_handle_invalid('keep')
+        reloaded_string_indexer = self.save_and_reload(string_indexer)
+
+        model = reloaded_string_indexer.fit(self.train_table)
+        reloaded_model = self.save_and_reload(model)
+
+        output = reloaded_model.transform(self.predict_table)[0]
+        predicted_results = [result for result in
+                             self.t_env.to_data_stream(output).execute_and_collect()]
+        predicted_results.sort(key=lambda x: x[0])
+        self.assertEqual(predicted_results, self.expected_alphabetic_asc_predict_data)
diff --git a/flink-ml-python/pyflink/ml/lib/feature/tests/test_variancethresholdselector.py b/flink-ml-python/pyflink/ml/lib/feature/tests/test_variancethresholdselector.py
index b7a49a4..013b383 100644
--- a/flink-ml-python/pyflink/ml/lib/feature/tests/test_variancethresholdselector.py
+++ b/flink-ml-python/pyflink/ml/lib/feature/tests/test_variancethresholdselector.py
@@ -21,8 +21,9 @@ from pyflink.common import Types
 from pyflink.table import Table
 
 from pyflink.ml.core.linalg import Vectors, DenseVectorTypeInfo, DenseVector
-from pyflink.ml.lib.feature.variancethresholdselector import VarianceThresholdSelector
-from pyflink.ml.tests.test_utils import PyFlinkMLTestCase
+from pyflink.ml.lib.feature.variancethresholdselector import \
+    VarianceThresholdSelector, VarianceThresholdSelectorModel
+from pyflink.ml.tests.test_utils import PyFlinkMLTestCase, update_existing_params
 
 
 class VarianceThresholdSelectorTest(PyFlinkMLTestCase):
@@ -112,6 +113,33 @@ class VarianceThresholdSelectorTest(PyFlinkMLTestCase):
                 output.get_schema().get_field_names(),
                 self.expected_output)
 
+    def test_get_model_data(self):
+        variance_threshold_selector = VarianceThresholdSelector() \
+            .set_variance_threshold(8.0)
+        model = variance_threshold_selector.fit(self.train_table)
+        model_data = model.get_model_data()[0]
+        expected_field_names = ['numOfFeatures', 'indices']
+        self.assertEqual(expected_field_names, model_data.get_schema().get_field_names())
+
+        # TODO: Add test to collect and verify the model data results after Flink dependency
+        #  is upgraded to 1.15.3, 1.16.0 or a higher version. Related ticket: FLINK-29477
+
+    def test_set_model_data(self):
+        variance_threshold_selector = VarianceThresholdSelector() \
+            .set_variance_threshold(8.0)
+        model_a = variance_threshold_selector.fit(self.train_table)
+        model_data = model_a.get_model_data()[0]
+
+        model_b = VarianceThresholdSelectorModel().set_model_data(model_data)
+        update_existing_params(model_b, model_a)
+
+        output = model_b.transform(self.predict_table)[0]
+        self.verify_output_result(
+            output,
+            variance_threshold_selector.get_output_col(),
+            output.get_schema().get_field_names(),
+            self.expected_output)
+
     def test_save_load_predict(self):
         variance_threshold_selector = VarianceThresholdSelector() \
             .set_variance_threshold(8.0)
diff --git a/flink-ml-python/pyflink/ml/lib/feature/tests/test_vectorindexer.py b/flink-ml-python/pyflink/ml/lib/feature/tests/test_vectorindexer.py
index 8e63427..9ca499b 100644
--- a/flink-ml-python/pyflink/ml/lib/feature/tests/test_vectorindexer.py
+++ b/flink-ml-python/pyflink/ml/lib/feature/tests/test_vectorindexer.py
@@ -22,7 +22,7 @@ from pyflink.common import Types
 
 from pyflink.ml.core.linalg import Vectors, DenseVectorTypeInfo
 from pyflink.ml.lib.feature.vectorindexer import VectorIndexer, VectorIndexerModel
-from pyflink.ml.tests.test_utils import PyFlinkMLTestCase
+from pyflink.ml.tests.test_utils import PyFlinkMLTestCase, update_existing_params
 
 
 class VectorIndexerTest(PyFlinkMLTestCase):
@@ -101,3 +101,28 @@ class VectorIndexerTest(PyFlinkMLTestCase):
         predicted_results.sort(key=lambda x: x[1])
         self.expected_output.sort(key=lambda x: x[1])
         self.assertEqual(self.expected_output, predicted_results)
+
+    def test_get_model_data(self):
+        vector_indexer = VectorIndexer().set_handle_invalid('keep')
+        model = vector_indexer.fit(self.train_table)
+        model_data = model.get_model_data()[0]
+        expected_field_names = ['categoryMaps']
+        self.assertEqual(expected_field_names, model_data.get_schema().get_field_names())
+
+        # TODO: Add test to collect and verify the model data results after FLINK-30124 is resolved.
+
+    def test_set_model_data(self):
+        vector_indexer = VectorIndexer().set_handle_invalid('keep')
+        model_a = vector_indexer.fit(self.train_table)
+        model_data = model_a.get_model_data()[0]
+
+        model_b = VectorIndexerModel().set_model_data(model_data)
+        update_existing_params(model_b, model_a)
+
+        output = model_b.transform(self.predict_table)[0]
+        predicted_results = [result[1] for result in
+                             self.t_env.to_data_stream(output).execute_and_collect()]
+
+        predicted_results.sort(key=lambda x: x[1])
+        self.expected_output.sort(key=lambda x: x[1])
+        self.assertEqual(self.expected_output, predicted_results)
diff --git a/flink-ml-python/pyflink/ml/tests/test_utils.py b/flink-ml-python/pyflink/ml/tests/test_utils.py
index 04104c4..4b376dc 100644
--- a/flink-ml-python/pyflink/ml/tests/test_utils.py
+++ b/flink-ml-python/pyflink/ml/tests/test_utils.py
@@ -24,9 +24,17 @@ import uuid
 
 from pyflink.common import RestartStrategies, Configuration
 from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.java_gateway import get_gateway
 from pyflink.table import StreamTableEnvironment
 from pyflink.util.java_utils import get_j_env_configuration
 
+from pyflink.ml.core.wrapper import JavaWithParams
+
+
+def update_existing_params(target: JavaWithParams, source: JavaWithParams):
+    get_gateway().jvm.org.apache.flink.ml.util.ReadWriteUtils \
+        .updateExistingParams(target._java_obj, source._java_obj.getParamMap())
+
 
 class PyFlinkMLTestCase(unittest.TestCase):
     def setUp(self):
@@ -84,3 +92,10 @@ class PyFlinkMLTestCase(unittest.TestCase):
                 raise e
         load_func = getattr(stage, 'load')
         return load_func(self.t_env, path)
+
+    def assertListAlmostEqual(self, expected_list, actual_list, places=None, msg=None,
+                              delta=None):
+        self.assertEqual(len(expected_list), len(actual_list))
+        for i in range(len(expected_list)):
+            self.assertAlmostEqual(expected_list[i], actual_list[i],
+                                   places=places, msg=msg, delta=delta)