You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by li...@apache.org on 2022/11/22 04:12:15 UTC
[flink-ml] branch master updated: [FLINK-30099] Add test case for algorithms' python APIs
This is an automated email from the ASF dual-hosted git repository.
lindong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-ml.git
The following commit(s) were added to refs/heads/master by this push:
new 4939933 [FLINK-30099] Add test case for algorithms' python APIs
4939933 is described below
commit 4939933a00eff0a4dccf3b9d0fa06595cdfd06e6
Author: yunfengzhou-hub <yu...@outlook.com>
AuthorDate: Tue Nov 22 12:12:10 2022 +0800
[FLINK-30099] Add test case for algorithms' python APIs
This closes #178.
---
.../ml/lib/classification/tests/test_knn.py | 15 +++-
.../ml/lib/classification/tests/test_linearsvc.py | 49 ++++++++++++-
.../ml/lib/classification/tests/test_naivebayes.py | 21 +++++-
.../pyflink/ml/lib/clustering/tests/test_kmeans.py | 33 ++++++++-
.../pyflink/ml/lib/feature/tests/test_idf.py | 23 +++++-
.../pyflink/ml/lib/feature/tests/test_imputer.py | 30 +++++++-
.../lib/feature/tests/test_indextostringmodel.py | 28 ++++++++
.../ml/lib/feature/tests/test_kbinsdiscretizer.py | 22 +++++-
.../ml/lib/feature/tests/test_maxabsscaler.py | 66 +++++++++++++----
.../ml/lib/feature/tests/test_minmaxscaler.py | 82 ++++++++++++++++------
.../ml/lib/feature/tests/test_onehotencoder.py | 41 ++++++++++-
.../ml/lib/feature/tests/test_robustscaler.py | 34 ++++++++-
.../ml/lib/feature/tests/test_standardscaler.py | 46 +++++++++++-
.../ml/lib/feature/tests/test_stringindexer.py | 54 +++++++++++++-
.../tests/test_variancethresholdselector.py | 32 ++++++++-
.../ml/lib/feature/tests/test_vectorindexer.py | 27 ++++++-
flink-ml-python/pyflink/ml/tests/test_utils.py | 15 ++++
17 files changed, 566 insertions(+), 52 deletions(-)
diff --git a/flink-ml-python/pyflink/ml/lib/classification/tests/test_knn.py b/flink-ml-python/pyflink/ml/lib/classification/tests/test_knn.py
index 77ad3fa..d140678 100644
--- a/flink-ml-python/pyflink/ml/lib/classification/tests/test_knn.py
+++ b/flink-ml-python/pyflink/ml/lib/classification/tests/test_knn.py
@@ -21,7 +21,7 @@ from pyflink.common import Types
from pyflink.table import Table
from pyflink.ml.core.linalg import Vectors, DenseVectorTypeInfo, DenseMatrix, DenseVector
-from pyflink.ml.lib.classification.knn import KNN
+from pyflink.ml.lib.classification.knn import KNN, KNNModel
from pyflink.ml.tests.test_utils import PyFlinkMLTestCase
@@ -157,6 +157,19 @@ class KNNTest(PyFlinkMLTestCase):
self.assertEqual(packed_features.num_cols(), labels.size())
self.assertEqual(feature_norm_squares.size(), labels.size())
+ def test_set_model_data(self):
+ knn = KNN()
+ model_a = knn.fit(self.train_data)
+ model_data = model_a.get_model_data()[0]
+
+ model_b = KNNModel().set_model_data(model_data)
+ output = model_b.transform(self.predict_data)[0]
+ field_names = output.get_schema().get_field_names()
+ self.verify_predict_result(
+ output,
+ field_names.index(knn.label_col),
+ field_names.index(knn.prediction_col))
+
def verify_predict_result(
self, output: Table, label_index, prediction_index):
with self.t_env.to_data_stream(output).execute_and_collect() as results:
diff --git a/flink-ml-python/pyflink/ml/lib/classification/tests/test_linearsvc.py b/flink-ml-python/pyflink/ml/lib/classification/tests/test_linearsvc.py
index ada27e7..2cb6a4b 100644
--- a/flink-ml-python/pyflink/ml/lib/classification/tests/test_linearsvc.py
+++ b/flink-ml-python/pyflink/ml/lib/classification/tests/test_linearsvc.py
@@ -20,8 +20,8 @@ from pyflink.common import Types
from pyflink.table import Table
from pyflink.ml.core.linalg import Vectors, DenseVectorTypeInfo
-from pyflink.ml.lib.classification.linearsvc import LinearSVC
-from pyflink.ml.tests.test_utils import PyFlinkMLTestCase
+from pyflink.ml.lib.classification.linearsvc import LinearSVC, LinearSVCModel
+from pyflink.ml.tests.test_utils import PyFlinkMLTestCase, update_existing_params
class LinearSVCTest(PyFlinkMLTestCase):
@@ -44,6 +44,8 @@ class LinearSVCTest(PyFlinkMLTestCase):
['features', 'label', 'weight'],
[DenseVectorTypeInfo(), Types.DOUBLE(), Types.DOUBLE()])))
+ self.eps = 0.1
+
def test_param(self):
linear_svc = LinearSVC()
self.assertEqual('features', linear_svc.features_col)
@@ -113,6 +115,49 @@ class LinearSVCTest(PyFlinkMLTestCase):
linear_svc.prediction_col,
linear_svc.raw_prediction_col)
+ def test_get_model_data(self):
+ linear_svc = LinearSVC().set_weight_col('weight')
+ model_data = linear_svc.fit(self.train_data).get_model_data()[0]
+ expected_field_names = ['coefficient']
+ self.assertEqual(expected_field_names, model_data.get_schema().get_field_names())
+
+ model_rows = [result for result in
+ self.t_env.to_data_stream(model_data).execute_and_collect()]
+ self.assertEqual(1, len(model_rows))
+ self.assertListAlmostEqual(
+ [0.470, -0.273, -0.410, -0.546],
+ model_rows[expected_field_names.index('coefficient')][0].to_array(),
+ delta=self.eps)
+
+ def test_set_model_data(self):
+ linear_svc = LinearSVC().set_weight_col('weight')
+ model_a = linear_svc.fit(self.train_data)
+ model_data = model_a.get_model_data()[0]
+
+ model_b = LinearSVCModel().set_model_data(model_data)
+ update_existing_params(model_b, model_a)
+
+ output = model_b.transform(self.train_data)[0]
+ self.verify_prediction_result(
+ output,
+ output.get_schema().get_field_names(),
+ linear_svc.features_col,
+ linear_svc.prediction_col,
+ linear_svc.raw_prediction_col)
+
+ def test_save_load_and_predict(self):
+ linear_svc = LinearSVC().set_weight_col('weight')
+ reloaded_linear_svc = self.save_and_reload(linear_svc)
+ model = reloaded_linear_svc.fit(self.train_data)
+ reloaded_model = self.save_and_reload(model)
+ output = reloaded_model.transform(self.train_data)[0]
+ self.verify_prediction_result(
+ output,
+ output.get_schema().get_field_names(),
+ linear_svc.features_col,
+ linear_svc.prediction_col,
+ linear_svc.raw_prediction_col)
+
def verify_prediction_result(self,
output: Table,
field_names,
diff --git a/flink-ml-python/pyflink/ml/lib/classification/tests/test_naivebayes.py b/flink-ml-python/pyflink/ml/lib/classification/tests/test_naivebayes.py
index 35fc816..16f0c8c 100644
--- a/flink-ml-python/pyflink/ml/lib/classification/tests/test_naivebayes.py
+++ b/flink-ml-python/pyflink/ml/lib/classification/tests/test_naivebayes.py
@@ -22,7 +22,7 @@ from pyflink.table import Table
from pyflink.ml.core.linalg import Vectors, DenseVectorTypeInfo
from pyflink.ml.lib.classification.naivebayes import NaiveBayes, NaiveBayesModel
-from pyflink.ml.tests.test_utils import PyFlinkMLTestCase
+from pyflink.ml.tests.test_utils import PyFlinkMLTestCase, update_existing_params
class NaiveBayesTest(PyFlinkMLTestCase):
@@ -112,6 +112,25 @@ class NaiveBayesTest(PyFlinkMLTestCase):
actual_output = self.execute_and_collect(output_table)
self.assertEqual(self.expected_output, actual_output)
+ def test_get_model_data(self):
+ model = self.estimator.fit(self.train_data)
+ model_data = model.get_model_data()[0]
+ expected_field_names = ['theta', 'piArray', 'labels']
+ self.assertEqual(expected_field_names, model_data.get_schema().get_field_names())
+
+ # TODO: Add test to collect and verify the model data results after FLINK-30124 is resolved.
+
+ def test_set_model_data(self):
+ model_a = self.estimator.fit(self.train_data)
+ model_data = model_a.get_model_data()[0]
+
+ model_b = NaiveBayesModel().set_model_data(model_data)
+ update_existing_params(model_b, model_a)
+
+ output_table = model_b.transform(self.predict_data)[0]
+ actual_output = self.execute_and_collect(output_table)
+ self.assertEqual(self.expected_output, actual_output)
+
def execute_and_collect(self, output: Table):
res = {}
with self.t_env.to_data_stream(output).execute_and_collect() as results:
diff --git a/flink-ml-python/pyflink/ml/lib/clustering/tests/test_kmeans.py b/flink-ml-python/pyflink/ml/lib/clustering/tests/test_kmeans.py
index f4738c5..c63b26b 100644
--- a/flink-ml-python/pyflink/ml/lib/clustering/tests/test_kmeans.py
+++ b/flink-ml-python/pyflink/ml/lib/clustering/tests/test_kmeans.py
@@ -22,7 +22,7 @@ from typing import List, Dict, Set
from pyflink.ml.core.linalg import Vectors, DenseVectorTypeInfo, DenseVector
from pyflink.ml.lib.clustering.kmeans import KMeans, KMeansModel, OnlineKMeans
-from pyflink.ml.tests.test_utils import PyFlinkMLTestCase
+from pyflink.ml.tests.test_utils import PyFlinkMLTestCase, update_existing_params
def group_features_by_prediction(
@@ -146,6 +146,37 @@ class KMeansTest(PyFlinkMLTestCase):
actual_groups[0] == self.expected_groups[1]
and actual_groups[1] == self.expected_groups[0])
+ def test_get_model_data(self):
+ kmeans = KMeans().set_max_iter(2).set_k(2)
+ model = kmeans.fit(self.data_table)
+ model_data = model.get_model_data()[0]
+ expected_field_names = ['centroids', 'weights']
+ self.assertEqual(expected_field_names, model_data.get_schema().get_field_names())
+
+ # TODO: Add test to collect and verify the model data results after FLINK-30122 is resolved.
+
+ def test_set_model_data(self):
+ kmeans = KMeans().set_max_iter(2).set_k(2)
+ model_a = kmeans.fit(self.data_table)
+ model_data = model_a.get_model_data()[0]
+
+ model_b = KMeansModel().set_model_data(model_data)
+ update_existing_params(model_b, model_a)
+
+ output = model_b.transform(self.data_table)[0]
+ self.assertEqual(['features', 'prediction'], output.get_schema().get_field_names())
+ results = [result for result in self.t_env.to_data_stream(output).execute_and_collect()]
+ field_names = output.get_schema().get_field_names()
+ actual_groups = group_features_by_prediction(
+ results,
+ field_names.index(kmeans.features_col),
+ field_names.index(kmeans.prediction_col))
+
+ self.assertTrue(actual_groups[0] == self.expected_groups[0]
+ and actual_groups[1] == self.expected_groups[1] or
+ actual_groups[0] == self.expected_groups[1]
+ and actual_groups[1] == self.expected_groups[0])
+
def test_save_load_and_predict(self):
kmeans = KMeans().set_max_iter(2).set_k(2)
model = kmeans.fit(self.data_table)
diff --git a/flink-ml-python/pyflink/ml/lib/feature/tests/test_idf.py b/flink-ml-python/pyflink/ml/lib/feature/tests/test_idf.py
index d192ef3..11f1887 100644
--- a/flink-ml-python/pyflink/ml/lib/feature/tests/test_idf.py
+++ b/flink-ml-python/pyflink/ml/lib/feature/tests/test_idf.py
@@ -22,7 +22,7 @@ from pyflink.common import Types
from pyflink.ml.core.linalg import Vectors, DenseVectorTypeInfo
from pyflink.ml.lib.feature.idf import IDF, IDFModel
-from pyflink.ml.tests.test_utils import PyFlinkMLTestCase
+from pyflink.ml.tests.test_utils import PyFlinkMLTestCase, update_existing_params
class IDFTest(PyFlinkMLTestCase):
@@ -112,6 +112,27 @@ class IDFTest(PyFlinkMLTestCase):
output = idf.fit(self.input_data).transform(self.input_data)[0]
self.verify_prediction_result(self.expected_output_min_doc_freq_as_two, output)
+ def test_get_model_data(self):
+ idf = IDF()
+ model = idf.fit(self.input_data)
+ model_data = model.get_model_data()[0]
+ expected_field_names = ['idf', 'docFreq', 'numDocs']
+ self.assertEqual(expected_field_names, model_data.get_schema().get_field_names())
+
+ # TODO: Add test to collect and verify the model data results after Flink dependency
+ # is upgraded to 1.15.3, 1.16.0 or a higher version. Related ticket: FLINK-29477
+
+ def test_set_model_data(self):
+ idf = IDF()
+ model_a = idf.fit(self.input_data)
+ model_data = model_a.get_model_data()[0]
+
+ model_b = IDFModel().set_model_data(model_data)
+ update_existing_params(model_b, model_a)
+
+ output = model_b.transform(self.input_data)[0]
+ self.verify_prediction_result(self.expected_output, output)
+
def test_save_load_predict(self):
idf = IDF()
estimator_path = os.path.join(self.temp_dir, 'test_save_load_predict_idf')
diff --git a/flink-ml-python/pyflink/ml/lib/feature/tests/test_imputer.py b/flink-ml-python/pyflink/ml/lib/feature/tests/test_imputer.py
index 3addeee..8f045ac 100644
--- a/flink-ml-python/pyflink/ml/lib/feature/tests/test_imputer.py
+++ b/flink-ml-python/pyflink/ml/lib/feature/tests/test_imputer.py
@@ -20,8 +20,8 @@ from typing import List
import numpy as np
from pyflink.table import Table
from pyflink.common import Types, Row
-from pyflink.ml.tests.test_utils import PyFlinkMLTestCase
-from pyflink.ml.lib.feature.imputer import Imputer
+from pyflink.ml.tests.test_utils import PyFlinkMLTestCase, update_existing_params
+from pyflink.ml.lib.feature.imputer import Imputer, ImputerModel
class ImputerTest(PyFlinkMLTestCase):
@@ -107,6 +107,32 @@ class ImputerTest(PyFlinkMLTestCase):
self.verify_output_result(
output, imputer.get_output_cols(), field_names, expected_output)
+ def test_get_model_data(self):
+ imputer = Imputer().\
+ set_input_cols('f1', 'f2', 'f3').\
+ set_output_cols('o1', 'o2', 'o3')
+ model = imputer.fit(self.train_table)
+ model_data = model.get_model_data()[0]
+ expected_field_names = ['surrogates']
+ self.assertEqual(expected_field_names, model_data.get_schema().get_field_names())
+
+ # TODO: Add test to collect and verify the model data results after FLINK-30124 is resolved.
+
+ def test_set_model_data(self):
+ imputer = Imputer().\
+ set_input_cols('f1', 'f2', 'f3').\
+ set_output_cols('o1', 'o2', 'o3')
+ model_a = imputer.fit(self.train_table)
+ model_data = model_a.get_model_data()[0]
+
+ model_b = ImputerModel().set_model_data(model_data)
+ update_existing_params(model_b, model_a)
+
+ output = model_b.transform(self.train_table)[0]
+ field_names = output.get_schema().get_field_names()
+ self.verify_output_result(
+ output, imputer.get_output_cols(), field_names, self.expected_mean_strategy_output)
+
def test_save_load_predict(self):
imputer = Imputer(). \
set_input_cols('f1', 'f2', 'f3'). \
diff --git a/flink-ml-python/pyflink/ml/lib/feature/tests/test_indextostringmodel.py b/flink-ml-python/pyflink/ml/lib/feature/tests/test_indextostringmodel.py
index 42df909..41ef7bd 100644
--- a/flink-ml-python/pyflink/ml/lib/feature/tests/test_indextostringmodel.py
+++ b/flink-ml-python/pyflink/ml/lib/feature/tests/test_indextostringmodel.py
@@ -75,3 +75,31 @@ class IndexToStringModelTest(PyFlinkMLTestCase):
predicted_results.sort(key=lambda x: x[0])
self.assertEqual(predicted_results, self.expected_prediction)
+
+ def test_get_model_data(self):
+ model = IndexToStringModel() \
+ .set_input_cols('input_col1', 'input_col2') \
+ .set_output_cols('output_col1', 'output_col2') \
+ .set_model_data(self.model_data_table)
+ model_data = model.get_model_data()[0]
+ expected_field_names = ['stringArrays']
+ self.assertEqual(expected_field_names, model_data.get_schema().get_field_names())
+
+ # TODO: Add test to collect and verify the model data results after FLINK-30122 is resolved.
+
+ def test_save_load_and_predict(self):
+ model = IndexToStringModel() \
+ .set_input_cols('input_col1', 'input_col2') \
+ .set_output_cols('output_col1', 'output_col2') \
+ .set_model_data(self.model_data_table)
+
+ reloaded_model = self.save_and_reload(model)
+
+ output = reloaded_model.transform(self.predict_table)[0]
+
+ predicted_results = [result for result in
+ self.t_env.to_data_stream(output).execute_and_collect()]
+
+ predicted_results.sort(key=lambda x: x[0])
+
+ self.assertEqual(predicted_results, self.expected_prediction)
diff --git a/flink-ml-python/pyflink/ml/lib/feature/tests/test_kbinsdiscretizer.py b/flink-ml-python/pyflink/ml/lib/feature/tests/test_kbinsdiscretizer.py
index 2dacb68..70bc9a5 100644
--- a/flink-ml-python/pyflink/ml/lib/feature/tests/test_kbinsdiscretizer.py
+++ b/flink-ml-python/pyflink/ml/lib/feature/tests/test_kbinsdiscretizer.py
@@ -22,7 +22,7 @@ from pyflink.common import Types
from pyflink.ml.core.linalg import Vectors, DenseVectorTypeInfo
from pyflink.ml.lib.feature.kbinsdiscretizer import KBinsDiscretizer, KBinsDiscretizerModel
-from pyflink.ml.tests.test_utils import PyFlinkMLTestCase
+from pyflink.ml.tests.test_utils import PyFlinkMLTestCase, update_existing_params
class KBinsDiscretizerTest(PyFlinkMLTestCase):
@@ -156,6 +156,26 @@ class KBinsDiscretizerTest(PyFlinkMLTestCase):
output = k_bins_discretizer.fit(self.train_table).transform(self.predict_table)[0]
self.verify_prediction_result(self.kmeans_output, output)
+ def test_get_model_data(self):
+ k_bins_discretizer = KBinsDiscretizer().set_num_bins(3).set_strategy('uniform')
+ model = k_bins_discretizer.fit(self.train_table)
+ model_data = model.get_model_data()[0]
+ expected_field_names = ['binEdges']
+ self.assertEqual(expected_field_names, model_data.get_schema().get_field_names())
+
+ # TODO: Add test to collect and verify the model data results after FLINK-30122 is resolved.
+
+ def test_set_model_data(self):
+ k_bins_discretizer = KBinsDiscretizer().set_num_bins(3).set_strategy('uniform')
+ model_a = k_bins_discretizer.fit(self.train_table)
+ model_data = model_a.get_model_data()[0]
+
+ model_b = KBinsDiscretizerModel().set_model_data(model_data)
+ update_existing_params(model_b, model_a)
+
+ output = model_b.transform(self.predict_table)[0]
+ self.verify_prediction_result(self.uniform_output, output)
+
def test_save_load_predict(self):
k_bins_discretizer = KBinsDiscretizer().set_num_bins(3)
estimator_path = os.path.join(self.temp_dir, 'test_save_load_predict_kbinsdiscretizer')
diff --git a/flink-ml-python/pyflink/ml/lib/feature/tests/test_maxabsscaler.py b/flink-ml-python/pyflink/ml/lib/feature/tests/test_maxabsscaler.py
index 925ae4a..97d974d 100644
--- a/flink-ml-python/pyflink/ml/lib/feature/tests/test_maxabsscaler.py
+++ b/flink-ml-python/pyflink/ml/lib/feature/tests/test_maxabsscaler.py
@@ -21,8 +21,8 @@ from pyflink.common import Types
from pyflink.table import Table
from pyflink.ml.core.linalg import Vectors, DenseVectorTypeInfo, DenseVector
-from pyflink.ml.lib.feature.maxabsscaler import MaxAbsScaler
-from pyflink.ml.tests.test_utils import PyFlinkMLTestCase
+from pyflink.ml.lib.feature.maxabsscaler import MaxAbsScaler, MaxAbsScalerModel
+from pyflink.ml.tests.test_utils import PyFlinkMLTestCase, update_existing_params
class MaxAbsScalerTest(PyFlinkMLTestCase):
@@ -55,32 +55,72 @@ class MaxAbsScalerTest(PyFlinkMLTestCase):
Vectors.dense(0.75, 0.225)]
def test_param(self):
- max_abs_scalar = MaxAbsScaler()
- self.assertEqual("input", max_abs_scalar.input_col)
- self.assertEqual("output", max_abs_scalar.output_col)
- max_abs_scalar.set_input_col('test_input') \
+ max_abs_scaler = MaxAbsScaler()
+ self.assertEqual("input", max_abs_scaler.input_col)
+ self.assertEqual("output", max_abs_scaler.output_col)
+ max_abs_scaler.set_input_col('test_input') \
.set_output_col('test_output')
- self.assertEqual('test_input', max_abs_scalar.input_col)
- self.assertEqual('test_output', max_abs_scalar.output_col)
+ self.assertEqual('test_input', max_abs_scaler.input_col)
+ self.assertEqual('test_output', max_abs_scaler.output_col)
def test_output_schema(self):
- max_abs_scalar = MaxAbsScaler() \
+ max_abs_scaler = MaxAbsScaler() \
.set_input_col('test_input') \
.set_output_col('test_output')
- model = max_abs_scalar.fit(self.train_data.alias('test_input'))
+ model = max_abs_scaler.fit(self.train_data.alias('test_input'))
output = model.transform(self.predict_data.alias('test_input'))[0]
self.assertEqual(
['test_input', 'test_output'],
output.get_schema().get_field_names())
def test_fit_and_predict(self):
- max_abs_scalar = MaxAbsScaler()
- model = max_abs_scalar.fit(self.train_data)
+ max_abs_scaler = MaxAbsScaler()
+ model = max_abs_scaler.fit(self.train_data)
output = model.transform(self.predict_data)[0]
self.verify_output_result(
output,
- max_abs_scalar.get_output_col(),
+ max_abs_scaler.get_output_col(),
+ output.get_schema().get_field_names(),
+ self.expected_data)
+
+ def test_get_model_data(self):
+ max_abs_scaler = MaxAbsScaler()
+ model = max_abs_scaler.fit(self.train_data)
+ model_data = model.get_model_data()[0]
+ expected_field_names = ['maxVector']
+ self.assertEqual(expected_field_names, model_data.get_schema().get_field_names())
+
+ model_rows = [result for result in
+ self.t_env.to_data_stream(model_data).execute_and_collect()]
+ self.assertEqual(1, len(model_rows))
+ self.assertListAlmostEqual([200.0, 400.0],
+ model_rows[0][expected_field_names.index('maxVector')])
+
+ def test_set_model_data(self):
+ max_abs_scaler = MaxAbsScaler()
+ model_a = max_abs_scaler.fit(self.train_data)
+ model_data = model_a.get_model_data()[0]
+
+ model_b = MaxAbsScalerModel().set_model_data(model_data)
+ update_existing_params(model_b, model_a)
+
+ output = model_b.transform(self.predict_data)[0]
+ self.verify_output_result(
+ output,
+ max_abs_scaler.get_output_col(),
+ output.get_schema().get_field_names(),
+ self.expected_data)
+
+ def test_save_load_and_predict(self):
+ max_abs_scaler = MaxAbsScaler()
+ reloaded_max_abs_scaler = self.save_and_reload(max_abs_scaler)
+ model = reloaded_max_abs_scaler.fit(self.train_data)
+ reloaded_model = self.save_and_reload(model)
+ output = reloaded_model.transform(self.predict_data)[0]
+ self.verify_output_result(
+ output,
+ max_abs_scaler.get_output_col(),
output.get_schema().get_field_names(),
self.expected_data)
diff --git a/flink-ml-python/pyflink/ml/lib/feature/tests/test_minmaxscaler.py b/flink-ml-python/pyflink/ml/lib/feature/tests/test_minmaxscaler.py
index 230ce7b..3a699ac 100644
--- a/flink-ml-python/pyflink/ml/lib/feature/tests/test_minmaxscaler.py
+++ b/flink-ml-python/pyflink/ml/lib/feature/tests/test_minmaxscaler.py
@@ -21,8 +21,8 @@ from pyflink.common import Types
from pyflink.table import Table
from pyflink.ml.core.linalg import Vectors, DenseVectorTypeInfo, DenseVector
-from pyflink.ml.lib.feature.minmaxscaler import MinMaxScaler
-from pyflink.ml.tests.test_utils import PyFlinkMLTestCase
+from pyflink.ml.lib.feature.minmaxscaler import MinMaxScaler, MinMaxScalerModel
+from pyflink.ml.tests.test_utils import PyFlinkMLTestCase, update_existing_params
class MinMaxScalerTest(PyFlinkMLTestCase):
@@ -55,28 +55,28 @@ class MinMaxScalerTest(PyFlinkMLTestCase):
Vectors.dense(0.75, 0.225)]
def test_param(self):
- min_max_scalar = MinMaxScaler()
- self.assertEqual("input", min_max_scalar.input_col)
- self.assertEqual("output", min_max_scalar.output_col)
- self.assertEqual(0.0, min_max_scalar.min)
- self.assertEqual(1.0, min_max_scalar.max)
- min_max_scalar.set_input_col('test_input') \
+ min_max_scaler = MinMaxScaler()
+ self.assertEqual("input", min_max_scaler.input_col)
+ self.assertEqual("output", min_max_scaler.output_col)
+ self.assertEqual(0.0, min_max_scaler.min)
+ self.assertEqual(1.0, min_max_scaler.max)
+ min_max_scaler.set_input_col('test_input') \
.set_output_col('test_output') \
.set_min(1.0) \
.set_max(4.0)
- self.assertEqual('test_input', min_max_scalar.input_col)
- self.assertEqual(1.0, min_max_scalar.min)
- self.assertEqual(4.0, min_max_scalar.max)
- self.assertEqual('test_output', min_max_scalar.output_col)
+ self.assertEqual('test_input', min_max_scaler.input_col)
+ self.assertEqual(1.0, min_max_scaler.min)
+ self.assertEqual(4.0, min_max_scaler.max)
+ self.assertEqual('test_output', min_max_scaler.output_col)
def test_output_schema(self):
- min_max_scalar = MinMaxScaler() \
+ min_max_scaler = MinMaxScaler() \
.set_input_col('test_input') \
.set_output_col('test_output') \
.set_min(1.0) \
.set_max(4.0)
- model = min_max_scalar.fit(self.train_data.alias('test_input'))
+ model = min_max_scaler.fit(self.train_data.alias('test_input'))
output = model.transform(self.predict_data.alias('test_input'))[0]
self.assertEqual(
['test_input', 'test_output'],
@@ -99,25 +99,67 @@ class MinMaxScalerTest(PyFlinkMLTestCase):
['input'],
[DenseVectorTypeInfo()])))
- min_max_scalar = MinMaxScaler() \
+ min_max_scaler = MinMaxScaler() \
.set_min(0.0) \
.set_max(10.0)
- model = min_max_scalar.fit(train_data)
+ model = min_max_scaler.fit(train_data)
result = model.transform(predict_data)[0]
self.verify_output_result(
result,
- min_max_scalar.get_output_col(),
+ min_max_scaler.get_output_col(),
result.get_schema().get_field_names(),
[Vectors.dense(5.0, 5.0)])
def test_fit_and_predict(self):
- min_max_scalar = MinMaxScaler()
- model = min_max_scalar.fit(self.train_data)
+ min_max_scaler = MinMaxScaler()
+ model = min_max_scaler.fit(self.train_data)
output = model.transform(self.predict_data)[0]
self.verify_output_result(
output,
- min_max_scalar.get_output_col(),
+ min_max_scaler.get_output_col(),
+ output.get_schema().get_field_names(),
+ self.expected_data)
+
+ def test_get_model_data(self):
+ min_max_scaler = MinMaxScaler()
+ model = min_max_scaler.fit(self.train_data)
+ model_data = model.get_model_data()[0]
+ expected_field_names = ['minVector', 'maxVector']
+ self.assertEqual(expected_field_names, model_data.get_schema().get_field_names())
+
+ model_rows = [result for result in
+ self.t_env.to_data_stream(model_data).execute_and_collect()]
+ self.assertEqual(1, len(model_rows))
+ self.assertListAlmostEqual(
+ [0.0, 0.0], model_rows[0][expected_field_names.index('minVector')])
+ self.assertListAlmostEqual(
+ [200.0, 400.0], model_rows[0][expected_field_names.index('maxVector')])
+
+ def test_set_model_data(self):
+ min_max_scaler = MinMaxScaler()
+ model_a = min_max_scaler.fit(self.train_data)
+ model_data = model_a.get_model_data()[0]
+
+ model_b = MinMaxScalerModel().set_model_data(model_data)
+ update_existing_params(model_b, model_a)
+
+ output = model_b.transform(self.predict_data)[0]
+ self.verify_output_result(
+ output,
+ min_max_scaler.get_output_col(),
+ output.get_schema().get_field_names(),
+ self.expected_data)
+
+ def test_save_load_and_predict(self):
+ min_max_scaler = MinMaxScaler()
+ reloaded_min_max_scaler = self.save_and_reload(min_max_scaler)
+ model = reloaded_min_max_scaler.fit(self.train_data)
+ reloaded_model = self.save_and_reload(model)
+ output = reloaded_model.transform(self.predict_data)[0]
+ self.verify_output_result(
+ output,
+ min_max_scaler.get_output_col(),
output.get_schema().get_field_names(),
self.expected_data)
diff --git a/flink-ml-python/pyflink/ml/lib/feature/tests/test_onehotencoder.py b/flink-ml-python/pyflink/ml/lib/feature/tests/test_onehotencoder.py
index 795200b..fe26e30 100644
--- a/flink-ml-python/pyflink/ml/lib/feature/tests/test_onehotencoder.py
+++ b/flink-ml-python/pyflink/ml/lib/feature/tests/test_onehotencoder.py
@@ -23,7 +23,7 @@ from pyflink.table import Table
from pyflink.ml.core.linalg import Vectors, SparseVector
from pyflink.ml.lib.feature.onehotencoder import OneHotEncoder, OneHotEncoderModel
-from pyflink.ml.tests.test_utils import PyFlinkMLTestCase
+from pyflink.ml.tests.test_utils import PyFlinkMLTestCase, update_existing_params
class OneHotEncoderTest(PyFlinkMLTestCase):
@@ -106,6 +106,45 @@ class OneHotEncoderTest(PyFlinkMLTestCase):
output_table.get_schema().get_field_names(),
expected_data)
+ def test_get_model_data(self):
+ model = self.estimator.fit(self.train_data)
+ model_data = model.get_model_data()[0]
+ expected_field_names = ['f0', 'f1']
+ self.assertEqual(expected_field_names, model_data.get_schema().get_field_names())
+
+ model_rows = [result for result in
+ self.t_env.to_data_stream(model_data).execute_and_collect()]
+ self.assertEqual(1, len(model_rows))
+ self.assertEqual(0, model_rows[0][expected_field_names.index('f0')])
+ self.assertEqual(2, model_rows[0][expected_field_names.index('f1')])
+
+ def test_set_model_data(self):
+ model_a = self.estimator.fit(self.train_data)
+ model_data = model_a.get_model_data()[0]
+
+ model_b = OneHotEncoderModel().set_model_data(model_data)
+ update_existing_params(model_b, model_a)
+
+ output = model_b.transform(self.predict_data)[0]
+ self.verify_output_result(
+ output,
+ model_b.input_cols,
+ model_b.output_cols,
+ output.get_schema().get_field_names(),
+ self.expected_data)
+
+ def test_save_load_and_predict(self):
+ reloaded_estimator = self.save_and_reload(self.estimator)
+ model = reloaded_estimator.fit(self.train_data) # type: OneHotEncoderModel
+ reloaded_model = self.save_and_reload(model)
+ output_table = reloaded_model.transform(self.predict_data)[0]
+ self.verify_output_result(
+ output_table,
+ model.input_cols,
+ model.output_cols,
+ output_table.get_schema().get_field_names(),
+ self.expected_data)
+
def verify_output_result(
self,
output: Table,
diff --git a/flink-ml-python/pyflink/ml/lib/feature/tests/test_robustscaler.py b/flink-ml-python/pyflink/ml/lib/feature/tests/test_robustscaler.py
index d423aee..5c51aac 100644
--- a/flink-ml-python/pyflink/ml/lib/feature/tests/test_robustscaler.py
+++ b/flink-ml-python/pyflink/ml/lib/feature/tests/test_robustscaler.py
@@ -18,11 +18,11 @@
from typing import List
from pyflink.common import Types
-from pyflink.ml.tests.test_utils import PyFlinkMLTestCase
+from pyflink.ml.tests.test_utils import PyFlinkMLTestCase, update_existing_params
from pyflink.ml.core.linalg import Vectors, DenseVectorTypeInfo, DenseVector
-from pyflink.ml.lib.feature.robustscaler import RobustScaler
+from pyflink.ml.lib.feature.robustscaler import RobustScaler, RobustScalerModel
from pyflink.table import Table
@@ -108,6 +108,36 @@ class RobustScalerTest(PyFlinkMLTestCase):
output.get_schema().get_field_names(),
self.expected_output)
+ def test_get_model_data(self):
+ robust_scaler = RobustScaler()
+ model = robust_scaler.fit(self.train_table)
+ model_data = model.get_model_data()[0]
+ expected_field_names = ['medians', 'ranges']
+ self.assertEqual(expected_field_names, model_data.get_schema().get_field_names())
+
+ model_rows = [result for result in
+ self.t_env.to_data_stream(model_data).execute_and_collect()]
+ self.assertEqual(1, len(model_rows))
+ self.assertListAlmostEqual(
+ [4.0, -4.0], model_rows[0][expected_field_names.index('medians')])
+ self.assertListAlmostEqual(
+ [4.0, 4.0], model_rows[0][expected_field_names.index('ranges')])
+
+ def test_set_model_data(self):
+ robust_scaler = RobustScaler()
+ model_a = robust_scaler.fit(self.train_table)
+ model_data = model_a.get_model_data()[0]
+
+ model_b = RobustScalerModel().set_model_data(model_data)
+ update_existing_params(model_b, model_a)
+
+ output = model_b.transform(self.predict_table)[0]
+ self.verify_output_result(
+ output,
+ robust_scaler.get_output_col(),
+ output.get_schema().get_field_names(),
+ self.expected_output)
+
def test_save_load_predict(self):
robust_scaler = RobustScaler()
reloaded_robust_scaler = self.save_and_reload(robust_scaler)
diff --git a/flink-ml-python/pyflink/ml/lib/feature/tests/test_standardscaler.py b/flink-ml-python/pyflink/ml/lib/feature/tests/test_standardscaler.py
index a4216af..469d839 100644
--- a/flink-ml-python/pyflink/ml/lib/feature/tests/test_standardscaler.py
+++ b/flink-ml-python/pyflink/ml/lib/feature/tests/test_standardscaler.py
@@ -21,8 +21,8 @@ from pyflink.table import Table
from typing import List
from pyflink.ml.core.linalg import Vectors, DenseVectorTypeInfo, DenseVector
-from pyflink.ml.lib.feature.standardscaler import StandardScaler
-from pyflink.ml.tests.test_utils import PyFlinkMLTestCase
+from pyflink.ml.lib.feature.standardscaler import StandardScaler, StandardScalerModel
+from pyflink.ml.tests.test_utils import PyFlinkMLTestCase, update_existing_params
class StandardScalerTest(PyFlinkMLTestCase):
@@ -113,6 +113,48 @@ class StandardScalerTest(PyFlinkMLTestCase):
standard_scaler.get_output_col(),
self.expected_res_with_mean_and_std)
+    def test_get_model_data(self):
+        """Fit a StandardScaler and verify the schema and values of its model data."""
+        fitted_model = StandardScaler().fit(self.dense_input)
+        model_data = fitted_model.get_model_data()[0]
+
+        # The model data table must expose exactly these columns, in this order.
+        expected_field_names = ['mean', 'std']
+        self.assertEqual(expected_field_names, model_data.get_schema().get_field_names())
+
+        # Collect the model data rows; exactly one row is expected.
+        model_rows = list(self.t_env.to_data_stream(model_data).execute_and_collect())
+        self.assertEqual(1, len(model_rows))
+
+        only_row = model_rows[0]
+        self.assertListAlmostEqual(
+            self.expected_mean, only_row[expected_field_names.index('mean')])
+        self.assertListAlmostEqual(
+            self.expected_std, only_row[expected_field_names.index('std')])
+
+    def test_set_model_data(self):
+        """Verify that a StandardScalerModel built via set_model_data() gives the expected output."""
+        standard_scaler = StandardScaler()
+        model_a = standard_scaler.fit(self.dense_input)
+
+        # Build a fresh model from model_a's data, then hand model_a to
+        # update_existing_params so that model_b receives model_a's param map.
+        model_b = StandardScalerModel().set_model_data(model_a.get_model_data()[0])
+        update_existing_params(model_b, model_a)
+
+        output = model_b.transform(self.dense_input)[0]
+        field_names = output.get_schema().get_field_names()
+        output_col = standard_scaler.get_output_col()
+        self.verify_output_result(output, field_names, output_col, self.expected_res_with_std)
+
+    def test_save_load_and_predict(self):
+        """Verify the output after saving and reloading both the estimator and its model."""
+        standard_scaler = StandardScaler()
+
+        # Round-trip the estimator first, then the model fitted by the reloaded estimator.
+        reloaded_estimator = self.save_and_reload(standard_scaler)
+        reloaded_model = self.save_and_reload(reloaded_estimator.fit(self.dense_input))
+
+        output = reloaded_model.transform(self.dense_input)[0]
+        field_names = output.get_schema().get_field_names()
+        # The output column name is read from the original (not reloaded) estimator.
+        output_col = standard_scaler.get_output_col()
+        self.verify_output_result(output, field_names, output_col, self.expected_res_with_std)
+
def verify_output_result(
self,
output: Table,
diff --git a/flink-ml-python/pyflink/ml/lib/feature/tests/test_stringindexer.py b/flink-ml-python/pyflink/ml/lib/feature/tests/test_stringindexer.py
index 460da9b..0f6204b 100644
--- a/flink-ml-python/pyflink/ml/lib/feature/tests/test_stringindexer.py
+++ b/flink-ml-python/pyflink/ml/lib/feature/tests/test_stringindexer.py
@@ -18,8 +18,8 @@
from pyflink.common import Types, Row
-from pyflink.ml.lib.feature.stringindexer import StringIndexer
-from pyflink.ml.tests.test_utils import PyFlinkMLTestCase
+from pyflink.ml.lib.feature.stringindexer import StringIndexer, StringIndexerModel
+from pyflink.ml.tests.test_utils import PyFlinkMLTestCase, update_existing_params
class StringIndexerTest(PyFlinkMLTestCase):
@@ -118,3 +118,53 @@ class StringIndexerTest(PyFlinkMLTestCase):
predicted_results.sort(key=lambda x: x[0])
self.assertEqual(predicted_results, self.expected_alphabetic_asc_predict_data)
+
+    def test_get_model_data(self):
+        """Fit a StringIndexer and verify the schema of its model data."""
+        string_indexer = (
+            StringIndexer()
+            .set_input_cols('input_col1', 'input_col2')
+            .set_output_cols('output_col1', 'output_col2')
+            .set_string_order_type('alphabetAsc')
+        )
+        fitted_model = string_indexer.fit(self.train_table)
+        model_data = fitted_model.get_model_data()[0]
+
+        # Only the column names are checked here; see the TODO below for the values.
+        expected_field_names = ['stringArrays']
+        self.assertEqual(expected_field_names, model_data.get_schema().get_field_names())
+
+        # TODO: Add test to collect and verify the model data results after FLINK-30122 is resolved.
+
+    def test_set_model_data(self):
+        """Verify that a StringIndexerModel built via set_model_data() gives the expected output."""
+        string_indexer = (
+            StringIndexer()
+            .set_input_cols('input_col1', 'input_col2')
+            .set_output_cols('output_col1', 'output_col2')
+            .set_string_order_type('alphabetAsc')
+            .set_handle_invalid('keep')
+        )
+        model_a = string_indexer.fit(self.train_table)
+
+        # Build a fresh model from model_a's data, then hand model_a to
+        # update_existing_params so that model_b receives model_a's param map.
+        model_b = StringIndexerModel().set_model_data(model_a.get_model_data()[0])
+        update_existing_params(model_b, model_a)
+
+        output = model_b.transform(self.predict_table)[0]
+
+        # Order the collected rows by their first field before comparing.
+        predicted_results = sorted(
+            self.t_env.to_data_stream(output).execute_and_collect(),
+            key=lambda row: row[0])
+
+        self.assertEqual(predicted_results, self.expected_alphabetic_asc_predict_data)
+
+    def test_save_load_and_predict(self):
+        """Verify the output after saving and reloading both the estimator and its model."""
+        string_indexer = (
+            StringIndexer()
+            .set_input_cols('input_col1', 'input_col2')
+            .set_output_cols('output_col1', 'output_col2')
+            .set_string_order_type('alphabetAsc')
+            .set_handle_invalid('keep')
+        )
+
+        # Round-trip the estimator first, then the model fitted by the reloaded estimator.
+        reloaded_string_indexer = self.save_and_reload(string_indexer)
+        reloaded_model = self.save_and_reload(reloaded_string_indexer.fit(self.train_table))
+
+        output = reloaded_model.transform(self.predict_table)[0]
+
+        # Order the collected rows by their first field before comparing.
+        predicted_results = sorted(
+            self.t_env.to_data_stream(output).execute_and_collect(),
+            key=lambda row: row[0])
+        self.assertEqual(predicted_results, self.expected_alphabetic_asc_predict_data)
diff --git a/flink-ml-python/pyflink/ml/lib/feature/tests/test_variancethresholdselector.py b/flink-ml-python/pyflink/ml/lib/feature/tests/test_variancethresholdselector.py
index b7a49a4..013b383 100644
--- a/flink-ml-python/pyflink/ml/lib/feature/tests/test_variancethresholdselector.py
+++ b/flink-ml-python/pyflink/ml/lib/feature/tests/test_variancethresholdselector.py
@@ -21,8 +21,9 @@ from pyflink.common import Types
from pyflink.table import Table
from pyflink.ml.core.linalg import Vectors, DenseVectorTypeInfo, DenseVector
-from pyflink.ml.lib.feature.variancethresholdselector import VarianceThresholdSelector
-from pyflink.ml.tests.test_utils import PyFlinkMLTestCase
+from pyflink.ml.lib.feature.variancethresholdselector import \
+ VarianceThresholdSelector, VarianceThresholdSelectorModel
+from pyflink.ml.tests.test_utils import PyFlinkMLTestCase, update_existing_params
class VarianceThresholdSelectorTest(PyFlinkMLTestCase):
@@ -112,6 +113,33 @@ class VarianceThresholdSelectorTest(PyFlinkMLTestCase):
output.get_schema().get_field_names(),
self.expected_output)
+    def test_get_model_data(self):
+        """Fit a VarianceThresholdSelector and verify the schema of its model data."""
+        selector = VarianceThresholdSelector().set_variance_threshold(8.0)
+        fitted_model = selector.fit(self.train_table)
+        model_data = fitted_model.get_model_data()[0]
+
+        # Only the column names are checked here; see the TODO below for the values.
+        expected_field_names = ['numOfFeatures', 'indices']
+        self.assertEqual(expected_field_names, model_data.get_schema().get_field_names())
+
+        # TODO: Add test to collect and verify the model data results after Flink dependency
+        # is upgraded to 1.15.3, 1.16.0 or a higher version. Related ticket: FLINK-29477
+
+    def test_set_model_data(self):
+        """Verify that a model built via set_model_data() reproduces the expected output."""
+        variance_threshold_selector = VarianceThresholdSelector().set_variance_threshold(8.0)
+        model_a = variance_threshold_selector.fit(self.train_table)
+
+        # Build a fresh model from model_a's data, then hand model_a to
+        # update_existing_params so that model_b receives model_a's param map.
+        model_b = VarianceThresholdSelectorModel().set_model_data(
+            model_a.get_model_data()[0])
+        update_existing_params(model_b, model_a)
+
+        output = model_b.transform(self.predict_table)[0]
+        output_col = variance_threshold_selector.get_output_col()
+        field_names = output.get_schema().get_field_names()
+        self.verify_output_result(output, output_col, field_names, self.expected_output)
+
def test_save_load_predict(self):
variance_threshold_selector = VarianceThresholdSelector() \
.set_variance_threshold(8.0)
diff --git a/flink-ml-python/pyflink/ml/lib/feature/tests/test_vectorindexer.py b/flink-ml-python/pyflink/ml/lib/feature/tests/test_vectorindexer.py
index 8e63427..9ca499b 100644
--- a/flink-ml-python/pyflink/ml/lib/feature/tests/test_vectorindexer.py
+++ b/flink-ml-python/pyflink/ml/lib/feature/tests/test_vectorindexer.py
@@ -22,7 +22,7 @@ from pyflink.common import Types
from pyflink.ml.core.linalg import Vectors, DenseVectorTypeInfo
from pyflink.ml.lib.feature.vectorindexer import VectorIndexer, VectorIndexerModel
-from pyflink.ml.tests.test_utils import PyFlinkMLTestCase
+from pyflink.ml.tests.test_utils import PyFlinkMLTestCase, update_existing_params
class VectorIndexerTest(PyFlinkMLTestCase):
@@ -101,3 +101,28 @@ class VectorIndexerTest(PyFlinkMLTestCase):
predicted_results.sort(key=lambda x: x[1])
self.expected_output.sort(key=lambda x: x[1])
self.assertEqual(self.expected_output, predicted_results)
+
+    def test_get_model_data(self):
+        """Fit a VectorIndexer and verify the schema of its model data."""
+        vector_indexer = VectorIndexer().set_handle_invalid('keep')
+        fitted_model = vector_indexer.fit(self.train_table)
+        model_data = fitted_model.get_model_data()[0]
+
+        # Only the column names are checked here; see the TODO below for the values.
+        expected_field_names = ['categoryMaps']
+        self.assertEqual(expected_field_names, model_data.get_schema().get_field_names())
+
+        # TODO: Add test to collect and verify the model data results after FLINK-30124 is resolved.
+
+    def test_set_model_data(self):
+        """Verify that a VectorIndexerModel built via set_model_data() gives the expected output."""
+        def second_element(item):
+            # Sort key shared by the predicted and the expected values.
+            return item[1]
+
+        vector_indexer = VectorIndexer().set_handle_invalid('keep')
+        model_a = vector_indexer.fit(self.train_table)
+
+        # Build a fresh model from model_a's data, then hand model_a to
+        # update_existing_params so that model_b receives model_a's param map.
+        model_b = VectorIndexerModel().set_model_data(model_a.get_model_data()[0])
+        update_existing_params(model_b, model_a)
+
+        output = model_b.transform(self.predict_table)[0]
+        # Keep only the field at position 1 of every collected row.
+        predicted_results = [
+            row[1] for row in self.t_env.to_data_stream(output).execute_and_collect()]
+
+        # Both lists are sorted in place so that the comparison ignores row order.
+        predicted_results.sort(key=second_element)
+        self.expected_output.sort(key=second_element)
+        self.assertEqual(self.expected_output, predicted_results)
diff --git a/flink-ml-python/pyflink/ml/tests/test_utils.py b/flink-ml-python/pyflink/ml/tests/test_utils.py
index 04104c4..4b376dc 100644
--- a/flink-ml-python/pyflink/ml/tests/test_utils.py
+++ b/flink-ml-python/pyflink/ml/tests/test_utils.py
@@ -24,9 +24,17 @@ import uuid
from pyflink.common import RestartStrategies, Configuration
from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.java_gateway import get_gateway
from pyflink.table import StreamTableEnvironment
from pyflink.util.java_utils import get_j_env_configuration
+from pyflink.ml.core.wrapper import JavaWithParams
+
+
+def update_existing_params(target: JavaWithParams, source: JavaWithParams):
+    """
+    Passes the param map of ``source`` to the Java utility
+    ``org.apache.flink.ml.util.ReadWriteUtils.updateExistingParams`` together with the
+    Java object wrapped by ``target``.
+
+    :param target: The wrapper whose underlying Java object is handed to the utility.
+    :param source: The wrapper whose underlying Java object supplies the param map.
+    """
+    j_read_write_utils = get_gateway().jvm.org.apache.flink.ml.util.ReadWriteUtils
+    j_read_write_utils.updateExistingParams(
+        target._java_obj, source._java_obj.getParamMap())
+
class PyFlinkMLTestCase(unittest.TestCase):
def setUp(self):
@@ -84,3 +92,10 @@ class PyFlinkMLTestCase(unittest.TestCase):
raise e
load_func = getattr(stage, 'load')
return load_func(self.t_env, path)
+
+    def assertListAlmostEqual(self, expected_list, actual_list, places=None, msg=None,
+                              delta=None):
+        """
+        Asserts that two sequences have the same length and are element-wise almost equal.
+
+        :param expected_list: The expected values; must support ``len()`` and indexing.
+        :param actual_list: The actual values; must support ``len()`` and indexing.
+        :param places: Forwarded to ``assertAlmostEqual`` for every pair of elements.
+        :param msg: Failure message used for the length check and every element check.
+        :param delta: Forwarded to ``assertAlmostEqual`` for every pair of elements.
+        """
+        # Forward ``msg`` to the length check as well, so that the caller's context is
+        # not lost when the two sequences differ in length.
+        self.assertEqual(len(expected_list), len(actual_list), msg=msg)
+        # Index-based access is kept on purpose: it only requires ``len()`` and
+        # ``__getitem__`` from the arguments, which need not be plain lists.
+        for i in range(len(expected_list)):
+            self.assertAlmostEqual(expected_list[i], actual_list[i],
+                                   places=places, msg=msg, delta=delta)