You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by li...@apache.org on 2022/07/20 09:11:47 UTC
[flink-ml] branch master updated: [FLINK-28502] Add Transformer for RegexTokenizer
This is an automated email from the ASF dual-hosted git repository.
lindong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-ml.git
The following commit(s) were added to refs/heads/master by this push:
new f9f8021 [FLINK-28502] Add Transformer for RegexTokenizer
f9f8021 is described below
commit f9f802125d604f0155221804237fd4140e239602
Author: Zhipeng Zhang <zh...@gmail.com>
AuthorDate: Wed Jul 20 17:11:43 2022 +0800
[FLINK-28502] Add Transformer for RegexTokenizer
This closes #130.
---
.../ml/examples/feature/RegexTokenizerExample.java | 64 +++++++
.../ml/feature/regextokenizer/RegexTokenizer.java | 128 ++++++++++++++
.../regextokenizer/RegexTokenizerParams.java | 78 +++++++++
.../flink/ml/feature/RegexTokenizerTest.java | 194 +++++++++++++++++++++
.../examples/ml/feature/regextokenizer_example.py | 60 +++++++
.../pyflink/ml/lib/feature/regextokenizer.py | 129 ++++++++++++++
.../ml/lib/feature/tests/test_regextokenizer.py | 93 ++++++++++
7 files changed, 746 insertions(+)
diff --git a/flink-ml-examples/src/main/java/org/apache/flink/ml/examples/feature/RegexTokenizerExample.java b/flink-ml-examples/src/main/java/org/apache/flink/ml/examples/feature/RegexTokenizerExample.java
new file mode 100644
index 0000000..4a743bc
--- /dev/null
+++ b/flink-ml-examples/src/main/java/org/apache/flink/ml/examples/feature/RegexTokenizerExample.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.examples.feature;
+
+import org.apache.flink.ml.feature.regextokenizer.RegexTokenizer;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+
+import java.util.Arrays;
+
+/** Simple program that creates a RegexTokenizer instance and uses it for feature engineering. */
+public class RegexTokenizerExample {
+ public static void main(String[] args) {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+ // Generates input data.
+ DataStream<Row> inputStream =
+ env.fromElements(Row.of("Test for tokenization."), Row.of("Te,st. punct"));
+ Table inputTable = tEnv.fromDataStream(inputStream).as("input");
+
+ // Creates a RegexTokenizer object and initializes its parameters.
+ RegexTokenizer regexTokenizer =
+ new RegexTokenizer()
+ .setInputCol("input")
+ .setOutputCol("output")
+ .setPattern("\\w+|\\p{Punct}");
+
+ // Uses the Tokenizer object for feature transformations.
+ Table outputTable = regexTokenizer.transform(inputTable)[0];
+
+ // Extracts and displays the results.
+ for (CloseableIterator<Row> it = outputTable.execute().collect(); it.hasNext(); ) {
+ Row row = it.next();
+
+ String inputValue = (String) row.getField(regexTokenizer.getInputCol());
+ String[] outputValues = (String[]) row.getField(regexTokenizer.getOutputCol());
+
+ System.out.printf(
+ "Input Value: %s \tOutput Values: %s\n",
+ inputValue, Arrays.toString(outputValues));
+ }
+ }
+}
diff --git a/flink-ml-lib/src/main/java/org/apache/flink/ml/feature/regextokenizer/RegexTokenizer.java b/flink-ml-lib/src/main/java/org/apache/flink/ml/feature/regextokenizer/RegexTokenizer.java
new file mode 100644
index 0000000..6f5f860
--- /dev/null
+++ b/flink-ml-lib/src/main/java/org/apache/flink/ml/feature/regextokenizer/RegexTokenizer.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature.regextokenizer;
+
import org.apache.flink.ml.api.Transformer;
import org.apache.flink.ml.param.Param;
import org.apache.flink.ml.util.ParamUtils;
import org.apache.flink.ml.util.ReadWriteUtils;
import org.apache.flink.table.api.Expressions;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.util.Preconditions;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import static org.apache.flink.table.api.Expressions.$;
+
+/**
+ * A Transformer which converts the input string to lowercase and then splits it by white spaces
+ * based on regex. It provides two options to extract tokens:
+ *
+ * <ul>
+ * <li>if "gaps" is true: uses the provided pattern to split the input string.
+ * <li>else: repeatedly matches the regex (the provided pattern) with the input string.
+ * </ul>
+ *
+ * <p>Moreover, it provides parameters to filter tokens with a minimal length and converts input to
+ * lowercase. The output of each input string is an array of strings that can be empty.
+ */
+public class RegexTokenizer
+ implements Transformer<RegexTokenizer>, RegexTokenizerParams<RegexTokenizer> {
+ private final Map<Param<?>, Object> paramMap = new HashMap<>();
+
+ public RegexTokenizer() {
+ ParamUtils.initializeMapWithDefaultValues(paramMap, this);
+ }
+
+ @Override
+ public Table[] transform(Table... inputs) {
+ Preconditions.checkArgument(inputs.length == 1);
+ Expression tokenizerUdf =
+ Expressions.call(
+ RegexTokenizerUdf.class,
+ $(getInputCol()),
+ getPattern(),
+ getGaps(),
+ getToLowercase(),
+ getMinTokenLength())
+ .as(getOutputCol());
+ Table output = inputs[0].addColumns(tokenizerUdf);
+ return new Table[] {output};
+ }
+
+ @Override
+ public void save(String path) throws IOException {
+ ReadWriteUtils.saveMetadata(this, path);
+ }
+
+ @Override
+ public Map<Param<?>, Object> getParamMap() {
+ return paramMap;
+ }
+
+ public static RegexTokenizer load(StreamTableEnvironment tEnv, String path) throws IOException {
+ return ReadWriteUtils.loadStageParam(path);
+ }
+
+ /**
+ * The main logic of ${@link RegexTokenizer}, which converts the input string to an array of
+ * tokens.
+ */
+ public static class RegexTokenizerUdf extends ScalarFunction {
+
+ public String[] eval(
+ String input,
+ String pattern,
+ Boolean gaps,
+ boolean toLowercase,
+ int minTokenLength) {
+ Pattern regPattern = Pattern.compile(pattern);
+ input = toLowercase ? input.toLowerCase() : input;
+
+ List<String> tokens = new ArrayList<>();
+ if (gaps) {
+ String[] tokenArray = regPattern.split(input);
+ for (String token : tokenArray) {
+ if (token.length() >= minTokenLength) {
+ tokens.add(token);
+ }
+ }
+ } else {
+ Matcher matcher = regPattern.matcher(input);
+ while (matcher.find()) {
+ String token = matcher.group();
+ if (token.length() >= minTokenLength) {
+ tokens.add(token);
+ }
+ }
+ }
+
+ return tokens.toArray(new String[0]);
+ }
+ }
+}
diff --git a/flink-ml-lib/src/main/java/org/apache/flink/ml/feature/regextokenizer/RegexTokenizerParams.java b/flink-ml-lib/src/main/java/org/apache/flink/ml/feature/regextokenizer/RegexTokenizerParams.java
new file mode 100644
index 0000000..029069b
--- /dev/null
+++ b/flink-ml-lib/src/main/java/org/apache/flink/ml/feature/regextokenizer/RegexTokenizerParams.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature.regextokenizer;
+
+import org.apache.flink.ml.common.param.HasInputCol;
+import org.apache.flink.ml.common.param.HasOutputCol;
+import org.apache.flink.ml.param.BooleanParam;
+import org.apache.flink.ml.param.IntParam;
+import org.apache.flink.ml.param.ParamValidators;
+import org.apache.flink.ml.param.StringParam;
+
+/**
+ * Params for {@link RegexTokenizer}.
+ *
+ * @param <T> The class type of this instance.
+ */
+public interface RegexTokenizerParams<T> extends HasInputCol<T>, HasOutputCol<T> {
+ IntParam MIN_TOKEN_LENGTH =
+ new IntParam("minTokenLength", "Minimum token length", 1, ParamValidators.gtEq(0));
+
+ BooleanParam GAPS = new BooleanParam("gaps", "Set regex to match gaps or tokens", true);
+
+ StringParam PATTERN = new StringParam("pattern", "Regex pattern used for tokenizing", "\\s+");
+
+ BooleanParam TO_LOWERCASE =
+ new BooleanParam(
+ "toLowercase",
+ "Whether to convert all characters to lowercase before tokenizing",
+ true);
+
+ default T setMinTokenLength(int value) {
+ return set(MIN_TOKEN_LENGTH, value);
+ }
+
+ default int getMinTokenLength() {
+ return get(MIN_TOKEN_LENGTH);
+ }
+
+ default T setGaps(boolean value) {
+ return set(GAPS, value);
+ }
+
+ default Boolean getGaps() {
+ return get(GAPS);
+ }
+
+ default T setPattern(String pattern) {
+ return set(PATTERN, pattern);
+ }
+
+ default String getPattern() {
+ return get(PATTERN);
+ }
+
+ default T setToLowercase(boolean toLowercase) {
+ return set(TO_LOWERCASE, toLowercase);
+ }
+
+ default Boolean getToLowercase() {
+ return get(TO_LOWERCASE);
+ }
+}
diff --git a/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/RegexTokenizerTest.java b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/RegexTokenizerTest.java
new file mode 100644
index 0000000..af5381d
--- /dev/null
+++ b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/RegexTokenizerTest.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.ml.feature.regextokenizer.RegexTokenizer;
+import org.apache.flink.ml.feature.regextokenizer.RegexTokenizerParams;
+import org.apache.flink.ml.util.TestUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Expressions;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
+
+import org.apache.commons.collections.IteratorUtils;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+/** Tests {@link RegexTokenizer}. */
+public class RegexTokenizerTest extends AbstractTestBase {
+ private StreamTableEnvironment tEnv;
+ private StreamExecutionEnvironment env;
+ private Table inputDataTable;
+
+ private static final List<Row> INPUT =
+ Arrays.asList(Row.of("Test for tokenization."), Row.of("Te,st. punct"));
+
+ @Before
+ public void before() {
+ Configuration config = new Configuration();
+ config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
+ env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+ env.setParallelism(4);
+ env.enableCheckpointing(100);
+ env.setRestartStrategy(RestartStrategies.noRestart());
+ tEnv = StreamTableEnvironment.create(env);
+ DataStream<Row> dataStream = env.fromCollection(INPUT);
+ inputDataTable = tEnv.fromDataStream(dataStream).as("input");
+ }
+
+ @Test
+ public void testParam() {
+ RegexTokenizer regexTokenizer = new RegexTokenizer();
+ assertEquals("input", regexTokenizer.getInputCol());
+ assertEquals("output", regexTokenizer.getOutputCol());
+ assertEquals(1, regexTokenizer.getMinTokenLength());
+ assertEquals(true, regexTokenizer.getGaps());
+ assertEquals("\\s+", regexTokenizer.getPattern());
+ assertEquals(true, regexTokenizer.getToLowercase());
+
+ regexTokenizer
+ .setInputCol("testInputCol")
+ .setOutputCol("testOutputCol")
+ .setMinTokenLength(3)
+ .setGaps(false)
+ .setPattern("\\s")
+ .setToLowercase(false);
+
+ assertEquals("testInputCol", regexTokenizer.getInputCol());
+ assertEquals("testOutputCol", regexTokenizer.getOutputCol());
+ assertEquals(3, regexTokenizer.getMinTokenLength());
+ assertEquals(false, regexTokenizer.getGaps());
+ assertEquals("\\s", regexTokenizer.getPattern());
+ assertEquals(false, regexTokenizer.getToLowercase());
+ }
+
+ @Test
+ public void testOutputSchema() {
+ RegexTokenizer regexTokenizer = new RegexTokenizer();
+ inputDataTable =
+ tEnv.fromDataStream(env.fromElements(Row.of("", ""))).as("input", "dummyInput");
+ Table output = regexTokenizer.transform(inputDataTable)[0];
+ assertEquals(
+ Arrays.asList(
+ regexTokenizer.getInputCol(), "dummyInput", regexTokenizer.getOutputCol()),
+ output.getResolvedSchema().getColumnNames());
+ }
+
+ @Test
+ public void testTransform() throws Exception {
+ List<Row> expectedRows;
+ int minTokenLength = RegexTokenizerParams.MIN_TOKEN_LENGTH.defaultValue;
+ boolean gaps = RegexTokenizerParams.GAPS.defaultValue;
+ String pattern = RegexTokenizerParams.PATTERN.defaultValue;
+ boolean toLowercase = RegexTokenizerParams.TO_LOWERCASE.defaultValue;
+
+ // default option.
+ expectedRows =
+ Arrays.asList(
+ Row.of((Object) new String[] {"test", "for", "tokenization."}),
+ Row.of((Object) new String[] {"te,st.", "punct"}));
+ checkTransform(minTokenLength, gaps, pattern, toLowercase, expectedRows);
+
+ // default option except toLowercase = false.
+ expectedRows =
+ Arrays.asList(
+ Row.of((Object) new String[] {"Test", "for", "tokenization."}),
+ Row.of((Object) new String[] {"Te,st.", "punct"}));
+ toLowercase = false;
+ checkTransform(minTokenLength, gaps, pattern, toLowercase, expectedRows);
+
+ // default option except gaps = false, pattern = "\\w+|\\p{Punct}".
+ expectedRows =
+ Arrays.asList(
+ Row.of((Object) new String[] {"test", "for", "tokenization", "."}),
+ Row.of((Object) new String[] {"te", ",", "st", ".", "punct"}));
+ gaps = false;
+ pattern = "\\w+|\\p{Punct}";
+ toLowercase = true;
+ checkTransform(minTokenLength, gaps, pattern, toLowercase, expectedRows);
+
+ // default option except gaps = false, minTokenLength = 3, pattern = "\\w+|\\p{Punct}".
+ gaps = false;
+ minTokenLength = 3;
+ pattern = "\\w+|\\p{Punct}";
+ expectedRows =
+ Arrays.asList(
+ Row.of((Object) new String[] {"test", "for", "tokenization"}),
+ Row.of((Object) new String[] {"punct"}));
+ checkTransform(minTokenLength, gaps, pattern, toLowercase, expectedRows);
+ }
+
+ @Test
+ public void testSaveLoadAndTransform() throws Exception {
+ RegexTokenizer regexTokenizer = new RegexTokenizer();
+ List<Row> expectedRows =
+ Arrays.asList(
+ Row.of((Object) new String[] {"test", "for", "tokenization."}),
+ Row.of((Object) new String[] {"te,st.", "punct"}));
+ RegexTokenizer loadedRegexTokenizer =
+ TestUtils.saveAndReload(
+ tEnv, regexTokenizer, TEMPORARY_FOLDER.newFolder().getAbsolutePath());
+ Table output = loadedRegexTokenizer.transform(inputDataTable)[0];
+ verifyOutputResult(output, loadedRegexTokenizer.getOutputCol(), expectedRows);
+ }
+
+ private void checkTransform(
+ int minTokenLength,
+ boolean gaps,
+ String pattern,
+ boolean toLowercase,
+ List<Row> expectedOutput)
+ throws Exception {
+ RegexTokenizer regexTokenizer =
+ new RegexTokenizer()
+ .setMinTokenLength(minTokenLength)
+ .setGaps(gaps)
+ .setPattern(pattern)
+ .setToLowercase(toLowercase);
+ Table output = regexTokenizer.transform(inputDataTable)[0];
+ verifyOutputResult(output, regexTokenizer.getOutputCol(), expectedOutput);
+ }
+
+ private void verifyOutputResult(Table output, String outputCol, List<Row> expectedOutput)
+ throws Exception {
+ DataStream<Row> dataStream = tEnv.toDataStream(output.select(Expressions.$(outputCol)));
+ List<Row> results = IteratorUtils.toList(dataStream.executeAndCollect());
+ assertEquals(expectedOutput.size(), results.size());
+ results.sort(Comparator.comparingInt(o -> ((String[]) o.getField(0))[0].hashCode()));
+ expectedOutput.sort(Comparator.comparingInt(o -> ((String[]) o.getField(0))[0].hashCode()));
+ for (int i = 0; i < expectedOutput.size(); i++) {
+ assertArrayEquals(
+ (String[]) results.get(i).getField(0),
+ (String[]) expectedOutput.get(i).getField(0));
+ }
+ }
+}
diff --git a/flink-ml-python/pyflink/examples/ml/feature/regextokenizer_example.py b/flink-ml-python/pyflink/examples/ml/feature/regextokenizer_example.py
new file mode 100644
index 0000000..0a5b2a4
--- /dev/null
+++ b/flink-ml-python/pyflink/examples/ml/feature/regextokenizer_example.py
@@ -0,0 +1,60 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
# Simple program that creates a RegexTokenizer instance and uses it for feature
# engineering.
#
# Before executing this program, please make sure you have followed Flink ML's
# quick start guideline to set up Flink ML and Flink environment. The guideline
# can be found at
#
# https://nightlies.apache.org/flink/flink-ml-docs-master/docs/try-flink-ml/quick-start/

from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.ml.lib.feature.regextokenizer import RegexTokenizer
from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()

t_env = StreamTableEnvironment.create(env)

# Generates input data.
input_data_table = t_env.from_data_stream(
    env.from_collection([
        ('Test for tokenization.',),
        ('Te,st. punct',),
    ],
        type_info=Types.ROW_NAMED(
            ['input'],
            [Types.STRING()])))

# Creates a RegexTokenizer object and initializes its parameters. The other
# parameters keep their defaults: lowercase the input and split it on whitespace.
regex_tokenizer = RegexTokenizer() \
    .set_input_col("input") \
    .set_output_col("output")

# Uses the RegexTokenizer object for feature transformations.
output = regex_tokenizer.transform(input_data_table)[0]

# Extracts and displays the results. Each row holds one input string and the
# list of tokens extracted from it.
field_names = output.get_schema().get_field_names()
for result in t_env.to_data_stream(output).execute_and_collect():
    input_value = result[field_names.index(regex_tokenizer.get_input_col())]
    output_value = result[field_names.index(regex_tokenizer.get_output_col())]
    print('Input Value: ' + str(input_value) + '\tOutput Values: ' + str(output_value))
diff --git a/flink-ml-python/pyflink/ml/lib/feature/regextokenizer.py b/flink-ml-python/pyflink/ml/lib/feature/regextokenizer.py
new file mode 100644
index 0000000..5f7e66a
--- /dev/null
+++ b/flink-ml-python/pyflink/ml/lib/feature/regextokenizer.py
@@ -0,0 +1,129 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import typing
+
+from pyflink.ml.core.param import IntParam, BooleanParam, StringParam, ParamValidators
+from pyflink.ml.core.wrapper import JavaWithParams
+from pyflink.ml.lib.feature.common import JavaFeatureTransformer
+from pyflink.ml.lib.param import HasInputCol, HasOutputCol
+
+
class _RegexTokenizerParams(
    JavaWithParams,
    HasInputCol,
    HasOutputCol
):
    """
    Params for :class:`RegexTokenizer`.
    """

    # Tokens shorter than this length are dropped from the output; must be >= 0.
    MIN_TOKEN_LENGTH: IntParam = IntParam(
        "min_token_length",
        "Minimum token length",
        1,
        ParamValidators.gt_eq(0)
    )

    # If True, PATTERN matches the gaps between tokens (it is used to split the
    # input); otherwise PATTERN matches the tokens themselves.
    GAPS: BooleanParam = BooleanParam(
        "gaps",
        "Set regex to match gaps or tokens",
        True
    )

    # The regular expression used for tokenizing.
    PATTERN: StringParam = StringParam(
        "pattern",
        "Regex pattern used for tokenizing",
        "\\s+"
    )

    # Whether the input is converted to lowercase before it is tokenized.
    TO_LOWERCASE: BooleanParam = BooleanParam(
        "to_lowercase",
        "Whether to convert all characters to lowercase before tokenizing",
        True
    )

    def __init__(self, java_params):
        super(_RegexTokenizerParams, self).__init__(java_params)

    def set_min_token_length(self, value: int):
        return typing.cast(_RegexTokenizerParams, self.set(self.MIN_TOKEN_LENGTH, value))

    def get_min_token_length(self) -> int:
        return self.get(self.MIN_TOKEN_LENGTH)

    def set_gaps(self, value: bool):
        return typing.cast(_RegexTokenizerParams, self.set(self.GAPS, value))

    def get_gaps(self) -> bool:
        return self.get(self.GAPS)

    def set_pattern(self, value: str):
        return typing.cast(_RegexTokenizerParams, self.set(self.PATTERN, value))

    def get_pattern(self) -> str:
        return self.get(self.PATTERN)

    def set_to_lowercase(self, value: bool):
        return typing.cast(_RegexTokenizerParams, self.set(self.TO_LOWERCASE, value))

    def get_to_lowercase(self) -> bool:
        """
        Returns whether the input is converted to lowercase before tokenizing.
        """
        return self.get(self.TO_LOWERCASE)

    def get_to_lowertcase(self) -> bool:
        """
        Deprecated, misspelled alias of :meth:`get_to_lowercase`. It is kept only so
        that existing callers keep working; use :meth:`get_to_lowercase` instead.
        """
        import warnings
        warnings.warn(
            "get_to_lowertcase is deprecated, use get_to_lowercase instead.",
            DeprecationWarning,
            stacklevel=2)
        return self.get_to_lowercase()

    @property
    def min_token_length(self) -> int:
        return self.get_min_token_length()

    @property
    def gaps(self) -> bool:
        return self.get_gaps()

    @property
    def pattern(self) -> str:
        return self.get_pattern()

    @property
    def to_lowercase(self) -> bool:
        return self.get_to_lowercase()
+
+
class RegexTokenizer(JavaFeatureTransformer, _RegexTokenizerParams):
    """
    A Transformer which splits each input string into an array of tokens using a
    regular expression. Tokens are extracted in one of two ways:

    - if "gaps" is true, the pattern is used as a delimiter to split the input string;
    - otherwise, the pattern is matched repeatedly against the input string and every
      match becomes a token.

    The input can optionally be converted to lowercase first, and tokens shorter than
    "min_token_length" are filtered out. With the default parameters the input is
    lowercased and split on whitespace. The output for each input string is an array of
    strings that can be empty.
    """

    def __init__(self, java_model=None):
        super().__init__(java_model)

    @classmethod
    def _java_transformer_package_name(cls) -> str:
        # Java sub-package of the backing transformer
        # (org.apache.flink.ml.feature.regextokenizer).
        return "regextokenizer"

    @classmethod
    def _java_transformer_class_name(cls) -> str:
        # Simple name of the backing Java transformer class.
        return "RegexTokenizer"
diff --git a/flink-ml-python/pyflink/ml/lib/feature/tests/test_regextokenizer.py b/flink-ml-python/pyflink/ml/lib/feature/tests/test_regextokenizer.py
new file mode 100644
index 0000000..7ef1bff
--- /dev/null
+++ b/flink-ml-python/pyflink/ml/lib/feature/tests/test_regextokenizer.py
@@ -0,0 +1,93 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import os
+
+from pyflink.common import Types
+
+from pyflink.ml.lib.feature.regextokenizer import RegexTokenizer
+from pyflink.ml.tests.test_utils import PyFlinkMLTestCase
+
+
class RegexTokenizerTest(PyFlinkMLTestCase):
    def setUp(self):
        super(RegexTokenizerTest, self).setUp()
        self.input_data_table = self.t_env.from_data_stream(
            self.env.from_collection([
                ('Test for tokenization.',),
                ('Te,st. punct',),
            ],
                type_info=Types.ROW_NAMED(
                    ['input'],
                    [Types.STRING()])))
        # Tokens expected under the default parameters: lowercase, split on whitespace.
        self.expected_output = [
            ['test', 'for', 'tokenization.'],
            ['te,st.', 'punct']
        ]

    def _transform_and_collect(self, regex_tokenizer, input_table):
        """
        Transforms ``input_table`` and returns the token lists of the output column.

        The output column is located by name rather than position. The lists are
        sorted so that comparisons do not depend on row order; sorting by tuple also
        works for empty token lists.
        """
        output_table = regex_tokenizer.transform(input_table)[0]
        field_names = output_table.get_schema().get_field_names()
        output_index = field_names.index(regex_tokenizer.output_col)
        results = [result[output_index] for result in
                   self.t_env.to_data_stream(output_table).execute_and_collect()]
        return sorted(results, key=tuple)

    def test_param(self):
        regex_tokenizer = RegexTokenizer()
        self.assertEqual('input', regex_tokenizer.input_col)
        self.assertEqual('output', regex_tokenizer.output_col)
        self.assertEqual(1, regex_tokenizer.min_token_length)
        self.assertEqual(True, regex_tokenizer.gaps)
        self.assertEqual('\\s+', regex_tokenizer.pattern)
        self.assertEqual(True, regex_tokenizer.to_lowercase)

        regex_tokenizer \
            .set_input_col("testInputCol") \
            .set_output_col("testOutputCol") \
            .set_min_token_length(3) \
            .set_gaps(False) \
            .set_pattern("\\s") \
            .set_to_lowercase(False)

        self.assertEqual('testInputCol', regex_tokenizer.input_col)
        self.assertEqual('testOutputCol', regex_tokenizer.output_col)
        self.assertEqual(3, regex_tokenizer.min_token_length)
        self.assertEqual(False, regex_tokenizer.gaps)
        self.assertEqual('\\s', regex_tokenizer.pattern)
        self.assertEqual(False, regex_tokenizer.to_lowercase)

    def test_output_schema(self):
        regex_tokenizer = RegexTokenizer()
        input_data_table = self.t_env.from_data_stream(
            self.env.from_collection([
                ('', ''),
            ],
                type_info=Types.ROW_NAMED(
                    ['input', 'dummy_input'],
                    [Types.STRING(), Types.STRING()])))
        output = regex_tokenizer.transform(input_data_table)[0]

        self.assertEqual(
            [regex_tokenizer.input_col, 'dummy_input', regex_tokenizer.output_col],
            output.get_schema().get_field_names())

    def test_transform(self):
        # Default parameters: lowercase and split on whitespace.
        self.assertEqual(
            sorted(self.expected_output, key=tuple),
            self._transform_and_collect(RegexTokenizer(), self.input_data_table))

        # Keep the original casing.
        regex_tokenizer = RegexTokenizer().set_to_lowercase(False)
        self.assertEqual(
            sorted([['Test', 'for', 'tokenization.'], ['Te,st.', 'punct']], key=tuple),
            self._transform_and_collect(regex_tokenizer, self.input_data_table))

        # Match tokens instead of gaps: words or single punctuation characters.
        regex_tokenizer = RegexTokenizer() \
            .set_gaps(False) \
            .set_pattern('\\w+|\\p{Punct}')
        self.assertEqual(
            sorted([['test', 'for', 'tokenization', '.'],
                    ['te', ',', 'st', '.', 'punct']], key=tuple),
            self._transform_and_collect(regex_tokenizer, self.input_data_table))

        # Additionally drop tokens shorter than three characters.
        regex_tokenizer.set_min_token_length(3)
        self.assertEqual(
            sorted([['test', 'for', 'tokenization'], ['punct']], key=tuple),
            self._transform_and_collect(regex_tokenizer, self.input_data_table))

    def test_save_load_transform(self):
        regex_tokenizer = RegexTokenizer()
        path = os.path.join(self.temp_dir, 'test_save_load_transform_regextokenizer')
        regex_tokenizer.save(path)
        regex_tokenizer = RegexTokenizer.load(self.t_env, path)
        self.assertEqual(
            sorted(self.expected_output, key=tuple),
            self._transform_and_collect(regex_tokenizer, self.input_data_table))