You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by li...@apache.org on 2022/07/19 13:59:40 UTC
[flink-ml] branch master updated: [FLINK-28500] Add Transformer for Tokenizer
This is an automated email from the ASF dual-hosted git repository.
lindong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-ml.git
The following commit(s) were added to refs/heads/master by this push:
new 3b97256 [FLINK-28500] Add Transformer for Tokenizer
3b97256 is described below
commit 3b97256096d2c97abd3dc0b3810663e50e746b9e
Author: Zhipeng Zhang <zh...@gmail.com>
AuthorDate: Tue Jul 19 21:59:34 2022 +0800
[FLINK-28500] Add Transformer for Tokenizer
This closes #129.
---
.../ml/examples/feature/TokenizerExample.java | 60 ++++++++++
.../flink/ml/feature/bucketizer/Bucketizer.java | 4 +-
.../flink/ml/feature/tokenizer/Tokenizer.java | 80 +++++++++++++
.../ml/feature/tokenizer/TokenizerParams.java | 29 +++++
.../feature/vectorassembler/VectorAssembler.java | 4 +-
.../org/apache/flink/ml/feature/TokenizerTest.java | 124 +++++++++++++++++++++
.../examples/ml/feature/tokenizer_example.py | 60 ++++++++++
.../pyflink/ml/lib/feature/bucketizer.py | 4 +-
.../pyflink/ml/lib/feature/tests/test_tokenizer.py | 77 +++++++++++++
.../feature/{vectorassembler.py => tokenizer.py} | 27 ++---
.../pyflink/ml/lib/feature/vectorassembler.py | 2 +-
11 files changed, 448 insertions(+), 23 deletions(-)
diff --git a/flink-ml-examples/src/main/java/org/apache/flink/ml/examples/feature/TokenizerExample.java b/flink-ml-examples/src/main/java/org/apache/flink/ml/examples/feature/TokenizerExample.java
new file mode 100644
index 0000000..1132757
--- /dev/null
+++ b/flink-ml-examples/src/main/java/org/apache/flink/ml/examples/feature/TokenizerExample.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.examples.feature;
+
+import org.apache.flink.ml.feature.tokenizer.Tokenizer;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+
+import java.util.Arrays;
+
+/**
+ * Simple program that creates a Tokenizer instance and uses it for feature engineering.
+ *
+ * <p>Two sample sentences are tokenized and every input sentence is printed next to its tokens.
+ */
+public class TokenizerExample {
+    /**
+     * Builds the sample input table, applies the {@link Tokenizer} and prints every result row.
+     *
+     * @param args Command line arguments; not used.
+     * @throws Exception If closing the result iterator fails.
+     */
+    public static void main(String[] args) throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+        // Generates input data.
+        DataStream<Row> inputStream =
+                env.fromElements(Row.of("Test for tokenization."), Row.of("Te,st. punct"));
+        Table inputTable = tEnv.fromDataStream(inputStream).as("input");
+
+        // Creates a Tokenizer object and initializes its parameters.
+        Tokenizer tokenizer = new Tokenizer().setInputCol("input").setOutputCol("output");
+
+        // Uses the Tokenizer object for feature transformations.
+        Table outputTable = tokenizer.transform(inputTable)[0];
+
+        // Extracts and displays the results. try-with-resources closes the iterator once all
+        // rows are consumed, and also when printing fails half-way.
+        try (CloseableIterator<Row> it = outputTable.execute().collect()) {
+            while (it.hasNext()) {
+                Row row = it.next();
+
+                String inputValue = (String) row.getField(tokenizer.getInputCol());
+                String[] outputValues = (String[]) row.getField(tokenizer.getOutputCol());
+
+                System.out.printf(
+                        "Input Value: %s \tOutput Values: %s\n",
+                        inputValue, Arrays.toString(outputValues));
+            }
+        }
+    }
+}
diff --git a/flink-ml-lib/src/main/java/org/apache/flink/ml/feature/bucketizer/Bucketizer.java b/flink-ml-lib/src/main/java/org/apache/flink/ml/feature/bucketizer/Bucketizer.java
index 1382963..c4e760b 100644
--- a/flink-ml-lib/src/main/java/org/apache/flink/ml/feature/bucketizer/Bucketizer.java
+++ b/flink-ml-lib/src/main/java/org/apache/flink/ml/feature/bucketizer/Bucketizer.java
@@ -44,8 +44,8 @@ import java.util.HashMap;
import java.util.Map;
/**
- * Bucketizer is a transformer that maps multiple columns of continuous features to multiple columns
- * of discrete features, i.e., buckets indices. The indices are in [0, numSplitsInThisColumn - 1].
+ * A Transformer that maps multiple columns of continuous features to multiple columns of discrete
+ * features, i.e., bucket indices. The indices are in [0, numSplitsInThisColumn - 1].
*
* <p>The `keep` option of {@link HasHandleInvalid} means that we put the invalid data in the last
* bucket of the splits, whose index is the number of the buckets.
diff --git a/flink-ml-lib/src/main/java/org/apache/flink/ml/feature/tokenizer/Tokenizer.java b/flink-ml-lib/src/main/java/org/apache/flink/ml/feature/tokenizer/Tokenizer.java
new file mode 100644
index 0000000..8098e51
--- /dev/null
+++ b/flink-ml-lib/src/main/java/org/apache/flink/ml/feature/tokenizer/Tokenizer.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature.tokenizer;
+
+import org.apache.flink.ml.api.Transformer;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.table.api.Expressions;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.table.api.Expressions.$;
+
+/**
+ * A Transformer which converts the input string to lowercase and then splits it by white spaces.
+ *
+ * <p>The resulting tokens are appended to the input table as an additional column whose name is
+ * given by the output column parameter.
+ */
+public class Tokenizer implements Transformer<Tokenizer>, TokenizerParams<Tokenizer> {
+    private final Map<Param<?>, Object> paramMap = new HashMap<>();
+
+    public Tokenizer() {
+        ParamUtils.initializeMapWithDefaultValues(paramMap, this);
+    }
+
+    /**
+     * Appends the token column to the given table.
+     *
+     * @param inputs Exactly one table, containing the configured input column.
+     * @return A single-element array holding the input table plus the output column.
+     * @throws IllegalArgumentException If the number of input tables is not one.
+     */
+    @Override
+    public Table[] transform(Table... inputs) {
+        Preconditions.checkArgument(inputs.length == 1);
+
+        Expression tokenizerUdf =
+                Expressions.call(TokenizerUdf.class, $(getInputCol())).as(getOutputCol());
+        Table output = inputs[0].addColumns(tokenizerUdf);
+        return new Table[] {output};
+    }
+
+    @Override
+    public void save(String path) throws IOException {
+        ReadWriteUtils.saveMetadata(this, path);
+    }
+
+    @Override
+    public Map<Param<?>, Object> getParamMap() {
+        return paramMap;
+    }
+
+    /**
+     * Loads a Tokenizer from the given path.
+     *
+     * @param tEnv Table environment; not used by this method.
+     * @param path The path the Tokenizer was saved to.
+     * @return The loaded Tokenizer.
+     * @throws IOException If loading from the path fails.
+     */
+    public static Tokenizer load(StreamTableEnvironment tEnv, String path) throws IOException {
+        return ReadWriteUtils.loadStageParam(path);
+    }
+
+    /**
+     * The main logic of {@link Tokenizer}, which converts the input string to an array of tokens.
+     */
+    public static class TokenizerUdf extends ScalarFunction {
+        /**
+         * Lowercases the input and splits it at every single whitespace character.
+         *
+         * <p>Consecutive whitespace characters therefore produce empty tokens.
+         *
+         * @param input The string to tokenize, may be null.
+         * @return The tokens, or null if the input is null.
+         */
+        public String[] eval(String input) {
+            // A null cell yields a null result instead of failing the job with an NPE.
+            if (input == null) {
+                return null;
+            }
+            // Locale.ROOT keeps the result independent of the JVM default locale (for example,
+            // the Turkish locale would otherwise map 'I' to a dotless 'ı').
+            return input.toLowerCase(java.util.Locale.ROOT).split("\\s");
+        }
+    }
+}
diff --git a/flink-ml-lib/src/main/java/org/apache/flink/ml/feature/tokenizer/TokenizerParams.java b/flink-ml-lib/src/main/java/org/apache/flink/ml/feature/tokenizer/TokenizerParams.java
new file mode 100644
index 0000000..3e82003
--- /dev/null
+++ b/flink-ml-lib/src/main/java/org/apache/flink/ml/feature/tokenizer/TokenizerParams.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature.tokenizer;
+
+import org.apache.flink.ml.common.param.HasInputCol;
+import org.apache.flink.ml.common.param.HasOutputCol;
+
+/**
+ * Params of {@link Tokenizer}.
+ *
+ * <p>This interface declares no parameters of its own; it only combines {@link HasInputCol} and
+ * {@link HasOutputCol}.
+ *
+ * @param <T> The class type of this instance.
+ */
+public interface TokenizerParams<T> extends HasInputCol<T>, HasOutputCol<T> {}
diff --git a/flink-ml-lib/src/main/java/org/apache/flink/ml/feature/vectorassembler/VectorAssembler.java b/flink-ml-lib/src/main/java/org/apache/flink/ml/feature/vectorassembler/VectorAssembler.java
index c14e44e..e3a01f1 100644
--- a/flink-ml-lib/src/main/java/org/apache/flink/ml/feature/vectorassembler/VectorAssembler.java
+++ b/flink-ml-lib/src/main/java/org/apache/flink/ml/feature/vectorassembler/VectorAssembler.java
@@ -46,8 +46,8 @@ import java.util.HashMap;
import java.util.Map;
/**
- * A feature transformer that combines a given list of input columns into a vector column. Types of
- * input columns must be either vector or numerical value.
+ * A Transformer which combines a given list of input columns into a vector column. Types of input
+ * columns must be either vector or numerical value.
*
* <p>The `keep` option of {@link HasHandleInvalid} means that we output bad rows with output column
* set to null.
diff --git a/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/TokenizerTest.java b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/TokenizerTest.java
new file mode 100644
index 0000000..cacbec5
--- /dev/null
+++ b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/TokenizerTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.ml.feature.tokenizer.Tokenizer;
+import org.apache.flink.ml.util.TestUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Expressions;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
+
+import org.apache.commons.collections.IteratorUtils;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+/** Tests {@link Tokenizer}. */
+public class TokenizerTest extends AbstractTestBase {
+ private StreamTableEnvironment tEnv;
+ private StreamExecutionEnvironment env;
+ private Table inputDataTable;
+
+ private static final List<Row> INPUT =
+ Arrays.asList(Row.of("Test for tokenization."), Row.of("Te,st. punct"));
+
+ private static final List<Row> EXPECTED_OUTPUT =
+ Arrays.asList(
+ Row.of((Object) new String[] {"test", "for", "tokenization."}),
+ Row.of((Object) new String[] {"te,st.", "punct"}));
+
+ @Before
+ public void before() {
+ Configuration config = new Configuration();
+ config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
+ env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+ env.setParallelism(4);
+ env.enableCheckpointing(100);
+ env.setRestartStrategy(RestartStrategies.noRestart());
+ tEnv = StreamTableEnvironment.create(env);
+ DataStream<Row> dataStream = env.fromCollection(INPUT);
+ inputDataTable = tEnv.fromDataStream(dataStream).as("input");
+ }
+
+ @Test
+ public void testParam() {
+ Tokenizer tokenizer = new Tokenizer();
+ assertEquals("input", tokenizer.getInputCol());
+ assertEquals("output", tokenizer.getOutputCol());
+
+ tokenizer.setInputCol("testInputCol").setOutputCol("testOutputCol");
+ assertEquals("testInputCol", tokenizer.getInputCol());
+ assertEquals("testOutputCol", tokenizer.getOutputCol());
+ }
+
+ @Test
+ public void testOutputSchema() {
+ Tokenizer tokenizer = new Tokenizer();
+ inputDataTable =
+ tEnv.fromDataStream(env.fromElements(Row.of("", ""))).as("input", "dummyInput");
+ Table output = tokenizer.transform(inputDataTable)[0];
+ assertEquals(
+ Arrays.asList(tokenizer.getInputCol(), "dummyInput", tokenizer.getOutputCol()),
+ output.getResolvedSchema().getColumnNames());
+ }
+
+ @Test
+ public void testTransform() throws Exception {
+ Tokenizer tokenizer = new Tokenizer();
+ Table output = tokenizer.transform(inputDataTable)[0];
+ verifyOutputResult(output, tokenizer.getOutputCol(), EXPECTED_OUTPUT);
+ }
+
+ @Test
+ public void testSaveLoadAndTransform() throws Exception {
+ Tokenizer tokenizer = new Tokenizer();
+ Tokenizer loadedTokenizer =
+ TestUtils.saveAndReload(
+ tEnv, tokenizer, TEMPORARY_FOLDER.newFolder().getAbsolutePath());
+ Table output = loadedTokenizer.transform(inputDataTable)[0];
+ verifyOutputResult(output, loadedTokenizer.getOutputCol(), EXPECTED_OUTPUT);
+ }
+
+ private void verifyOutputResult(Table output, String outputCol, List<Row> expectedOutput)
+ throws Exception {
+ DataStream<Row> dataStream = tEnv.toDataStream(output.select(Expressions.$(outputCol)));
+ List<Row> results = IteratorUtils.toList(dataStream.executeAndCollect());
+ assertEquals(expectedOutput.size(), results.size());
+ results.sort(Comparator.comparingInt(o -> ((String[]) o.getField(0))[0].hashCode()));
+ expectedOutput.sort(Comparator.comparingInt(o -> ((String[]) o.getField(0))[0].hashCode()));
+ for (int i = 0; i < expectedOutput.size(); i++) {
+ assertArrayEquals(
+ (String[]) results.get(i).getField(0),
+ (String[]) expectedOutput.get(i).getField(0));
+ }
+ }
+}
diff --git a/flink-ml-python/pyflink/examples/ml/feature/tokenizer_example.py b/flink-ml-python/pyflink/examples/ml/feature/tokenizer_example.py
new file mode 100644
index 0000000..b0f8308
--- /dev/null
+++ b/flink-ml-python/pyflink/examples/ml/feature/tokenizer_example.py
@@ -0,0 +1,60 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Simple program that creates a Tokenizer instance and uses it for feature
+# engineering.
+#
+# Before executing this program, please make sure you have followed Flink ML's
+# quick start guideline to set up Flink ML and Flink environment. The guideline
+# can be found at
+#
+# https://nightlies.apache.org/flink/flink-ml-docs-master/docs/try-flink-ml/quick-start/
+
+from pyflink.common import Types
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.ml.lib.feature.tokenizer import Tokenizer
+from pyflink.table import StreamTableEnvironment
+
+env = StreamExecutionEnvironment.get_execution_environment()
+
+t_env = StreamTableEnvironment.create(env)
+
+# Generates input data.
+input_data_table = t_env.from_data_stream(
+ env.from_collection([
+ ('Test for tokenization.',),
+ ('Te,st. punct',),
+ ],
+ type_info=Types.ROW_NAMED(
+ ['input'],
+ [Types.STRING()])))
+
+# Creates a Tokenizer object and initializes its parameters.
+tokenizer = Tokenizer() \
+ .set_input_col("input") \
+ .set_output_col("output")
+
+# Uses the Tokenizer object for feature transformations.
+output = tokenizer.transform(input_data_table)[0]
+
+# Extracts and displays the results.
+field_names = output.get_schema().get_field_names()
+for result in t_env.to_data_stream(output).execute_and_collect():
+ input_value = result[field_names.index(tokenizer.get_input_col())]
+ output_value = result[field_names.index(tokenizer.get_output_col())]
+ print('Input Values: ' + str(input_value) + '\tOutput Value: ' + str(output_value))
diff --git a/flink-ml-python/pyflink/ml/lib/feature/bucketizer.py b/flink-ml-python/pyflink/ml/lib/feature/bucketizer.py
index 26a6b0b..103dad3 100644
--- a/flink-ml-python/pyflink/ml/lib/feature/bucketizer.py
+++ b/flink-ml-python/pyflink/ml/lib/feature/bucketizer.py
@@ -31,7 +31,7 @@ class _BucketizerParams(
HasHandleInvalid
):
"""
- Params for :class:`StandardScaler`.
+ Params for :class:`Bucketizer`.
"""
SPLITS_ARRAY: Param[Tuple[float, ...]] = FloatArrayArrayParam(
@@ -55,7 +55,7 @@ class _BucketizerParams(
class Bucketizer(JavaFeatureTransformer, _BucketizerParams):
"""
- Bucketizer is a transformer that maps multiple columns of continuous features to multiple
+ A Transformer that maps multiple columns of continuous features to multiple
columns of discrete features, i.e., buckets indices. The indices are in
[0, numSplitsInThisColumn - 1].
diff --git a/flink-ml-python/pyflink/ml/lib/feature/tests/test_tokenizer.py b/flink-ml-python/pyflink/ml/lib/feature/tests/test_tokenizer.py
new file mode 100644
index 0000000..1534f98
--- /dev/null
+++ b/flink-ml-python/pyflink/ml/lib/feature/tests/test_tokenizer.py
@@ -0,0 +1,77 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import os
+
+from pyflink.common import Types
+
+from pyflink.ml.lib.feature.tokenizer import Tokenizer
+from pyflink.ml.tests.test_utils import PyFlinkMLTestCase
+
+
+class TokenizerTest(PyFlinkMLTestCase):
+ def setUp(self):
+ super(TokenizerTest, self).setUp()
+ self.input_data_table = self.t_env.from_data_stream(
+ self.env.from_collection([
+ ('Test for tokenization.',),
+ ('Te,st. punct',),
+ ],
+ type_info=Types.ROW_NAMED(
+ ['input'],
+ [Types.STRING()])))
+ self.expected_output = [
+ ['test', 'for', 'tokenization.'],
+ ['te,st.', 'punct']
+ ]
+
+ def test_param(self):
+ tokenizer = Tokenizer()
+ self.assertEqual('input', tokenizer.input_col)
+ self.assertEqual('output', tokenizer.output_col)
+
+ tokenizer.set_input_col('testInputCol').set_output_col('testOutputCol')
+ self.assertEqual('testInputCol', tokenizer.input_col)
+ self.assertEqual('testOutputCol', tokenizer.output_col)
+
+ def test_output_schema(self):
+ tokenizer = Tokenizer()
+ input_data_table = self.t_env.from_data_stream(
+ self.env.from_collection([
+ ('', ''),
+ ],
+ type_info=Types.ROW_NAMED(
+ ['input', 'dummy_input'],
+ [Types.STRING(), Types.STRING()])))
+ output = tokenizer.transform(input_data_table)[0]
+
+ self.assertEqual(
+ [tokenizer.input_col, 'dummy_input', tokenizer.output_col],
+ output.get_schema().get_field_names())
+
+ def test_save_load_transform(self):
+ tokenizer = Tokenizer()
+ path = os.path.join(self.temp_dir, 'test_save_load_transform_tokenizer')
+ tokenizer.save(path)
+ tokenizer = Tokenizer.load(self.t_env, path)
+ output_table = tokenizer.transform(self.input_data_table)[0]
+ predicted_results = [result[1] for result in
+ self.t_env.to_data_stream(output_table).execute_and_collect()]
+
+ predicted_results.sort(key=lambda x: x[0])
+ self.expected_output.sort(key=lambda x: x[0])
+ self.assertEqual(self.expected_output, predicted_results)
diff --git a/flink-ml-python/pyflink/ml/lib/feature/vectorassembler.py b/flink-ml-python/pyflink/ml/lib/feature/tokenizer.py
similarity index 64%
copy from flink-ml-python/pyflink/ml/lib/feature/vectorassembler.py
copy to flink-ml-python/pyflink/ml/lib/feature/tokenizer.py
index cff85ab..ca13ce9 100644
--- a/flink-ml-python/pyflink/ml/lib/feature/vectorassembler.py
+++ b/flink-ml-python/pyflink/ml/lib/feature/tokenizer.py
@@ -18,39 +18,34 @@
from pyflink.ml.core.wrapper import JavaWithParams
from pyflink.ml.lib.feature.common import JavaFeatureTransformer
-from pyflink.ml.lib.param import HasInputCols, HasOutputCol, HasHandleInvalid
+from pyflink.ml.lib.param import HasInputCol, HasOutputCol
-class _VectorAssemblerParams(
+class _TokenizerParams(
JavaWithParams,
- HasInputCols,
- HasOutputCol,
- HasHandleInvalid
+ HasInputCol,
+ HasOutputCol
):
"""
- Params for :class:`VectorAssembler`.
+ Params for :class:`Tokenizer`.
"""
def __init__(self, java_params):
- super(_VectorAssemblerParams, self).__init__(java_params)
+ super(_TokenizerParams, self).__init__(java_params)
-class VectorAssembler(JavaFeatureTransformer, _VectorAssemblerParams):
+class Tokenizer(JavaFeatureTransformer, _TokenizerParams):
"""
- A feature transformer that combines a given list of input columns into a vector column. Types of
- input columns must be either vector or numerical value.
-
- The `keep` option of :class:HasHandleInvalid means that we output bad rows with output column
- set to null.
+ A Transformer that converts the input string to lowercase and then splits it by white spaces.
"""
def __init__(self, java_model=None):
- super(VectorAssembler, self).__init__(java_model)
+ super(Tokenizer, self).__init__(java_model)
@classmethod
def _java_transformer_package_name(cls) -> str:
- return "vectorassembler"
+ return "tokenizer"
@classmethod
def _java_transformer_class_name(cls) -> str:
- return "VectorAssembler"
+ return "Tokenizer"
diff --git a/flink-ml-python/pyflink/ml/lib/feature/vectorassembler.py b/flink-ml-python/pyflink/ml/lib/feature/vectorassembler.py
index cff85ab..08bd689 100644
--- a/flink-ml-python/pyflink/ml/lib/feature/vectorassembler.py
+++ b/flink-ml-python/pyflink/ml/lib/feature/vectorassembler.py
@@ -37,7 +37,7 @@ class _VectorAssemblerParams(
class VectorAssembler(JavaFeatureTransformer, _VectorAssemblerParams):
"""
- A feature transformer that combines a given list of input columns into a vector column. Types of
+ A Transformer which combines a given list of input columns into a vector column. Types of
input columns must be either vector or numerical value.
The `keep` option of :class:HasHandleInvalid means that we output bad rows with output column