Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/11/30 09:02:03 UTC

[GitHub] [flink-ml] jiangxin369 commented on a diff in pull request #183: [FLINK-29603] Add Transformer for StopWordsRemover

jiangxin369 commented on code in PR #183:
URL: https://github.com/apache/flink-ml/pull/183#discussion_r1035581618


##########
flink-ml-lib/src/main/resources/org/apache/flink/ml/feature/stopwords/README:
##########
@@ -0,0 +1,12 @@
+Stopwords Corpus
+
+This corpus contains lists of stop words for several languages.  These

Review Comment:
   ```suggestion
   This corpus contains lists of stop words for several languages. These
   ```



##########
flink-ml-python/pyflink/ml/lib/feature/stopwordsremover.py:
##########
@@ -0,0 +1,128 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import typing
+from typing import Tuple
+
+from pyflink.java_gateway import get_gateway
+from pyflink.ml.core.param import Param, StringArrayParam, BooleanParam, StringParam
+from pyflink.ml.core.wrapper import JavaWithParams, _to_java_reference
+from pyflink.ml.lib.feature.common import JavaFeatureTransformer
+from pyflink.ml.lib.param import HasInputCols, HasOutputCols
+
+
+class _StopWordsRemoverParams(
+    JavaWithParams,
+    HasInputCols,
+    HasOutputCols
+):
+    """
+    Params for :class:`StopWordsRemover`.
+    """
+
+    STOP_WORDS: Param[Tuple[str, ...]] = StringArrayParam(
+        "stop_words",
+        "The words to be filtered out.",
+        get_gateway().jvm.org.apache.flink.ml.feature.
+        stopwordsremover.StopWordsRemoverParams.STOP_WORDS.defaultValue)
+
+    CASE_SENSITIVE: Param[bool] = BooleanParam(
+        "case_sensitive",
+        "Whether to do a case-sensitive comparison over the stop words.",
+        False
+    )
+
+    LOCALE: Param[str] = StringParam(
+        "locale",
+        "Locale of the input for case insensitive matching. Ignored when caseSensitive is true.",
+        get_gateway().jvm.org.apache.flink.ml.feature.
+        stopwordsremover.StopWordsRemoverParams.LOCALE.defaultValue)
+
+    def __init__(self, java_params):
+        super(_StopWordsRemoverParams, self).__init__(java_params)
+
+    def set_stop_words(self, value: Tuple[str, ...]):
+        return typing.cast(_StopWordsRemoverParams, self.set(self.STOP_WORDS, value))
+
+    def set_case_sensitive(self, value: bool):
+        return typing.cast(_StopWordsRemoverParams, self.set(self.CASE_SENSITIVE, value))
+
+    def set_locale(self, value: str):
+        return typing.cast(_StopWordsRemoverParams, self.set(self.LOCALE, value))
+
+    def get_stop_words(self) -> Tuple[str, ...]:
+        return self.get(self.STOP_WORDS)
+
+    def get_case_sensitive(self) -> bool:
+        return self.get(self.CASE_SENSITIVE)
+
+    def get_locale(self) -> str:
+        return self.get(self.LOCALE)
+
+    @property
+    def stop_words(self):
+        return self.get_stop_words()
+
+    @property
+    def case_sensitive(self):
+        return self.get_case_sensitive()
+
+    @property
+    def locale(self):
+        return self.get_locale()
+
+
+class StopWordsRemover(JavaFeatureTransformer, _StopWordsRemoverParams):
+    """
+    A feature transformer that filters out stop words from input.
+
+    Note: null values from input array are preserved unless adding null to stopWords explicitly.
+
+    See http://en.wikipedia.org/wiki/Stop_words
+    """
+
+    supported_languages = {
+        "danish",
+        "dutch",
+        "english",
+        "finnish",
+        "french",
+        "german",
+        "hungarian",
+        "italian",
+        "norwegian",
+        "portuguese",
+        "russian",
+        "spanish",
+        "swedish",
+        "turkish"
+    }
+
+    def __init__(self, java_model=None):
+        super(StopWordsRemover, self).__init__(java_model)
+
+    @classmethod
+    def _java_transformer_package_name(cls) -> str:
+        return "stopwordsremover"
+
+    @classmethod
+    def _java_transformer_class_name(cls) -> str:
+        return "StopWordsRemover"
+
+    @classmethod
+    def load_default_stop_words(cls, language: str):

Review Comment:
   Could you add a test for this public API?



##########
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/stopwordsremover/StopWordsRemover.java:
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature.stopwordsremover;
+
+import org.apache.flink.ml.api.Transformer;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.api.Expressions.$;
+import static org.apache.flink.table.api.Expressions.call;
+
+/**
+ * A feature transformer that filters out stop words from input.
+ *
+ * <p>Note: null values from input array are preserved unless adding null to stopWords explicitly.
+ *
+ * @see <a href="http://en.wikipedia.org/wiki/Stop_words">Stop words (Wikipedia)</a>
+ */
+public class StopWordsRemover extends StopWordsRemoverBase
+        implements Transformer<StopWordsRemover>, StopWordsRemoverParams<StopWordsRemover> {
+
+    private final Map<Param<?>, Object> paramMap = new HashMap<>();
+
+    public StopWordsRemover() {
+        ParamUtils.initializeMapWithDefaultValues(paramMap, this);
+    }
+
+    @Override
+    public Table[] transform(Table... inputs) {
+        Preconditions.checkArgument(inputs.length == 1);
+        Preconditions.checkArgument(getInputCols().length == getOutputCols().length);
+
+        String[] inputCols = getInputCols();
+        String[] outputCols = getOutputCols();
+
+        ScalarFunction function =
+                new RemoveStopWordsFunction(
+                        new HashSet<>(Arrays.asList(getStopWords())),
+                        new Locale(getLocale()),
+                        getCaseSensitive());
+
+        Expression[] expressions = new Expression[inputCols.length + 1];
+        expressions[0] = $("*");
+        for (int i = 0; i < inputCols.length; i++) {
+            expressions[i + 1] = call(function, $(inputCols[i])).as(outputCols[i]);
+        }
+
+        Table output = inputs[0].select(expressions);
+
+        return new Table[] {output};
+    }
+
+    @Override
+    public void save(String path) throws IOException {
+        ReadWriteUtils.saveMetadata(this, path);
+    }
+
+    public static StopWordsRemover load(StreamTableEnvironment env, String path)
+            throws IOException {
+        return ReadWriteUtils.loadStageParam(path);
+    }
+
+    @Override
+    public Map<Param<?>, Object> getParamMap() {
+        return paramMap;
+    }
+
+    /** A Scalar Function that removes stop words from input string array. */
+    public static class RemoveStopWordsFunction extends ScalarFunction {
+        private final Set<String> stopWords;
+        private final Locale locale;
+        private final boolean caseSensitive;
+        private transient Predicate<String> predicate;
+
+        public RemoveStopWordsFunction(
+                Set<String> stopWords, Locale locale, boolean caseSensitive) {
+            this.locale = locale;
+            this.caseSensitive = caseSensitive;
+            if (caseSensitive) {
+                this.stopWords = stopWords;
+            } else {
+                this.stopWords =
+                        stopWords.stream()
+                                .map(x -> x.toLowerCase(locale))

Review Comment:
   According to the Javadoc, adding `null` to `stopWords` is permitted. If so, an NPE would be thrown here, because `x.toLowerCase(locale)` would be invoked on the null element. Also, could you add a unit test to check the result with `null` in `stopWords`?
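
One possible way to address the NPE is sketched below. It is only an illustration, not the fix adopted in the PR: the class `NullSafeStopWords` and the method `toLowerCaseNullSafe` are hypothetical names, and the sketch assumes that a `null` entry should simply be carried over into the lowercased set unchanged.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Locale;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical helper, not part of flink-ml.
class NullSafeStopWords {

    // Lowercases every non-null stop word with the given locale.
    // A null entry is kept as null instead of triggering an NPE.
    static Set<String> toLowerCaseNullSafe(Set<String> stopWords, Locale locale) {
        return stopWords.stream()
                .map(x -> x == null ? null : x.toLowerCase(locale))
                // HashSet permits a null element, so the null entry survives.
                .collect(Collectors.toCollection(HashSet::new));
    }

    public static void main(String[] args) {
        Set<String> stopWords = new HashSet<>(Arrays.asList("The", "A", null));
        Set<String> lowered = toLowerCaseNullSafe(stopWords, Locale.ROOT);
        System.out.println(lowered.contains("the"));
        System.out.println(lowered.contains(null));
    }
}
```

A unit test along the lines requested above could then pass a stop-word array containing `null` and check that null values in the input are filtered out as well.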


