You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by li...@apache.org on 2022/06/29 02:34:31 UTC
[flink-ml] branch master updated: [FLINK-27715] Add pyflink examples
This is an automated email from the ASF dual-hosted git repository.
lindong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-ml.git
The following commit(s) were added to refs/heads/master by this push:
new 91e7464 [FLINK-27715] Add pyflink examples
91e7464 is described below
commit 91e74648bf47d1027ef3362c61456b90fcf89535
Author: yunfengzhou-hub <yu...@outlook.com>
AuthorDate: Wed Jun 29 10:34:26 2022 +0800
[FLINK-27715] Add pyflink examples
This closes #121.
---
.github/workflows/python-checks.yml | 4 +-
.../feature/stringindexer/IndexToStringModel.java | 9 +-
.../pyflink/examples/ml/classification/__init__.py | 17 +++
.../examples/ml/classification/knn_example.py | 93 +++++++++++++
.../ml/classification/linearsvc_example.py | 76 +++++++++++
.../classification/logisticregression_example.py | 77 +++++++++++
.../ml/classification/naivebayes_example.py | 80 +++++++++++
.../pyflink/examples/ml/clustering/__init__.py | 17 +++
.../examples/ml/clustering/kmeans_example.py | 67 +++++++++
.../pyflink/examples/ml/evaluation/__init__.py | 17 +++
.../binaryclassificationevaluator_example.py | 76 +++++++++++
.../pyflink/examples/ml/feature/__init__.py | 17 +++
.../examples/ml/feature/bucketizer_example.py | 73 ++++++++++
.../ml/feature/indextostringmodel_example.py | 76 +++++++++++
.../examples/ml/feature/minmaxscaler_example.py | 79 +++++++++++
.../examples/ml/feature/onehotencoder_example.py | 66 +++++++++
.../examples/ml/feature/standardscaler_example.py | 66 +++++++++
.../examples/ml/feature/stringindexer_example.py | 89 ++++++++++++
.../examples/ml/feature/vectorassembler_example.py | 71 ++++++++++
.../pyflink/examples/ml/regression/__init__.py | 17 +++
.../ml/regression/linearregression_example.py | 73 ++++++++++
.../pyflink/examples/ml/tests/__init__.py | 30 ++++
.../pyflink/examples/ml/tests/test_examples.py | 37 +++++
flink-ml-python/pyflink/ml/__init__.py | 1 +
flink-ml-python/pyflink/ml/core/linalg.py | 2 +-
.../tests/test_logisticregression.py | 17 ---
.../pyflink/ml/lib/feature/stringindexer.py | 8 +-
.../lib/feature/tests/test_indextostringmodel.py | 77 +++++++++++
.../pyflink/ml/lib/regression/__init__.py | 17 +++
.../pyflink/ml/lib/regression/common.py | 74 ++++++++++
.../pyflink/ml/lib/regression/linearregression.py | 97 +++++++++++++
.../pyflink/ml/lib/regression/tests/__init__.py | 30 ++++
.../lib/regression/tests/test_linearregression.py | 151 +++++++++++++++++++++
.../ml/lib/tests/test_ml_lib_completeness.py | 16 ++-
34 files changed, 1685 insertions(+), 32 deletions(-)
diff --git a/.github/workflows/python-checks.yml b/.github/workflows/python-checks.yml
index d53f4e4..2c59761 100644
--- a/.github/workflows/python-checks.yml
+++ b/.github/workflows/python-checks.yml
@@ -41,6 +41,8 @@ jobs:
run: python -m mypy --config=setup.cfg
- name: Test the source code
working-directory: flink-ml-python
- run: pytest
+ run: |
+ pytest pyflink/ml
+ pytest pyflink/examples
diff --git a/flink-ml-lib/src/main/java/org/apache/flink/ml/feature/stringindexer/IndexToStringModel.java b/flink-ml-lib/src/main/java/org/apache/flink/ml/feature/stringindexer/IndexToStringModel.java
index 4090193..f3a1720 100644
--- a/flink-ml-lib/src/main/java/org/apache/flink/ml/feature/stringindexer/IndexToStringModel.java
+++ b/flink-ml-lib/src/main/java/org/apache/flink/ml/feature/stringindexer/IndexToStringModel.java
@@ -147,15 +147,8 @@ public class IndexToStringModel
}
Row outputStrings = new Row(inputCols.length);
- int stringId;
for (int i = 0; i < inputCols.length; i++) {
- try {
- stringId = (Integer) input.getField(inputCols[i]);
- } catch (Exception e) {
- throw new RuntimeException(
- "The input contains non-integer value: "
- + input.getField(inputCols[i] + "."));
- }
+ int stringId = (Integer) input.getField(inputCols[i]);
if (stringId < stringArrays[i].length && stringId >= 0) {
outputStrings.setField(i, stringArrays[i][stringId]);
} else {
diff --git a/flink-ml-python/pyflink/examples/ml/classification/__init__.py b/flink-ml-python/pyflink/examples/ml/classification/__init__.py
new file mode 100644
index 0000000..65b48d4
--- /dev/null
+++ b/flink-ml-python/pyflink/examples/ml/classification/__init__.py
@@ -0,0 +1,17 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
diff --git a/flink-ml-python/pyflink/examples/ml/classification/knn_example.py b/flink-ml-python/pyflink/examples/ml/classification/knn_example.py
new file mode 100644
index 0000000..1c4b0fb
--- /dev/null
+++ b/flink-ml-python/pyflink/examples/ml/classification/knn_example.py
@@ -0,0 +1,93 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Simple program that trains a Knn model and uses it for classification.
+#
+# Before executing this program, please make sure you have followed Flink ML's
+# quick start guideline to set up Flink ML and Flink environment. The guideline
+# can be found at
+#
+# https://nightlies.apache.org/flink/flink-ml-docs-master/docs/try-flink-ml/quick-start/
+
+from pyflink.common import Types
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.ml.core.linalg import Vectors, DenseVectorTypeInfo
+from pyflink.ml.lib.classification.knn import KNN
+from pyflink.table import StreamTableEnvironment
+
+# create a new StreamExecutionEnvironment
+env = StreamExecutionEnvironment.get_execution_environment()
+
+# create a StreamTableEnvironment
+t_env = StreamTableEnvironment.create(env)
+
+# generate input training and prediction data
+train_data = t_env.from_data_stream(
+ env.from_collection([
+ (Vectors.dense([2.0, 3.0]), 1.0),
+ (Vectors.dense([2.1, 3.1]), 1.0),
+ (Vectors.dense([200.1, 300.1]), 2.0),
+ (Vectors.dense([200.2, 300.2]), 2.0),
+ (Vectors.dense([200.3, 300.3]), 2.0),
+ (Vectors.dense([200.4, 300.4]), 2.0),
+ (Vectors.dense([200.4, 300.4]), 2.0),
+ (Vectors.dense([200.6, 300.6]), 2.0),
+ (Vectors.dense([2.1, 3.1]), 1.0),
+ (Vectors.dense([2.1, 3.1]), 1.0),
+ (Vectors.dense([2.1, 3.1]), 1.0),
+ (Vectors.dense([2.1, 3.1]), 1.0),
+ (Vectors.dense([2.3, 3.2]), 1.0),
+ (Vectors.dense([2.3, 3.2]), 1.0),
+ (Vectors.dense([2.8, 3.2]), 3.0),
+ (Vectors.dense([300., 3.2]), 4.0),
+ (Vectors.dense([2.2, 3.2]), 1.0),
+ (Vectors.dense([2.4, 3.2]), 5.0),
+ (Vectors.dense([2.5, 3.2]), 5.0),
+ (Vectors.dense([2.5, 3.2]), 5.0),
+ (Vectors.dense([2.1, 3.1]), 1.0)
+ ],
+ type_info=Types.ROW_NAMED(
+ ['features', 'label'],
+ [DenseVectorTypeInfo(), Types.DOUBLE()])))
+
+predict_data = t_env.from_data_stream(
+ env.from_collection([
+ (Vectors.dense([4.0, 4.1]), 5.0),
+ (Vectors.dense([300, 42]), 2.0),
+ ],
+ type_info=Types.ROW_NAMED(
+ ['features', 'label'],
+ [DenseVectorTypeInfo(), Types.DOUBLE()])))
+
+# create a knn object and initialize its parameters
+knn = KNN().set_k(4)
+
+# train the knn model
+model = knn.fit(train_data)
+
+# use the knn model for predictions
+output = model.transform(predict_data)[0]
+
+# extract and display the results
+field_names = output.get_schema().get_field_names()
+for result in t_env.to_data_stream(output).execute_and_collect():
+ features = result[field_names.index(knn.get_features_col())]
+ expected_result = result[field_names.index(knn.get_label_col())]
+ actual_result = result[field_names.index(knn.get_prediction_col())]
+ print('Features: ' + str(features) + ' \tExpected Result: ' + str(expected_result)
+ + ' \tActual Result: ' + str(actual_result))
diff --git a/flink-ml-python/pyflink/examples/ml/classification/linearsvc_example.py b/flink-ml-python/pyflink/examples/ml/classification/linearsvc_example.py
new file mode 100644
index 0000000..eb19aef
--- /dev/null
+++ b/flink-ml-python/pyflink/examples/ml/classification/linearsvc_example.py
@@ -0,0 +1,76 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Simple program that trains a LinearSVC model and uses it for classification.
+#
+# Before executing this program, please make sure you have followed Flink ML's
+# quick start guideline to set up Flink ML and Flink environment. The guideline
+# can be found at
+#
+# https://nightlies.apache.org/flink/flink-ml-docs-master/docs/try-flink-ml/quick-start/
+
+from pyflink.common import Types
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.ml.core.linalg import Vectors, DenseVectorTypeInfo
+from pyflink.ml.lib.classification.linearsvc import LinearSVC
+from pyflink.table import StreamTableEnvironment
+
+# create a new StreamExecutionEnvironment
+env = StreamExecutionEnvironment.get_execution_environment()
+
+# create a StreamTableEnvironment
+t_env = StreamTableEnvironment.create(env)
+
+# generate input data
+input_table = t_env.from_data_stream(
+ env.from_collection([
+ (Vectors.dense([1, 2, 3, 4]), 0., 1.),
+ (Vectors.dense([2, 2, 3, 4]), 0., 2.),
+ (Vectors.dense([3, 2, 3, 4]), 0., 3.),
+ (Vectors.dense([4, 2, 3, 4]), 0., 4.),
+ (Vectors.dense([5, 2, 3, 4]), 0., 5.),
+ (Vectors.dense([11, 2, 3, 4]), 1., 1.),
+ (Vectors.dense([12, 2, 3, 4]), 1., 2.),
+ (Vectors.dense([13, 2, 3, 4]), 1., 3.),
+ (Vectors.dense([14, 2, 3, 4]), 1., 4.),
+ (Vectors.dense([15, 2, 3, 4]), 1., 5.),
+ ],
+ type_info=Types.ROW_NAMED(
+ ['features', 'label', 'weight'],
+ [DenseVectorTypeInfo(), Types.DOUBLE(), Types.DOUBLE()])
+ ))
+
+# create a linear svc object and initialize its parameters
+linear_svc = LinearSVC().set_weight_col('weight')
+
+# train the linear svc model
+model = linear_svc.fit(input_table)
+
+# use the linear svc model for predictions
+output = model.transform(input_table)[0]
+
+# extract and display the results
+field_names = output.get_schema().get_field_names()
+for result in t_env.to_data_stream(output).execute_and_collect():
+ features = result[field_names.index(linear_svc.get_features_col())]
+ expected_result = result[field_names.index(linear_svc.get_label_col())]
+ prediction_result = result[field_names.index(linear_svc.get_prediction_col())]
+ raw_prediction_result = result[field_names.index(linear_svc.get_raw_prediction_col())]
+ print('Features: ' + str(features) + ' \tExpected Result: ' + str(expected_result)
+ + ' \tPrediction Result: ' + str(prediction_result)
+ + ' \tRaw Prediction Result: ' + str(raw_prediction_result))
diff --git a/flink-ml-python/pyflink/examples/ml/classification/logisticregression_example.py b/flink-ml-python/pyflink/examples/ml/classification/logisticregression_example.py
new file mode 100644
index 0000000..1b31971
--- /dev/null
+++ b/flink-ml-python/pyflink/examples/ml/classification/logisticregression_example.py
@@ -0,0 +1,77 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Simple program that trains a LogisticRegression model and uses it for
+# classification.
+#
+# Before executing this program, please make sure you have followed Flink ML's
+# quick start guideline to set up Flink ML and Flink environment. The guideline
+# can be found at
+#
+# https://nightlies.apache.org/flink/flink-ml-docs-master/docs/try-flink-ml/quick-start/
+
+from pyflink.common import Types
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.ml.core.linalg import Vectors, DenseVectorTypeInfo
+from pyflink.ml.lib.classification.logisticregression import LogisticRegression
+from pyflink.table import StreamTableEnvironment
+
+# create a new StreamExecutionEnvironment
+env = StreamExecutionEnvironment.get_execution_environment()
+
+# create a StreamTableEnvironment
+t_env = StreamTableEnvironment.create(env)
+
+# generate input data
+input_data = t_env.from_data_stream(
+ env.from_collection([
+ (Vectors.dense([1, 2, 3, 4]), 0., 1.),
+ (Vectors.dense([2, 2, 3, 4]), 0., 2.),
+ (Vectors.dense([3, 2, 3, 4]), 0., 3.),
+ (Vectors.dense([4, 2, 3, 4]), 0., 4.),
+ (Vectors.dense([5, 2, 3, 4]), 0., 5.),
+ (Vectors.dense([11, 2, 3, 4]), 1., 1.),
+ (Vectors.dense([12, 2, 3, 4]), 1., 2.),
+ (Vectors.dense([13, 2, 3, 4]), 1., 3.),
+ (Vectors.dense([14, 2, 3, 4]), 1., 4.),
+ (Vectors.dense([15, 2, 3, 4]), 1., 5.),
+ ],
+ type_info=Types.ROW_NAMED(
+ ['features', 'label', 'weight'],
+ [DenseVectorTypeInfo(), Types.DOUBLE(), Types.DOUBLE()])
+ ))
+
+# create a logistic regression object and initialize its parameters
+logistic_regression = LogisticRegression().set_weight_col('weight')
+
+# train the logistic regression model
+model = logistic_regression.fit(input_data)
+
+# use the logistic regression model for predictions
+output = model.transform(input_data)[0]
+
+# extract and display the results
+field_names = output.get_schema().get_field_names()
+for result in t_env.to_data_stream(output).execute_and_collect():
+ features = result[field_names.index(logistic_regression.get_features_col())]
+ expected_result = result[field_names.index(logistic_regression.get_label_col())]
+ prediction_result = result[field_names.index(logistic_regression.get_prediction_col())]
+ raw_prediction_result = result[field_names.index(logistic_regression.get_raw_prediction_col())]
+ print('Features: ' + str(features) + ' \tExpected Result: ' + str(expected_result)
+ + ' \tPrediction Result: ' + str(prediction_result)
+ + ' \tRaw Prediction Result: ' + str(raw_prediction_result))
diff --git a/flink-ml-python/pyflink/examples/ml/classification/naivebayes_example.py b/flink-ml-python/pyflink/examples/ml/classification/naivebayes_example.py
new file mode 100644
index 0000000..20b940d
--- /dev/null
+++ b/flink-ml-python/pyflink/examples/ml/classification/naivebayes_example.py
@@ -0,0 +1,80 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Simple program that trains a NaiveBayes model and uses it for classification.
+#
+# Before executing this program, please make sure you have followed Flink ML's
+# quick start guideline to set up Flink ML and Flink environment. The guideline
+# can be found at
+#
+# https://nightlies.apache.org/flink/flink-ml-docs-master/docs/try-flink-ml/quick-start/
+
+from pyflink.common import Types
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.ml.core.linalg import Vectors, DenseVectorTypeInfo
+from pyflink.ml.lib.classification.naivebayes import NaiveBayes
+from pyflink.table import StreamTableEnvironment
+
+# create a new StreamExecutionEnvironment
+env = StreamExecutionEnvironment.get_execution_environment()
+
+# create a StreamTableEnvironment
+t_env = StreamTableEnvironment.create(env)
+
+# generate input training and prediction data
+train_table = t_env.from_data_stream(
+ env.from_collection([
+ (Vectors.dense([0, 0.]), 11.),
+ (Vectors.dense([1, 0]), 10.),
+ (Vectors.dense([1, 1.]), 10.),
+ ],
+ type_info=Types.ROW_NAMED(
+ ['features', 'label'],
+ [DenseVectorTypeInfo(), Types.DOUBLE()])))
+
+predict_table = t_env.from_data_stream(
+ env.from_collection([
+ (Vectors.dense([0, 1.]),),
+ (Vectors.dense([0, 0.]),),
+ (Vectors.dense([1, 0]),),
+ (Vectors.dense([1, 1.]),),
+ ],
+ type_info=Types.ROW_NAMED(
+ ['features'],
+ [DenseVectorTypeInfo()])))
+
+# create a naive bayes object and initialize its parameters
+naive_bayes = NaiveBayes() \
+ .set_smoothing(1.0) \
+ .set_features_col('features') \
+ .set_label_col('label') \
+ .set_prediction_col('prediction') \
+ .set_model_type('multinomial')
+
+# train the naive bayes model
+model = naive_bayes.fit(train_table)
+
+# use the naive bayes model for predictions
+output = model.transform(predict_table)[0]
+
+# extract and display the results
+field_names = output.get_schema().get_field_names()
+for result in t_env.to_data_stream(output).execute_and_collect():
+ features = result[field_names.index(naive_bayes.get_features_col())]
+ prediction_result = result[field_names.index(naive_bayes.get_prediction_col())]
+ print('Features: ' + str(features) + ' \tPrediction Result: ' + str(prediction_result))
diff --git a/flink-ml-python/pyflink/examples/ml/clustering/__init__.py b/flink-ml-python/pyflink/examples/ml/clustering/__init__.py
new file mode 100644
index 0000000..65b48d4
--- /dev/null
+++ b/flink-ml-python/pyflink/examples/ml/clustering/__init__.py
@@ -0,0 +1,17 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
diff --git a/flink-ml-python/pyflink/examples/ml/clustering/kmeans_example.py b/flink-ml-python/pyflink/examples/ml/clustering/kmeans_example.py
new file mode 100644
index 0000000..72c5d10
--- /dev/null
+++ b/flink-ml-python/pyflink/examples/ml/clustering/kmeans_example.py
@@ -0,0 +1,67 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Simple program that trains a KMeans model and uses it for clustering.
+#
+# Before executing this program, please make sure you have followed Flink ML's
+# quick start guideline to set up Flink ML and Flink environment. The guideline
+# can be found at
+#
+# https://nightlies.apache.org/flink/flink-ml-docs-master/docs/try-flink-ml/quick-start/
+
+from pyflink.common import Types
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.ml.core.linalg import Vectors, DenseVectorTypeInfo
+from pyflink.ml.lib.clustering.kmeans import KMeans
+from pyflink.table import StreamTableEnvironment
+
+# create a new StreamExecutionEnvironment
+env = StreamExecutionEnvironment.get_execution_environment()
+
+# create a StreamTableEnvironment
+t_env = StreamTableEnvironment.create(env)
+
+# generate input data
+input_data = t_env.from_data_stream(
+ env.from_collection([
+ (Vectors.dense([0.0, 0.0]),),
+ (Vectors.dense([0.0, 0.3]),),
+ (Vectors.dense([0.3, 3.0]),),
+ (Vectors.dense([9.0, 0.0]),),
+ (Vectors.dense([9.0, 0.6]),),
+ (Vectors.dense([9.6, 0.0]),),
+ ],
+ type_info=Types.ROW_NAMED(
+ ['features'],
+ [DenseVectorTypeInfo()])))
+
+# create a kmeans object and initialize its parameters
+kmeans = KMeans().set_k(2).set_seed(1)
+
+# train the kmeans model
+model = kmeans.fit(input_data)
+
+# use the kmeans model for predictions
+output = model.transform(input_data)[0]
+
+# extract and display the results
+field_names = output.get_schema().get_field_names()
+for result in t_env.to_data_stream(output).execute_and_collect():
+ features = result[field_names.index(kmeans.get_features_col())]
+ cluster_id = result[field_names.index(kmeans.get_prediction_col())]
+ print('Features: ' + str(features) + ' \tCluster Id: ' + str(cluster_id))
diff --git a/flink-ml-python/pyflink/examples/ml/evaluation/__init__.py b/flink-ml-python/pyflink/examples/ml/evaluation/__init__.py
new file mode 100644
index 0000000..65b48d4
--- /dev/null
+++ b/flink-ml-python/pyflink/examples/ml/evaluation/__init__.py
@@ -0,0 +1,17 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
diff --git a/flink-ml-python/pyflink/examples/ml/evaluation/binaryclassificationevaluator_example.py b/flink-ml-python/pyflink/examples/ml/evaluation/binaryclassificationevaluator_example.py
new file mode 100644
index 0000000..72ca449
--- /dev/null
+++ b/flink-ml-python/pyflink/examples/ml/evaluation/binaryclassificationevaluator_example.py
@@ -0,0 +1,76 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Simple program that creates a BinaryClassificationEvaluator instance and uses
+# it for evaluation.
+#
+# Before executing this program, please make sure you have followed Flink ML's
+# quick start guideline to set up Flink ML and Flink environment. The guideline
+# can be found at
+#
+# https://nightlies.apache.org/flink/flink-ml-docs-master/docs/try-flink-ml/quick-start/
+
+from pyflink.common import Types
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.ml.core.linalg import Vectors, DenseVectorTypeInfo
+from pyflink.ml.lib.evaluation.binaryclassificationevaluator import BinaryClassificationEvaluator
+from pyflink.table import StreamTableEnvironment
+
+# create a new StreamExecutionEnvironment
+env = StreamExecutionEnvironment.get_execution_environment()
+
+# create a StreamTableEnvironment
+t_env = StreamTableEnvironment.create(env)
+
+# generate input data
+input_table = t_env.from_data_stream(
+ env.from_collection([
+ (1.0, Vectors.dense(0.1, 0.9)),
+ (1.0, Vectors.dense(0.2, 0.8)),
+ (1.0, Vectors.dense(0.3, 0.7)),
+ (0.0, Vectors.dense(0.25, 0.75)),
+ (0.0, Vectors.dense(0.4, 0.6)),
+ (1.0, Vectors.dense(0.35, 0.65)),
+ (1.0, Vectors.dense(0.45, 0.55)),
+ (0.0, Vectors.dense(0.6, 0.4)),
+ (0.0, Vectors.dense(0.7, 0.3)),
+ (1.0, Vectors.dense(0.65, 0.35)),
+ (0.0, Vectors.dense(0.8, 0.2)),
+ (1.0, Vectors.dense(0.9, 0.1))
+ ],
+ type_info=Types.ROW_NAMED(
+ ['label', 'rawPrediction'],
+ [Types.DOUBLE(), DenseVectorTypeInfo()]))
+)
+
+# create a binary classification evaluator object and initialize its parameters
+evaluator = BinaryClassificationEvaluator() \
+ .set_metrics_names('areaUnderPR', 'ks', 'areaUnderROC')
+
+# use the binary classification evaluator to compute the evaluation metrics
+output = evaluator.transform(input_table)[0]
+
+# extract and display the results
+field_names = output.get_schema().get_field_names()
+result = t_env.to_data_stream(output).execute_and_collect().next()
+print('Area under the precision-recall curve: '
+ + str(result[field_names.index('areaUnderPR')]))
+print('Area under the receiver operating characteristic curve: '
+ + str(result[field_names.index('areaUnderROC')]))
+print('Kolmogorov-Smirnov value: '
+ + str(result[field_names.index('ks')]))
diff --git a/flink-ml-python/pyflink/examples/ml/feature/__init__.py b/flink-ml-python/pyflink/examples/ml/feature/__init__.py
new file mode 100644
index 0000000..65b48d4
--- /dev/null
+++ b/flink-ml-python/pyflink/examples/ml/feature/__init__.py
@@ -0,0 +1,17 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
diff --git a/flink-ml-python/pyflink/examples/ml/feature/bucketizer_example.py b/flink-ml-python/pyflink/examples/ml/feature/bucketizer_example.py
new file mode 100644
index 0000000..c0d3b81
--- /dev/null
+++ b/flink-ml-python/pyflink/examples/ml/feature/bucketizer_example.py
@@ -0,0 +1,73 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Simple program that creates a Bucketizer instance and uses it for feature
+# engineering.
+#
+# Before executing this program, please make sure you have followed Flink ML's
+# quick start guideline to set up Flink ML and Flink environment. The guideline
+# can be found at
+#
+# https://nightlies.apache.org/flink/flink-ml-docs-master/docs/try-flink-ml/quick-start/
+
+from pyflink.common import Types
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.ml.lib.feature.bucketizer import Bucketizer
+from pyflink.table import StreamTableEnvironment
+
+# create a new StreamExecutionEnvironment
+env = StreamExecutionEnvironment.get_execution_environment()
+
+# create a StreamTableEnvironment
+t_env = StreamTableEnvironment.create(env)
+
+# generate input data
+input_data = t_env.from_data_stream(
+ env.from_collection([
+ (-0.5, 0.0, 1.0, 0.0),
+ ],
+ type_info=Types.ROW_NAMED(
+ ['f1', 'f2', 'f3', 'f4'],
+ [Types.DOUBLE(), Types.DOUBLE(), Types.DOUBLE(), Types.DOUBLE()])
+ ))
+
+# create a bucketizer object and initialize its parameters
+splits_array = [
+ [-0.5, 0.0, 0.5],
+ [-1.0, 0.0, 2.0],
+ [float('-inf'), 10.0, float('inf')],
+ [float('-inf'), 1.5, float('inf')],
+]
+
+bucketizer = Bucketizer() \
+ .set_input_cols('f1', 'f2', 'f3', 'f4') \
+ .set_output_cols('o1', 'o2', 'o3', 'o4') \
+ .set_splits_array(splits_array)
+
+# use the bucketizer for feature engineering
+output = bucketizer.transform(input_data)[0]
+
+# extract and display the results
+field_names = output.get_schema().get_field_names()
+input_values = [None for _ in bucketizer.get_input_cols()]
+output_values = [None for _ in bucketizer.get_input_cols()]
+for result in t_env.to_data_stream(output).execute_and_collect():
+ for i in range(len(bucketizer.get_input_cols())):
+ input_values[i] = result[field_names.index(bucketizer.get_input_cols()[i])]
+ output_values[i] = result[field_names.index(bucketizer.get_output_cols()[i])]
+ print('Input Values: ' + str(input_values) + '\tOutput Values: ' + str(output_values))
diff --git a/flink-ml-python/pyflink/examples/ml/feature/indextostringmodel_example.py b/flink-ml-python/pyflink/examples/ml/feature/indextostringmodel_example.py
new file mode 100644
index 0000000..72f8989
--- /dev/null
+++ b/flink-ml-python/pyflink/examples/ml/feature/indextostringmodel_example.py
@@ -0,0 +1,76 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Simple program that creates an IndexToStringModel instance and uses it for
+# feature engineering.
+#
+# Before executing this program, please make sure you have followed Flink ML's
+# quick start guideline to set up Flink ML and Flink environment. The guideline
+# can be found at
+#
+# https://nightlies.apache.org/flink/flink-ml-docs-master/docs/try-flink-ml/quick-start/
+
+from pyflink.common import Types
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.ml.lib.feature.stringindexer import IndexToStringModel
+from pyflink.table import StreamTableEnvironment
+
+# create a new StreamExecutionEnvironment
+env = StreamExecutionEnvironment.get_execution_environment()
+
+# create a StreamTableEnvironment
+t_env = StreamTableEnvironment.create(env)
+
+# generate input data
+predict_table = t_env.from_data_stream(
+ env.from_collection([
+ (0, 3),
+ (1, 2),
+ ],
+ type_info=Types.ROW_NAMED(
+ ['input_col1', 'input_col2'],
+ [Types.INT(), Types.INT()])
+ ))
+
+# create an index-to-string model and initialize its parameters and model data
+model_data_table = t_env.from_data_stream(
+ env.from_collection([
+ ([['a', 'b', 'c', 'd'], [-1., 0., 1., 2.]],),
+ ],
+ type_info=Types.ROW_NAMED(
+ ['stringArrays'],
+ [Types.OBJECT_ARRAY(Types.OBJECT_ARRAY(Types.STRING()))])
+ ))
+
+model = IndexToStringModel() \
+ .set_input_cols('input_col1', 'input_col2') \
+ .set_output_cols('output_col1', 'output_col2') \
+ .set_model_data(model_data_table)
+
+# use the index-to-string model for feature engineering
+output = model.transform(predict_table)[0]
+
+# extract and display the results
+field_names = output.get_schema().get_field_names()
+input_values = [None for _ in model.get_input_cols()]
+output_values = [None for _ in model.get_input_cols()]
+for result in t_env.to_data_stream(output).execute_and_collect():
+ for i in range(len(model.get_input_cols())):
+ input_values[i] = result[field_names.index(model.get_input_cols()[i])]
+ output_values[i] = result[field_names.index(model.get_output_cols()[i])]
+ print('Input Values: ' + str(input_values) + '\tOutput Values: ' + str(output_values))
diff --git a/flink-ml-python/pyflink/examples/ml/feature/minmaxscaler_example.py b/flink-ml-python/pyflink/examples/ml/feature/minmaxscaler_example.py
new file mode 100644
index 0000000..207381d
--- /dev/null
+++ b/flink-ml-python/pyflink/examples/ml/feature/minmaxscaler_example.py
@@ -0,0 +1,79 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Simple program that trains a MinMaxScaler model and uses it for feature
+# engineering.
+#
+# Before executing this program, please make sure you have followed Flink ML's
+# quick start guideline to set up Flink ML and Flink environment. The guideline
+# can be found at
+#
+# https://nightlies.apache.org/flink/flink-ml-docs-master/docs/try-flink-ml/quick-start/
+
+from pyflink.common import Types
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.ml.core.linalg import Vectors, DenseVectorTypeInfo
+from pyflink.ml.lib.feature.minmaxscaler import MinMaxScaler
+from pyflink.table import StreamTableEnvironment
+
+# create a new StreamExecutionEnvironment
+env = StreamExecutionEnvironment.get_execution_environment()
+
+# create a StreamTableEnvironment
+t_env = StreamTableEnvironment.create(env)
+
+# generate input training and prediction data
+train_data = t_env.from_data_stream(
+ env.from_collection([
+ (Vectors.dense(0.0, 3.0),),
+ (Vectors.dense(2.1, 0.0),),
+ (Vectors.dense(4.1, 5.1),),
+ (Vectors.dense(6.1, 8.1),),
+ (Vectors.dense(200, 400),),
+ ],
+ type_info=Types.ROW_NAMED(
+ ['input'],
+ [DenseVectorTypeInfo()])
+ ))
+
+predict_data = t_env.from_data_stream(
+ env.from_collection([
+ (Vectors.dense(150.0, 90.0),),
+ (Vectors.dense(50.0, 40.0),),
+ (Vectors.dense(100.0, 50.0),),
+ ],
+ type_info=Types.ROW_NAMED(
+ ['input'],
+ [DenseVectorTypeInfo()])
+ ))
+
+# create a min-max-scaler object and initialize its parameters
+min_max_scaler = MinMaxScaler()
+
+# train the min-max-scaler model
+model = min_max_scaler.fit(train_data)
+
+# use the min-max-scaler model for feature engineering
+output = model.transform(predict_data)[0]
+
+# extract and display the results
+field_names = output.get_schema().get_field_names()
+for result in t_env.to_data_stream(output).execute_and_collect():
+ input_value = result[field_names.index(min_max_scaler.get_input_col())]
+ output_value = result[field_names.index(min_max_scaler.get_output_col())]
+ print('Input Value: ' + str(input_value) + ' \tOutput Value: ' + str(output_value))
diff --git a/flink-ml-python/pyflink/examples/ml/feature/onehotencoder_example.py b/flink-ml-python/pyflink/examples/ml/feature/onehotencoder_example.py
new file mode 100644
index 0000000..8d87ba3
--- /dev/null
+++ b/flink-ml-python/pyflink/examples/ml/feature/onehotencoder_example.py
@@ -0,0 +1,66 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Simple program that trains a OneHotEncoder model and uses it for feature
+# engineering.
+#
+# Before executing this program, please make sure you have followed Flink ML's
+# quick start guideline to set up Flink ML and Flink environment. The guideline
+# can be found at
+#
+# https://nightlies.apache.org/flink/flink-ml-docs-master/docs/try-flink-ml/quick-start/
+
+from pyflink.common import Row
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.ml.lib.feature.onehotencoder import OneHotEncoder
+from pyflink.table import StreamTableEnvironment, DataTypes
+
+# create a new StreamExecutionEnvironment
+env = StreamExecutionEnvironment.get_execution_environment()
+
+# create a StreamTableEnvironment
+t_env = StreamTableEnvironment.create(env)
+
+# generate input training and prediction data
+train_table = t_env.from_elements(
+ [Row(0.0), Row(1.0), Row(2.0), Row(0.0)],
+ DataTypes.ROW([
+ DataTypes.FIELD('input', DataTypes.DOUBLE())
+ ]))
+
+predict_table = t_env.from_elements(
+ [Row(0.0), Row(1.0), Row(2.0)],
+ DataTypes.ROW([
+ DataTypes.FIELD('input', DataTypes.DOUBLE())
+ ]))
+
+# create a one-hot-encoder object and initialize its parameters
+one_hot_encoder = OneHotEncoder().set_input_cols('input').set_output_cols('output')
+
+# train the one-hot-encoder model
+model = one_hot_encoder.fit(train_table)
+
+# use the one-hot-encoder model for feature engineering
+output = model.transform(predict_table)[0]
+
+# extract and display the results
+field_names = output.get_schema().get_field_names()
+for result in t_env.to_data_stream(output).execute_and_collect():
+ input_value = result[field_names.index(one_hot_encoder.get_input_cols()[0])]
+ output_value = result[field_names.index(one_hot_encoder.get_output_cols()[0])]
+ print('Input Value: ' + str(input_value) + ' \tOutput Value: ' + str(output_value))
diff --git a/flink-ml-python/pyflink/examples/ml/feature/standardscaler_example.py b/flink-ml-python/pyflink/examples/ml/feature/standardscaler_example.py
new file mode 100644
index 0000000..b0f726e
--- /dev/null
+++ b/flink-ml-python/pyflink/examples/ml/feature/standardscaler_example.py
@@ -0,0 +1,66 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Simple program that trains a StandardScaler model and uses it for feature
+# engineering.
+#
+# Before executing this program, please make sure you have followed Flink ML's
+# quick start guideline to set up Flink ML and Flink environment. The guideline
+# can be found at
+#
+# https://nightlies.apache.org/flink/flink-ml-docs-master/docs/try-flink-ml/quick-start/
+
+from pyflink.common import Types
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.ml.core.linalg import Vectors, DenseVectorTypeInfo
+from pyflink.ml.lib.feature.standardscaler import StandardScaler
+from pyflink.table import StreamTableEnvironment
+
+# create a new StreamExecutionEnvironment
+env = StreamExecutionEnvironment.get_execution_environment()
+
+# create a StreamTableEnvironment
+t_env = StreamTableEnvironment.create(env)
+
+# generate input data
+input_data = t_env.from_data_stream(
+ env.from_collection([
+ (Vectors.dense(-2.5, 9, 1),),
+ (Vectors.dense(1.4, -5, 1),),
+ (Vectors.dense(2, -1, -2),),
+ ],
+ type_info=Types.ROW_NAMED(
+ ['input'],
+ [DenseVectorTypeInfo()])
+ ))
+
+# create a standard-scaler object and initialize its parameters
+standard_scaler = StandardScaler()
+
+# train the standard-scaler model
+model = standard_scaler.fit(input_data)
+
+# use the standard-scaler model for feature engineering
+output = model.transform(input_data)[0]
+
+# extract and display the results
+field_names = output.get_schema().get_field_names()
+for result in t_env.to_data_stream(output).execute_and_collect():
+ input_value = result[field_names.index(standard_scaler.get_input_col())]
+ output_value = result[field_names.index(standard_scaler.get_output_col())]
+ print('Input Value: ' + str(input_value) + ' \tOutput Value: ' + str(output_value))
diff --git a/flink-ml-python/pyflink/examples/ml/feature/stringindexer_example.py b/flink-ml-python/pyflink/examples/ml/feature/stringindexer_example.py
new file mode 100644
index 0000000..2f08b90
--- /dev/null
+++ b/flink-ml-python/pyflink/examples/ml/feature/stringindexer_example.py
@@ -0,0 +1,89 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Simple program that trains a StringIndexer model and uses it for feature
+# engineering.
+#
+# Before executing this program, please make sure you have followed Flink ML's
+# quick start guideline to set up Flink ML and Flink environment. The guideline
+# can be found at
+#
+# https://nightlies.apache.org/flink/flink-ml-docs-master/docs/try-flink-ml/quick-start/
+
+from pyflink.common import Types
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.ml.lib.feature.stringindexer import StringIndexer
+from pyflink.table import StreamTableEnvironment
+
+# create a new StreamExecutionEnvironment
+env = StreamExecutionEnvironment.get_execution_environment()
+
+# create a StreamTableEnvironment
+t_env = StreamTableEnvironment.create(env)
+
+# generate input training and prediction data
+train_table = t_env.from_data_stream(
+ env.from_collection([
+ ('a', 1.),
+ ('b', 1.),
+ ('b', 2.),
+ ('c', 0.),
+ ('d', 2.),
+ ('a', 2.),
+ ('b', 2.),
+ ('b', -1.),
+ ('a', -1.),
+ ('c', -1.),
+ ],
+ type_info=Types.ROW_NAMED(
+ ['input_col1', 'input_col2'],
+ [Types.STRING(), Types.DOUBLE()])
+ ))
+
+predict_table = t_env.from_data_stream(
+ env.from_collection([
+ ('a', 2.),
+ ('b', 1.),
+ ('c', 2.),
+ ],
+ type_info=Types.ROW_NAMED(
+ ['input_col1', 'input_col2'],
+ [Types.STRING(), Types.DOUBLE()])
+ ))
+
+# create a string-indexer object and initialize its parameters
+string_indexer = StringIndexer() \
+ .set_string_order_type('alphabetAsc') \
+ .set_input_cols('input_col1', 'input_col2') \
+ .set_output_cols('output_col1', 'output_col2')
+
+# train the string-indexer model
+model = string_indexer.fit(train_table)
+
+# use the string-indexer model for feature engineering
+output = model.transform(predict_table)[0]
+
+# extract and display the results
+field_names = output.get_schema().get_field_names()
+input_values = [None for _ in string_indexer.get_input_cols()]
+output_values = [None for _ in string_indexer.get_input_cols()]
+for result in t_env.to_data_stream(output).execute_and_collect():
+ for i in range(len(string_indexer.get_input_cols())):
+ input_values[i] = result[field_names.index(string_indexer.get_input_cols()[i])]
+ output_values[i] = result[field_names.index(string_indexer.get_output_cols()[i])]
+ print('Input Values: ' + str(input_values) + '\tOutput Values: ' + str(output_values))
diff --git a/flink-ml-python/pyflink/examples/ml/feature/vectorassembler_example.py b/flink-ml-python/pyflink/examples/ml/feature/vectorassembler_example.py
new file mode 100644
index 0000000..db9ac44
--- /dev/null
+++ b/flink-ml-python/pyflink/examples/ml/feature/vectorassembler_example.py
@@ -0,0 +1,71 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Simple program that creates a VectorAssembler instance and uses it for feature
+# engineering.
+#
+# Before executing this program, please make sure you have followed Flink ML's
+# quick start guideline to set up Flink ML and Flink environment. The guideline
+# can be found at
+#
+# https://nightlies.apache.org/flink/flink-ml-docs-master/docs/try-flink-ml/quick-start/
+
+from pyflink.common import Types
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.ml.core.linalg import Vectors, DenseVectorTypeInfo, SparseVectorTypeInfo
+from pyflink.ml.lib.feature.vectorassembler import VectorAssembler
+from pyflink.table import StreamTableEnvironment
+
+# create a new StreamExecutionEnvironment
+env = StreamExecutionEnvironment.get_execution_environment()
+
+# create a StreamTableEnvironment
+t_env = StreamTableEnvironment.create(env)
+
+# generate input data
+input_data_table = t_env.from_data_stream(
+ env.from_collection([
+ (Vectors.dense(2.1, 3.1),
+ 1.0,
+ Vectors.sparse(5, [3], [1.0])),
+ (Vectors.dense(2.1, 3.1),
+ 1.0,
+ Vectors.sparse(5, [1, 2, 3, 4],
+ [1.0, 2.0, 3.0, 4.0])),
+ ],
+ type_info=Types.ROW_NAMED(
+ ['vec', 'num', 'sparse_vec'],
+ [DenseVectorTypeInfo(), Types.DOUBLE(), SparseVectorTypeInfo()])))
+
+# create a vector assembler object and initialize its parameters
+vector_assembler = VectorAssembler() \
+ .set_input_cols('vec', 'num', 'sparse_vec') \
+ .set_output_col('assembled_vec') \
+ .set_handle_invalid('keep')
+
+# use the vector assembler for feature engineering
+output = vector_assembler.transform(input_data_table)[0]
+
+# extract and display the results
+field_names = output.get_schema().get_field_names()
+input_values = [None for _ in vector_assembler.get_input_cols()]
+for result in t_env.to_data_stream(output).execute_and_collect():
+ for i in range(len(vector_assembler.get_input_cols())):
+ input_values[i] = result[field_names.index(vector_assembler.get_input_cols()[i])]
+ output_value = result[field_names.index(vector_assembler.get_output_col())]
+ print('Input Values: ' + str(input_values) + '\tOutput Value: ' + str(output_value))
diff --git a/flink-ml-python/pyflink/examples/ml/regression/__init__.py b/flink-ml-python/pyflink/examples/ml/regression/__init__.py
new file mode 100644
index 0000000..65b48d4
--- /dev/null
+++ b/flink-ml-python/pyflink/examples/ml/regression/__init__.py
@@ -0,0 +1,17 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
diff --git a/flink-ml-python/pyflink/examples/ml/regression/linearregression_example.py b/flink-ml-python/pyflink/examples/ml/regression/linearregression_example.py
new file mode 100644
index 0000000..c14d92a
--- /dev/null
+++ b/flink-ml-python/pyflink/examples/ml/regression/linearregression_example.py
@@ -0,0 +1,73 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Simple program that trains a LinearRegression model and uses it for
+# regression.
+#
+# Before executing this program, please make sure you have followed Flink ML's
+# quick start guideline to set up Flink ML and Flink environment. The guideline
+# can be found at
+#
+# https://nightlies.apache.org/flink/flink-ml-docs-master/docs/try-flink-ml/quick-start/
+
+from pyflink.common import Types
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.ml.core.linalg import Vectors, DenseVectorTypeInfo
+from pyflink.ml.lib.regression.linearregression import LinearRegression
+from pyflink.table import StreamTableEnvironment
+
+# create a new StreamExecutionEnvironment
+env = StreamExecutionEnvironment.get_execution_environment()
+
+# create a StreamTableEnvironment
+t_env = StreamTableEnvironment.create(env)
+
+# generate input data
+input_table = t_env.from_data_stream(
+ env.from_collection([
+ (Vectors.dense(2, 1), 4., 1.),
+ (Vectors.dense(3, 2), 7., 1.),
+ (Vectors.dense(4, 3), 10., 1.),
+ (Vectors.dense(2, 4), 10., 1.),
+ (Vectors.dense(2, 2), 6., 1.),
+ (Vectors.dense(4, 3), 10., 1.),
+ (Vectors.dense(1, 2), 5., 1.),
+ (Vectors.dense(5, 3), 11., 1.),
+ ],
+ type_info=Types.ROW_NAMED(
+ ['features', 'label', 'weight'],
+ [DenseVectorTypeInfo(), Types.DOUBLE(), Types.DOUBLE()])
+ ))
+
+# create a linear regression object and initialize its parameters
+linear_regression = LinearRegression().set_weight_col('weight')
+
+# train the linear regression model
+model = linear_regression.fit(input_table)
+
+# use the linear regression model for predictions
+output = model.transform(input_table)[0]
+
+# extract and display the results
+field_names = output.get_schema().get_field_names()
+for result in t_env.to_data_stream(output).execute_and_collect():
+ features = result[field_names.index(linear_regression.get_features_col())]
+ expected_result = result[field_names.index(linear_regression.get_label_col())]
+ prediction_result = result[field_names.index(linear_regression.get_prediction_col())]
+ print('Features: ' + str(features) + ' \tExpected Result: ' + str(expected_result)
+ + ' \tPrediction Result: ' + str(prediction_result))
diff --git a/flink-ml-python/pyflink/examples/ml/tests/__init__.py b/flink-ml-python/pyflink/examples/ml/tests/__init__.py
new file mode 100644
index 0000000..d91d483
--- /dev/null
+++ b/flink-ml-python/pyflink/examples/ml/tests/__init__.py
@@ -0,0 +1,30 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import os
+import sys
+from pathlib import Path
+
+# Because this project and the `pyflink` project it depends on share the same directory
+# structure, the tests must manually add the `flink-ml-python` path to `sys.path` and put it
+# first in `pyflink.__path__` so that this project's modules take precedence in package search.
+flink_ml_python_dir = Path(__file__).parents[4]
+sys.path.append(str(flink_ml_python_dir))
+
+import pyflink
+
+pyflink.__path__.insert(0, os.path.join(flink_ml_python_dir, 'pyflink'))
diff --git a/flink-ml-python/pyflink/examples/ml/tests/test_examples.py b/flink-ml-python/pyflink/examples/ml/tests/test_examples.py
new file mode 100644
index 0000000..afe6deb
--- /dev/null
+++ b/flink-ml-python/pyflink/examples/ml/tests/test_examples.py
@@ -0,0 +1,37 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import importlib
+import pkgutil
+
+from pyflink.ml.tests.test_utils import PyFlinkMLTestCase
+
+
+class ExamplesTest(PyFlinkMLTestCase):
+ def test_examples(self):
+ self.execute_all_in_module('pyflink.examples.ml.classification')
+ self.execute_all_in_module('pyflink.examples.ml.clustering')
+ self.execute_all_in_module('pyflink.examples.ml.evaluation')
+ self.execute_all_in_module('pyflink.examples.ml.feature')
+ self.execute_all_in_module('pyflink.examples.ml.regression')
+
+ def execute_all_in_module(self, module):
+ module = importlib.import_module(module)
+ for importer, sub_modname, ispkg in pkgutil.iter_modules(module.__path__):
+ # importing an example module means executing the example.
+ importlib.import_module(module.__name__ + "." + sub_modname)
diff --git a/flink-ml-python/pyflink/ml/__init__.py b/flink-ml-python/pyflink/ml/__init__.py
index 5bcf4b1..68e3a97 100644
--- a/flink-ml-python/pyflink/ml/__init__.py
+++ b/flink-ml-python/pyflink/ml/__init__.py
@@ -21,6 +21,7 @@ from pyflink.util import java_utils
from pyflink.util.java_utils import to_jarray, load_java_class
+# TODO: Remove custom jar loader after FLINK-15635 and FLINK-28002 are fixed and released.
def add_jars_to_context_class_loader(jar_urls):
"""
Add jars to Python gateway server for local compilation and local execution (i.e. minicluster).
diff --git a/flink-ml-python/pyflink/ml/core/linalg.py b/flink-ml-python/pyflink/ml/core/linalg.py
index fc0cd95..196a8f4 100644
--- a/flink-ml-python/pyflink/ml/core/linalg.py
+++ b/flink-ml-python/pyflink/ml/core/linalg.py
@@ -615,7 +615,7 @@ class SparseVector(Vector):
def __str__(self):
inds = "[" + ",".join([str(i) for i in self._indices]) + "]"
vals = "[" + ",".join([str(v) for v in self._values]) + "]"
- return "(" + ",".join((str(self.size), inds, vals)) + ")"
+ return "(" + ",".join((str(self.size()), inds, vals)) + ")"
def __repr__(self):
inds = self._indices
diff --git a/flink-ml-python/pyflink/ml/lib/classification/tests/test_logisticregression.py b/flink-ml-python/pyflink/ml/lib/classification/tests/test_logisticregression.py
index e460155..6e46796 100644
--- a/flink-ml-python/pyflink/ml/lib/classification/tests/test_logisticregression.py
+++ b/flink-ml-python/pyflink/ml/lib/classification/tests/test_logisticregression.py
@@ -85,23 +85,6 @@ class LogisticRegressionTest(PyFlinkMLTestCase):
self.assertEqual(regression.prediction_col, 'test_prediction_col')
self.assertEqual(regression.raw_prediction_col, 'test_raw_prediction_col')
- def test_feature_prediction_param(self):
- temp_table = self.binomial_data_table.alias("test_features", "test_label", "test_weight")
- regression = LogisticRegression() \
- .set_features_col("test_features") \
- .set_label_col("test_label") \
- .set_weight_col("test_weight") \
- .set_prediction_col("test_prediction_col") \
- .set_raw_prediction_col("test_raw_prediction_col")
- output = regression.fit(self.binomial_data_table).transform(temp_table)[0]
-
- self.assertEqual(output.get_schema().get_field_names(),
- ['test_features',
- 'test_label',
- 'test_weight',
- 'test_prediction_col',
- 'test_raw_prediction_col'])
-
def test_output_schema(self):
temp_table = self.binomial_data_table.alias("test_features", "test_label", "test_weight")
regression = LogisticRegression() \
diff --git a/flink-ml-python/pyflink/ml/lib/feature/stringindexer.py b/flink-ml-python/pyflink/ml/lib/feature/stringindexer.py
index 448f469..a42b6ed 100644
--- a/flink-ml-python/pyflink/ml/lib/feature/stringindexer.py
+++ b/flink-ml-python/pyflink/ml/lib/feature/stringindexer.py
@@ -23,9 +23,13 @@ from pyflink.ml.lib.feature.common import JavaFeatureModel, JavaFeatureEstimator
from pyflink.ml.lib.param import HasInputCols, HasOutputCols, HasHandleInvalid
-class _IndexToStringModelParams(JavaWithParams):
+class _IndexToStringModelParams(
+ JavaWithParams,
+ HasInputCols,
+ HasOutputCols
+):
"""
- Params for :class:`IndexToString`.
+ Params for :class:`IndexToStringModel`.
"""
def __init__(self, java_params):
diff --git a/flink-ml-python/pyflink/ml/lib/feature/tests/test_indextostringmodel.py b/flink-ml-python/pyflink/ml/lib/feature/tests/test_indextostringmodel.py
new file mode 100644
index 0000000..42df909
--- /dev/null
+++ b/flink-ml-python/pyflink/ml/lib/feature/tests/test_indextostringmodel.py
@@ -0,0 +1,77 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from pyflink.common import Types, Row
+
+from pyflink.ml.lib.feature.stringindexer import IndexToStringModel
+from pyflink.ml.tests.test_utils import PyFlinkMLTestCase
+
+
+class IndexToStringModelTest(PyFlinkMLTestCase):
+ def setUp(self):
+ super(IndexToStringModelTest, self).setUp()
+ self.model_data_table = self.t_env.from_data_stream(
+ self.env.from_collection([
+ ([['a', 'b', 'c', 'd'], [-1., 0., 1., 2.]],),
+ ],
+ type_info=Types.ROW_NAMED(
+ ['stringArrays'],
+ [Types.OBJECT_ARRAY(Types.OBJECT_ARRAY(Types.STRING()))])
+ ))
+
+ self.predict_table = self.t_env.from_data_stream(
+ self.env.from_collection([
+ (0, 3),
+ (1, 2),
+ ],
+ type_info=Types.ROW_NAMED(
+ ['input_col1', 'input_col2'],
+ [Types.INT(), Types.INT()])
+ ))
+
+ self.expected_prediction = [
+ Row(0, 3, 'a', '2.0'),
+ Row(1, 2, 'b', '1.0'),
+ ]
+
+ def test_output_schema(self):
+ model = IndexToStringModel() \
+ .set_input_cols('input_col1', 'input_col2') \
+ .set_output_cols('output_col1', 'output_col2') \
+ .set_model_data(self.model_data_table)
+
+ output = model.transform(self.predict_table)[0]
+
+ self.assertEqual(
+ ['input_col1', 'input_col2', 'output_col1', 'output_col2'],
+ output.get_schema().get_field_names())
+
+ def test_fit_and_predict(self):
+ model = IndexToStringModel() \
+ .set_input_cols('input_col1', 'input_col2') \
+ .set_output_cols('output_col1', 'output_col2') \
+ .set_model_data(self.model_data_table)
+
+ output = model.transform(self.predict_table)[0]
+
+ predicted_results = [result for result in
+ self.t_env.to_data_stream(output).execute_and_collect()]
+
+ predicted_results.sort(key=lambda x: x[0])
+
+ self.assertEqual(predicted_results, self.expected_prediction)
diff --git a/flink-ml-python/pyflink/ml/lib/regression/__init__.py b/flink-ml-python/pyflink/ml/lib/regression/__init__.py
new file mode 100644
index 0000000..65b48d4
--- /dev/null
+++ b/flink-ml-python/pyflink/ml/lib/regression/__init__.py
@@ -0,0 +1,17 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
diff --git a/flink-ml-python/pyflink/ml/lib/regression/common.py b/flink-ml-python/pyflink/ml/lib/regression/common.py
new file mode 100644
index 0000000..028afd6
--- /dev/null
+++ b/flink-ml-python/pyflink/ml/lib/regression/common.py
@@ -0,0 +1,74 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from abc import ABC, abstractmethod
+
+from pyflink.ml.core.wrapper import JavaModel, JavaEstimator
+
+JAVA_REGRESSION_PACKAGE_NAME = "org.apache.flink.ml.regression"
+
+
+class JavaRegressionModel(JavaModel, ABC):
+ """
+ Wrapper class for a Java Regression Model.
+ """
+
+ def __init__(self, java_model):
+ super(JavaRegressionModel, self).__init__(java_model)
+
+ @classmethod
+ def _java_stage_path(cls) -> str:
+ return ".".join(
+ [JAVA_REGRESSION_PACKAGE_NAME,
+ cls._java_model_package_name(),
+ cls._java_model_class_name()])
+
+ @classmethod
+ @abstractmethod
+ def _java_model_package_name(cls) -> str:
+ pass
+
+ @classmethod
+ @abstractmethod
+ def _java_model_class_name(cls) -> str:
+ pass
+
+
+class JavaRegressionEstimator(JavaEstimator, ABC):
+ """
+ Wrapper class for a Java Regression Estimator.
+ """
+
+ def __init__(self):
+ super(JavaRegressionEstimator, self).__init__()
+
+ @classmethod
+ def _java_stage_path(cls):
+ return ".".join(
+ [JAVA_REGRESSION_PACKAGE_NAME,
+ cls._java_estimator_package_name(),
+ cls._java_estimator_class_name()])
+
+ @classmethod
+ @abstractmethod
+ def _java_estimator_package_name(cls) -> str:
+ pass
+
+ @classmethod
+ @abstractmethod
+ def _java_estimator_class_name(cls) -> str:
+ pass
diff --git a/flink-ml-python/pyflink/ml/lib/regression/linearregression.py b/flink-ml-python/pyflink/ml/lib/regression/linearregression.py
new file mode 100644
index 0000000..f19348b
--- /dev/null
+++ b/flink-ml-python/pyflink/ml/lib/regression/linearregression.py
@@ -0,0 +1,97 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from abc import ABC
+
+from pyflink.ml.core.wrapper import JavaWithParams
+from pyflink.ml.lib.regression.common import (JavaRegressionModel, JavaRegressionEstimator)
+from pyflink.ml.lib.param import (HasWeightCol, HasMaxIter, HasReg, HasLearningRate,
+ HasGlobalBatchSize, HasTol, HasFeaturesCol,
+ HasPredictionCol, HasLabelCol, HasElasticNet)
+
+
+class _LinearRegressionModelParams(
+ JavaWithParams,
+ HasFeaturesCol,
+ HasPredictionCol,
+ ABC
+):
+ """
+ Params for :class:`LinearRegressionModel`.
+ """
+
+ def __init__(self, java_params):
+ super(_LinearRegressionModelParams, self).__init__(java_params)
+
+
+class _LinearRegressionParams(
+ _LinearRegressionModelParams,
+ HasLabelCol,
+ HasWeightCol,
+ HasMaxIter,
+ HasReg,
+ HasElasticNet,
+ HasLearningRate,
+ HasGlobalBatchSize,
+ HasTol
+):
+ """
+ Params for :class:`LinearRegression`.
+ """
+
+ def __init__(self, java_params):
+ super(_LinearRegressionParams, self).__init__(java_params)
+
+
+class LinearRegressionModel(JavaRegressionModel, _LinearRegressionModelParams):
+ """
+ A Model which predicts values using the model data computed by :class:`LinearRegression`.
+ """
+
+ def __init__(self, java_model=None):
+ super(LinearRegressionModel, self).__init__(java_model)
+
+ @classmethod
+ def _java_model_package_name(cls) -> str:
+ return "linearregression"
+
+ @classmethod
+ def _java_model_class_name(cls) -> str:
+ return "LinearRegressionModel"
+
+
+class LinearRegression(JavaRegressionEstimator, _LinearRegressionParams):
+ """
+ An Estimator which implements the linear regression algorithm.
+
+ See https://en.wikipedia.org/wiki/Linear_regression.
+ """
+
+ def __init__(self):
+ super(LinearRegression, self).__init__()
+
+ @classmethod
+ def _create_model(cls, java_model) -> LinearRegressionModel:
+ return LinearRegressionModel(java_model)
+
+ @classmethod
+ def _java_estimator_package_name(cls) -> str:
+ return "linearregression"
+
+ @classmethod
+ def _java_estimator_class_name(cls) -> str:
+ return "LinearRegression"
diff --git a/flink-ml-python/pyflink/ml/lib/regression/tests/__init__.py b/flink-ml-python/pyflink/ml/lib/regression/tests/__init__.py
new file mode 100644
index 0000000..6698191
--- /dev/null
+++ b/flink-ml-python/pyflink/ml/lib/regression/tests/__init__.py
@@ -0,0 +1,30 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import os
+import sys
+from pathlib import Path
+
+# Because this project and its `pyflink` dependency share the same package directory structure,
+# we manually add the `flink-ml-python` path to `sys.path` in this project's tests to change
+# the package search order.
+flink_ml_python_dir = Path(__file__).parents[5]
+sys.path.append(str(flink_ml_python_dir))
+
+import pyflink
+
+pyflink.__path__.insert(0, os.path.join(flink_ml_python_dir, 'pyflink'))
diff --git a/flink-ml-python/pyflink/ml/lib/regression/tests/test_linearregression.py b/flink-ml-python/pyflink/ml/lib/regression/tests/test_linearregression.py
new file mode 100644
index 0000000..4af1df3
--- /dev/null
+++ b/flink-ml-python/pyflink/ml/lib/regression/tests/test_linearregression.py
@@ -0,0 +1,151 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import os
+
+from pyflink.common import Types
+from pyflink.table import Table
+
+from pyflink.ml.core.linalg import Vectors, DenseVectorTypeInfo
+from pyflink.ml.lib.regression.linearregression import LinearRegression, \
+ LinearRegressionModel
+from pyflink.ml.tests.test_utils import PyFlinkMLTestCase
+
+
+class LinearRegressionTest(PyFlinkMLTestCase):
+ def setUp(self):
+ super(LinearRegressionTest, self).setUp()
+ self.input_data_table = self.t_env.from_data_stream(
+ self.env.from_collection([
+ (Vectors.dense(2, 1), 4., 1.),
+ (Vectors.dense(3, 2), 7., 1.),
+ (Vectors.dense(4, 3), 10., 1.),
+ (Vectors.dense(2, 4), 10., 1.),
+ (Vectors.dense(2, 2), 6., 1.),
+ (Vectors.dense(4, 3), 10., 1.),
+ (Vectors.dense(1, 2), 5., 1.),
+ (Vectors.dense(5, 3), 11., 1.),
+ ],
+ type_info=Types.ROW_NAMED(
+ ['features', 'label', 'weight'],
+ [DenseVectorTypeInfo(), Types.DOUBLE(), Types.DOUBLE()])
+ ))
+
+ def test_param(self):
+ regression = LinearRegression()
+ self.assertEqual(regression.label_col, 'label')
+ self.assertIsNone(regression.weight_col)
+ self.assertEqual(regression.max_iter, 20)
+ self.assertAlmostEqual(regression.reg, 0, delta=1e-7)
+ self.assertAlmostEqual(regression.learning_rate, 0.1, delta=1e-7)
+ self.assertEqual(regression.global_batch_size, 32)
+ self.assertAlmostEqual(regression.tol, 1e-6, delta=1e-7)
+ self.assertEqual(regression.features_col, 'features')
+ self.assertEqual(regression.prediction_col, 'prediction')
+
+ regression.set_features_col("test_features") \
+ .set_label_col("test_label") \
+ .set_weight_col("test_weight") \
+ .set_max_iter(1000) \
+ .set_tol(0.001) \
+ .set_learning_rate(0.5) \
+ .set_global_batch_size(1000) \
+ .set_reg(0.1) \
+ .set_prediction_col("test_prediction_col") \
+
+ self.assertEqual(regression.features_col, 'test_features')
+ self.assertEqual(regression.label_col, 'test_label')
+ self.assertEqual(regression.weight_col, 'test_weight')
+ self.assertEqual(regression.max_iter, 1000)
+ self.assertAlmostEqual(regression.reg, 0.1, delta=1e-7)
+ self.assertAlmostEqual(regression.learning_rate, 0.5, delta=1e-7)
+ self.assertEqual(regression.global_batch_size, 1000)
+ self.assertAlmostEqual(regression.tol, 0.001, delta=1e-7)
+ self.assertEqual(regression.prediction_col, 'test_prediction_col')
+
+ def test_output_schema(self):
+ temp_table = self.input_data_table.alias("test_features", "test_label", "test_weight")
+ regression = LinearRegression() \
+ .set_features_col('test_features') \
+ .set_label_col('test_label') \
+ .set_weight_col('test_weight') \
+ .set_prediction_col('test_prediction_col')
+ output = regression.fit(self.input_data_table).transform(temp_table)[0]
+ self.assertEqual(
+ ['test_features',
+ 'test_label',
+ 'test_weight',
+ 'test_prediction_col'],
+ output.get_schema().get_field_names())
+
+ def test_fit_and_predict(self):
+ regression = LinearRegression().set_weight_col('weight')
+ output = regression.fit(self.input_data_table).transform(self.input_data_table)[0]
+ field_names = output.get_schema().get_field_names()
+ self.verify_predict_result(
+ output,
+ field_names.index(regression.get_label_col()),
+ field_names.index(regression.get_prediction_col()))
+
+ def test_save_load_and_predict(self):
+ regression = LinearRegression().set_weight_col('weight')
+ path = os.path.join(self.temp_dir, 'test_save_load_and_predict_linear_regression')
+ regression.save(path)
+ regression = LinearRegression.load(self.t_env, path) # type: LinearRegression
+ model = regression.fit(self.input_data_table)
+ self.assertEqual(
+ model.get_model_data()[0].get_schema().get_field_names(),
+ ['coefficient'])
+ output = model.transform(self.input_data_table)[0]
+ field_names = output.get_schema().get_field_names()
+ self.verify_predict_result(
+ output,
+ field_names.index(regression.get_label_col()),
+ field_names.index(regression.get_prediction_col()))
+
+ def test_get_model_data(self):
+ regression = LinearRegression().set_weight_col('weight')
+ model = regression.fit(self.input_data_table)
+ model_data = self.t_env.to_data_stream(
+ model.get_model_data()[0]).execute_and_collect().next()
+ self.assertIsNotNone(model_data[0])
+ data = model_data[0].values.tolist()
+ expected = [1.141, 1.829]
+ self.assertEqual(len(data), len(expected))
+ for a, b in zip(data, expected):
+ self.assertAlmostEqual(a, b, delta=0.1)
+
+ def test_set_model_data(self):
+ regression = LinearRegression().set_weight_col('weight')
+ model = regression.fit(self.input_data_table)
+
+ new_model = LinearRegressionModel()
+ new_model.set_model_data(*model.get_model_data())
+ output = new_model.transform(self.input_data_table)[0]
+ field_names = output.get_schema().get_field_names()
+ self.verify_predict_result(
+ output,
+ field_names.index(regression.get_label_col()),
+ field_names.index(regression.get_prediction_col()))
+
+ def verify_predict_result(
+ self, output: Table, label_index, prediction_index):
+ with self.t_env.to_data_stream(output).execute_and_collect() as results:
+ for result in results:
+ label = result[label_index] # type: float
+ prediction = result[prediction_index] # type: float
+ self.assertAlmostEqual(label, prediction, delta=0.1 * label)
diff --git a/flink-ml-python/pyflink/ml/lib/tests/test_ml_lib_completeness.py b/flink-ml-python/pyflink/ml/lib/tests/test_ml_lib_completeness.py
index ff17298..7bd894d 100644
--- a/flink-ml-python/pyflink/ml/lib/tests/test_ml_lib_completeness.py
+++ b/flink-ml-python/pyflink/ml/lib/tests/test_ml_lib_completeness.py
@@ -22,9 +22,8 @@ import pkgutil
import unittest
from abc import abstractmethod
-from pyflink.ml.tests.test_utils import PyFlinkMLTestCase
-
from pyflink.java_gateway import get_gateway
+from pyflink.ml.tests.test_utils import PyFlinkMLTestCase
class CompletenessTest(object):
@@ -70,7 +69,8 @@ class MLLibTest(PyFlinkMLTestCase):
'JavaClassificationEstimator', 'JavaClassificationModel',
'JavaClusteringEstimator', 'JavaClusteringModel',
'JavaEvaluationAlgoOperator', 'JavaFeatureTransformer',
- 'JavaFeatureEstimator', 'JavaFeatureModel')]
+ 'JavaFeatureEstimator', 'JavaFeatureModel',
+ 'JavaRegressionEstimator', 'JavaRegressionModel')]
@abstractmethod
def module_name(self):
@@ -120,6 +120,16 @@ class FeatureCompletenessTest(CompletenessTest, MLLibTest):
return feature
+class RegressionCompletenessTest(CompletenessTest, MLLibTest):
+
+ def module_name(self):
+ return "regression"
+
+ def module(self):
+ from pyflink.ml.lib import regression
+ return regression
+
+
if __name__ == "__main__":
try:
import xmlrunner