Posted to commits@flink.apache.org by li...@apache.org on 2022/07/19 13:59:40 UTC

[flink-ml] branch master updated: [FLINK-28500] Add Transformer for Tokenizer

This is an automated email from the ASF dual-hosted git repository.

lindong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-ml.git


The following commit(s) were added to refs/heads/master by this push:
     new 3b97256  [FLINK-28500] Add Transformer for Tokenizer
3b97256 is described below

commit 3b97256096d2c97abd3dc0b3810663e50e746b9e
Author: Zhipeng Zhang <zh...@gmail.com>
AuthorDate: Tue Jul 19 21:59:34 2022 +0800

    [FLINK-28500] Add Transformer for Tokenizer
    
    This closes #129.
---
 .../ml/examples/feature/TokenizerExample.java      |  60 ++++++++++
 .../flink/ml/feature/bucketizer/Bucketizer.java    |   4 +-
 .../flink/ml/feature/tokenizer/Tokenizer.java      |  80 +++++++++++++
 .../ml/feature/tokenizer/TokenizerParams.java      |  29 +++++
 .../feature/vectorassembler/VectorAssembler.java   |   4 +-
 .../org/apache/flink/ml/feature/TokenizerTest.java | 124 +++++++++++++++++++++
 .../examples/ml/feature/tokenizer_example.py       |  60 ++++++++++
 .../pyflink/ml/lib/feature/bucketizer.py           |   4 +-
 .../pyflink/ml/lib/feature/tests/test_tokenizer.py |  77 +++++++++++++
 .../feature/{vectorassembler.py => tokenizer.py}   |  27 ++---
 .../pyflink/ml/lib/feature/vectorassembler.py      |   2 +-
 11 files changed, 448 insertions(+), 23 deletions(-)

diff --git a/flink-ml-examples/src/main/java/org/apache/flink/ml/examples/feature/TokenizerExample.java b/flink-ml-examples/src/main/java/org/apache/flink/ml/examples/feature/TokenizerExample.java
new file mode 100644
index 0000000..1132757
--- /dev/null
+++ b/flink-ml-examples/src/main/java/org/apache/flink/ml/examples/feature/TokenizerExample.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.examples.feature;
+
+import org.apache.flink.ml.feature.tokenizer.Tokenizer;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+
+import java.util.Arrays;
+
+/** Simple program that creates a Tokenizer instance and uses it for feature engineering. */
+public class TokenizerExample {
+    public static void main(String[] args) {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+        // Generates input data.
+        DataStream<Row> inputStream =
+                env.fromElements(Row.of("Test for tokenization."), Row.of("Te,st. punct"));
+        Table inputTable = tEnv.fromDataStream(inputStream).as("input");
+
+        // Creates a Tokenizer object and initializes its parameters.
+        Tokenizer tokenizer = new Tokenizer().setInputCol("input").setOutputCol("output");
+
+        // Uses the Tokenizer object for feature transformations.
+        Table outputTable = tokenizer.transform(inputTable)[0];
+
+        // Extracts and displays the results.
+        for (CloseableIterator<Row> it = outputTable.execute().collect(); it.hasNext(); ) {
+            Row row = it.next();
+
+            String inputValue = (String) row.getField(tokenizer.getInputCol());
+            String[] outputValues = (String[]) row.getField(tokenizer.getOutputCol());
+
+            System.out.printf(
+                    "Input Value: %s \tOutput Values: %s\n",
+                    inputValue, Arrays.toString(outputValues));
+        }
+    }
+}
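
Given the TokenizerUdf logic added below (lowercase the string, then split it
on whitespace), running this example should print output roughly like the
following; row order may vary across runs:

    Input Value: Test for tokenization. 	Output Values: [test, for, tokenization.]
    Input Value: Te,st. punct 	Output Values: [te,st., punct]
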
diff --git a/flink-ml-lib/src/main/java/org/apache/flink/ml/feature/bucketizer/Bucketizer.java b/flink-ml-lib/src/main/java/org/apache/flink/ml/feature/bucketizer/Bucketizer.java
index 1382963..c4e760b 100644
--- a/flink-ml-lib/src/main/java/org/apache/flink/ml/feature/bucketizer/Bucketizer.java
+++ b/flink-ml-lib/src/main/java/org/apache/flink/ml/feature/bucketizer/Bucketizer.java
@@ -44,8 +44,8 @@ import java.util.HashMap;
 import java.util.Map;
 
 /**
- * Bucketizer is a transformer that maps multiple columns of continuous features to multiple columns
- * of discrete features, i.e., buckets indices. The indices are in [0, numSplitsInThisColumn - 1].
+ * A Transformer that maps multiple columns of continuous features to multiple columns of discrete
+ * features, i.e., bucket indices. The indices are in [0, numSplitsInThisColumn - 1].
  *
  * <p>The `keep` option of {@link HasHandleInvalid} means that we put the invalid data in the last
  * bucket of the splits, whose index is the number of the buckets.
diff --git a/flink-ml-lib/src/main/java/org/apache/flink/ml/feature/tokenizer/Tokenizer.java b/flink-ml-lib/src/main/java/org/apache/flink/ml/feature/tokenizer/Tokenizer.java
new file mode 100644
index 0000000..8098e51
--- /dev/null
+++ b/flink-ml-lib/src/main/java/org/apache/flink/ml/feature/tokenizer/Tokenizer.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature.tokenizer;
+
+import org.apache.flink.ml.api.Transformer;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.table.api.Expressions;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.table.api.Expressions.$;
+
+/**
+ * A Transformer which converts the input string to lowercase and then splits it by whitespace.
+ */
+public class Tokenizer implements Transformer<Tokenizer>, TokenizerParams<Tokenizer> {
+    private final Map<Param<?>, Object> paramMap = new HashMap<>();
+
+    public Tokenizer() {
+        ParamUtils.initializeMapWithDefaultValues(paramMap, this);
+    }
+
+    @Override
+    public Table[] transform(Table... inputs) {
+        Preconditions.checkArgument(inputs.length == 1);
+
+        Expression tokenizerUdf =
+                Expressions.call(TokenizerUdf.class, $(getInputCol())).as(getOutputCol());
+        Table output = inputs[0].addColumns(tokenizerUdf);
+        return new Table[] {output};
+    }
+
+    @Override
+    public void save(String path) throws IOException {
+        ReadWriteUtils.saveMetadata(this, path);
+    }
+
+    @Override
+    public Map<Param<?>, Object> getParamMap() {
+        return paramMap;
+    }
+
+    public static Tokenizer load(StreamTableEnvironment tEnv, String path) throws IOException {
+        return ReadWriteUtils.loadStageParam(path);
+    }
+
+    /**
+     * The main logic of {@link Tokenizer}, which converts the input string to an array of tokens.
+     */
+    public static class TokenizerUdf extends ScalarFunction {
+        public String[] eval(String input) {
+            return input.toLowerCase().split("\\s");
+        }
+    }
+}
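
One subtlety in TokenizerUdf: the pattern "\\s" matches a single whitespace
character, so runs of consecutive whitespace produce empty tokens. A minimal
standalone sketch (not part of the commit) illustrating the behavior:

    import java.util.Arrays;

    public class SplitSketch {
        public static void main(String[] args) {
            // Same logic as TokenizerUdf.eval: lowercase, then split on "\s".
            System.out.println(Arrays.toString("Te,st. punct".toLowerCase().split("\\s")));
            // -> [te,st., punct]

            // Consecutive spaces yield an empty token in the result.
            System.out.println(Arrays.toString("two  spaces".toLowerCase().split("\\s")));
            // -> [two, , spaces]
        }
    }
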
diff --git a/flink-ml-lib/src/main/java/org/apache/flink/ml/feature/tokenizer/TokenizerParams.java b/flink-ml-lib/src/main/java/org/apache/flink/ml/feature/tokenizer/TokenizerParams.java
new file mode 100644
index 0000000..3e82003
--- /dev/null
+++ b/flink-ml-lib/src/main/java/org/apache/flink/ml/feature/tokenizer/TokenizerParams.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature.tokenizer;
+
+import org.apache.flink.ml.common.param.HasInputCol;
+import org.apache.flink.ml.common.param.HasOutputCol;
+
+/**
+ * Params of {@link Tokenizer}.
+ *
+ * @param <T> The class type of this instance.
+ */
+public interface TokenizerParams<T> extends HasInputCol<T>, HasOutputCol<T> {}
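
TokenizerParams follows Flink ML's mixin pattern: each Has* interface
contributes a Param definition plus default-backed getters and setters
(HasInputCol and HasOutputCol default to "input" and "output", as the tests
below assert). A hypothetical variant that also needed an invalid-data policy
could simply pull in another mixin, for example:

    // Hypothetical sketch, not part of this commit: composing one more mixin.
    public interface MyTokenizerParams<T>
            extends HasInputCol<T>, HasOutputCol<T>, HasHandleInvalid<T> {}
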
diff --git a/flink-ml-lib/src/main/java/org/apache/flink/ml/feature/vectorassembler/VectorAssembler.java b/flink-ml-lib/src/main/java/org/apache/flink/ml/feature/vectorassembler/VectorAssembler.java
index c14e44e..e3a01f1 100644
--- a/flink-ml-lib/src/main/java/org/apache/flink/ml/feature/vectorassembler/VectorAssembler.java
+++ b/flink-ml-lib/src/main/java/org/apache/flink/ml/feature/vectorassembler/VectorAssembler.java
@@ -46,8 +46,8 @@ import java.util.HashMap;
 import java.util.Map;
 
 /**
- * A feature transformer that combines a given list of input columns into a vector column. Types of
- * input columns must be either vector or numerical value.
+ * A Transformer which combines a given list of input columns into a vector column. Types of input
+ * columns must be either vector or numerical value.
  *
  * <p>The `keep` option of {@link HasHandleInvalid} means that we output bad rows with output column
  * set to null.
diff --git a/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/TokenizerTest.java b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/TokenizerTest.java
new file mode 100644
index 0000000..cacbec5
--- /dev/null
+++ b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/TokenizerTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.ml.feature.tokenizer.Tokenizer;
+import org.apache.flink.ml.util.TestUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Expressions;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
+
+import org.apache.commons.collections.IteratorUtils;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+/** Tests {@link Tokenizer}. */
+public class TokenizerTest extends AbstractTestBase {
+    private StreamTableEnvironment tEnv;
+    private StreamExecutionEnvironment env;
+    private Table inputDataTable;
+
+    private static final List<Row> INPUT =
+            Arrays.asList(Row.of("Test for tokenization."), Row.of("Te,st. punct"));
+
+    private static final List<Row> EXPECTED_OUTPUT =
+            Arrays.asList(
+                    Row.of((Object) new String[] {"test", "for", "tokenization."}),
+                    Row.of((Object) new String[] {"te,st.", "punct"}));
+
+    @Before
+    public void before() {
+        Configuration config = new Configuration();
+        config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
+        env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+        env.setParallelism(4);
+        env.enableCheckpointing(100);
+        env.setRestartStrategy(RestartStrategies.noRestart());
+        tEnv = StreamTableEnvironment.create(env);
+        DataStream<Row> dataStream = env.fromCollection(INPUT);
+        inputDataTable = tEnv.fromDataStream(dataStream).as("input");
+    }
+
+    @Test
+    public void testParam() {
+        Tokenizer tokenizer = new Tokenizer();
+        assertEquals("input", tokenizer.getInputCol());
+        assertEquals("output", tokenizer.getOutputCol());
+
+        tokenizer.setInputCol("testInputCol").setOutputCol("testOutputCol");
+        assertEquals("testInputCol", tokenizer.getInputCol());
+        assertEquals("testOutputCol", tokenizer.getOutputCol());
+    }
+
+    @Test
+    public void testOutputSchema() {
+        Tokenizer tokenizer = new Tokenizer();
+        inputDataTable =
+                tEnv.fromDataStream(env.fromElements(Row.of("", ""))).as("input", "dummyInput");
+        Table output = tokenizer.transform(inputDataTable)[0];
+        assertEquals(
+                Arrays.asList(tokenizer.getInputCol(), "dummyInput", tokenizer.getOutputCol()),
+                output.getResolvedSchema().getColumnNames());
+    }
+
+    @Test
+    public void testTransform() throws Exception {
+        Tokenizer tokenizer = new Tokenizer();
+        Table output = tokenizer.transform(inputDataTable)[0];
+        verifyOutputResult(output, tokenizer.getOutputCol(), EXPECTED_OUTPUT);
+    }
+
+    @Test
+    public void testSaveLoadAndTransform() throws Exception {
+        Tokenizer tokenizer = new Tokenizer();
+        Tokenizer loadedTokenizer =
+                TestUtils.saveAndReload(
+                        tEnv, tokenizer, TEMPORARY_FOLDER.newFolder().getAbsolutePath());
+        Table output = loadedTokenizer.transform(inputDataTable)[0];
+        verifyOutputResult(output, loadedTokenizer.getOutputCol(), EXPECTED_OUTPUT);
+    }
+
+    private void verifyOutputResult(Table output, String outputCol, List<Row> expectedOutput)
+            throws Exception {
+        DataStream<Row> dataStream = tEnv.toDataStream(output.select(Expressions.$(outputCol)));
+        List<Row> results = IteratorUtils.toList(dataStream.executeAndCollect());
+        assertEquals(expectedOutput.size(), results.size());
+        results.sort(Comparator.comparingInt(o -> ((String[]) o.getField(0))[0].hashCode()));
+        expectedOutput.sort(Comparator.comparingInt(o -> ((String[]) o.getField(0))[0].hashCode()));
+        for (int i = 0; i < expectedOutput.size(); i++) {
+            assertArrayEquals(
+                    (String[]) results.get(i).getField(0),
+                    (String[]) expectedOutput.get(i).getField(0));
+        }
+    }
+}
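
Note the comparator in verifyOutputResult: with parallelism set to 4, the row
order of the collected results is nondeterministic, so both lists are sorted
by a common key (the hash of each row's first token) before the element-wise
comparison.
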
diff --git a/flink-ml-python/pyflink/examples/ml/feature/tokenizer_example.py b/flink-ml-python/pyflink/examples/ml/feature/tokenizer_example.py
new file mode 100644
index 0000000..b0f8308
--- /dev/null
+++ b/flink-ml-python/pyflink/examples/ml/feature/tokenizer_example.py
@@ -0,0 +1,60 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+################################################################################
+
+# Simple program that creates a Tokenizer instance and uses it for feature
+# engineering.
+#
+# Before executing this program, please make sure you have followed Flink ML's
+# quick start guideline to set up Flink ML and Flink environment. The guideline
+# can be found at
+#
+# https://nightlies.apache.org/flink/flink-ml-docs-master/docs/try-flink-ml/quick-start/
+
+from pyflink.common import Types
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.ml.lib.feature.tokenizer import Tokenizer
+from pyflink.table import StreamTableEnvironment
+
+env = StreamExecutionEnvironment.get_execution_environment()
+
+t_env = StreamTableEnvironment.create(env)
+
+# Generates input data.
+input_data_table = t_env.from_data_stream(
+    env.from_collection([
+        ('Test for tokenization.',),
+        ('Te,st. punct',),
+    ],
+        type_info=Types.ROW_NAMED(
+            ['input'],
+            [Types.STRING()])))
+
+# Creates a Tokenizer object and initializes its parameters.
+tokenizer = Tokenizer() \
+    .set_input_col("input") \
+    .set_output_col("output")
+
+# Uses the Tokenizer object for feature transformations.
+output = tokenizer.transform(input_data_table)[0]
+
+# Extracts and displays the results.
+field_names = output.get_schema().get_field_names()
+for result in t_env.to_data_stream(output).execute_and_collect():
+    input_value = result[field_names.index(tokenizer.get_input_col())]
+    output_value = result[field_names.index(tokenizer.get_output_col())]
+    print('Input Value: ' + str(input_value) + '\tOutput Values: ' + str(output_value))
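
As with the Java example, the expected console output (assuming the same
whitespace-splitting behavior; ordering may vary) is roughly:

    Input Value: Test for tokenization.	Output Values: ['test', 'for', 'tokenization.']
    Input Value: Te,st. punct	Output Values: ['te,st.', 'punct']
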
diff --git a/flink-ml-python/pyflink/ml/lib/feature/bucketizer.py b/flink-ml-python/pyflink/ml/lib/feature/bucketizer.py
index 26a6b0b..103dad3 100644
--- a/flink-ml-python/pyflink/ml/lib/feature/bucketizer.py
+++ b/flink-ml-python/pyflink/ml/lib/feature/bucketizer.py
@@ -31,7 +31,7 @@ class _BucketizerParams(
     HasHandleInvalid
 ):
     """
-    Params for :class:`StandardScaler`.
+    Params for :class:`Bucketizer`.
     """
 
     SPLITS_ARRAY: Param[Tuple[float, ...]] = FloatArrayArrayParam(
@@ -55,7 +55,7 @@ class _BucketizerParams(
 
 class Bucketizer(JavaFeatureTransformer, _BucketizerParams):
     """
-    Bucketizer is a transformer that maps multiple columns of continuous features to multiple
+    A Transformer that maps multiple columns of continuous features to multiple
     columns of discrete features, i.e., buckets indices. The indices are in
     [0, numSplitsInThisColumn - 1].
 
diff --git a/flink-ml-python/pyflink/ml/lib/feature/tests/test_tokenizer.py b/flink-ml-python/pyflink/ml/lib/feature/tests/test_tokenizer.py
new file mode 100644
index 0000000..1534f98
--- /dev/null
+++ b/flink-ml-python/pyflink/ml/lib/feature/tests/test_tokenizer.py
@@ -0,0 +1,77 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+################################################################################
+import os
+
+from pyflink.common import Types
+
+from pyflink.ml.lib.feature.tokenizer import Tokenizer
+from pyflink.ml.tests.test_utils import PyFlinkMLTestCase
+
+
+class TokenizerTest(PyFlinkMLTestCase):
+    def setUp(self):
+        super(TokenizerTest, self).setUp()
+        self.input_data_table = self.t_env.from_data_stream(
+            self.env.from_collection([
+                ('Test for tokenization.',),
+                ('Te,st. punct',),
+            ],
+                type_info=Types.ROW_NAMED(
+                    ['input'],
+                    [Types.STRING()])))
+        self.expected_output = [
+            ['test', 'for', 'tokenization.'],
+            ['te,st.', 'punct']
+        ]
+
+    def test_param(self):
+        tokenizer = Tokenizer()
+        self.assertEqual('input', tokenizer.input_col)
+        self.assertEqual('output', tokenizer.output_col)
+
+        tokenizer.set_input_col('testInputCol').set_output_col('testOutputCol')
+        self.assertEqual('testInputCol', tokenizer.input_col)
+        self.assertEqual('testOutputCol', tokenizer.output_col)
+
+    def test_output_schema(self):
+        tokenizer = Tokenizer()
+        input_data_table = self.t_env.from_data_stream(
+            self.env.from_collection([
+                ('', ''),
+            ],
+                type_info=Types.ROW_NAMED(
+                    ['input', 'dummy_input'],
+                    [Types.STRING(), Types.STRING()])))
+        output = tokenizer.transform(input_data_table)[0]
+
+        self.assertEqual(
+            [tokenizer.input_col, 'dummy_input', tokenizer.output_col],
+            output.get_schema().get_field_names())
+
+    def test_save_load_transform(self):
+        tokenizer = Tokenizer()
+        path = os.path.join(self.temp_dir, 'test_save_load_transform_tokenizer')
+        tokenizer.save(path)
+        tokenizer = Tokenizer.load(self.t_env, path)
+        output_table = tokenizer.transform(self.input_data_table)[0]
+        predicted_results = [result[1] for result in
+                             self.t_env.to_data_stream(output_table).execute_and_collect()]
+
+        predicted_results.sort(key=lambda x: x[0])
+        self.expected_output.sort(key=lambda x: x[0])
+        self.assertEqual(self.expected_output, predicted_results)
diff --git a/flink-ml-python/pyflink/ml/lib/feature/vectorassembler.py b/flink-ml-python/pyflink/ml/lib/feature/tokenizer.py
similarity index 64%
copy from flink-ml-python/pyflink/ml/lib/feature/vectorassembler.py
copy to flink-ml-python/pyflink/ml/lib/feature/tokenizer.py
index cff85ab..ca13ce9 100644
--- a/flink-ml-python/pyflink/ml/lib/feature/vectorassembler.py
+++ b/flink-ml-python/pyflink/ml/lib/feature/tokenizer.py
@@ -18,39 +18,34 @@
 
 from pyflink.ml.core.wrapper import JavaWithParams
 from pyflink.ml.lib.feature.common import JavaFeatureTransformer
-from pyflink.ml.lib.param import HasInputCols, HasOutputCol, HasHandleInvalid
+from pyflink.ml.lib.param import HasInputCol, HasOutputCol
 
 
-class _VectorAssemblerParams(
+class _TokenizerParams(
     JavaWithParams,
-    HasInputCols,
-    HasOutputCol,
-    HasHandleInvalid
+    HasInputCol,
+    HasOutputCol
 ):
     """
-    Params for :class:`VectorAssembler`.
+    Params for :class:`Tokenizer`.
     """
 
     def __init__(self, java_params):
-        super(_VectorAssemblerParams, self).__init__(java_params)
+        super(_TokenizerParams, self).__init__(java_params)
 
 
-class VectorAssembler(JavaFeatureTransformer, _VectorAssemblerParams):
+class Tokenizer(JavaFeatureTransformer, _TokenizerParams):
     """
-    A feature transformer that combines a given list of input columns into a vector column. Types of
-    input columns must be either vector or numerical value.
-
-    The `keep` option of :class:HasHandleInvalid means that we output bad rows with output column
-    set to null.
+    A Transformer that converts the input string to lowercase and then splits it by whitespace.
     """
 
     def __init__(self, java_model=None):
-        super(VectorAssembler, self).__init__(java_model)
+        super(Tokenizer, self).__init__(java_model)
 
     @classmethod
     def _java_transformer_package_name(cls) -> str:
-        return "vectorassembler"
+        return "tokenizer"
 
     @classmethod
     def _java_transformer_class_name(cls) -> str:
-        return "VectorAssembler"
+        return "Tokenizer"
diff --git a/flink-ml-python/pyflink/ml/lib/feature/vectorassembler.py b/flink-ml-python/pyflink/ml/lib/feature/vectorassembler.py
index cff85ab..08bd689 100644
--- a/flink-ml-python/pyflink/ml/lib/feature/vectorassembler.py
+++ b/flink-ml-python/pyflink/ml/lib/feature/vectorassembler.py
@@ -37,7 +37,7 @@ class _VectorAssemblerParams(
 
 class VectorAssembler(JavaFeatureTransformer, _VectorAssemblerParams):
     """
-    A feature transformer that combines a given list of input columns into a vector column. Types of
+    A Transformer which combines a given list of input columns into a vector column. Types of
     input columns must be either vector or numerical value.
 
     The `keep` option of :class:HasHandleInvalid means that we output bad rows with output column