Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/10/27 10:08:15 UTC

[GitHub] [flink-ml] jiangxin369 opened a new pull request, #166: [FLINK-29598] Add Estimator and Transformer for Imputer

jiangxin369 opened a new pull request, #166:
URL: https://github.com/apache/flink-ml/pull/166

   ## What is the purpose of the change
   
   Add Estimator and Transformer for Imputer
   
   ## Brief change log
   
     - Adds Transformer and Estimator implementations of Imputer in Java and Python.
     - Adds examples of Imputer.
     - Adds documentation for Imputer.
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (no)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (yes)
     - If yes, how is the feature documented? (docs / JavaDocs)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-ml] yunfengzhou-hub commented on a diff in pull request #166: [FLINK-29598] Add Estimator and Transformer for Imputer

Posted by GitBox <gi...@apache.org>.
yunfengzhou-hub commented on code in PR #166:
URL: https://github.com/apache/flink-ml/pull/166#discussion_r1007561913


##########
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/imputer/ImputerParams.java:
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature.imputer;
+
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.param.ParamValidators;
+import org.apache.flink.ml.param.StringParam;
+
+/**
+ * Params of {@link Imputer}.
+ *
+ * @param <T> The class type of this instance.
+ */
+public interface ImputerParams<T> extends ImputerModelParams<T> {

Review Comment:
   Spark's `Imputer` also has a `relativeError` parameter. Should we also add this parameter to Flink ML?



##########
flink-ml-core/src/main/java/org/apache/flink/ml/param/DoubleParam.java:
##########
@@ -32,4 +34,12 @@ public DoubleParam(
     public DoubleParam(String name, String description, Double defaultValue) {
         this(name, description, defaultValue, ParamValidators.alwaysTrue());
     }
+
+    @Override
+    public Double jsonDecode(Object json) throws IOException {
+        if (json instanceof String && json.equals(String.valueOf(Double.NaN))) {
+            return Double.NaN;
+        }
+        return (Double) json;

Review Comment:
   How about the following implementation?
   ```java
   if (json instanceof String) {
       return Double.valueOf((String) json);
   }
   return (Double) json;
   ```
   This applies to `Double.NaN` as well as other special values.
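
A minimal sketch of the decoding behaviour suggested above. It assumes special values reach the decoder as strings (as the `String.valueOf(Double.NaN)` check in the diff implies) and ordinary values arrive as `Double`. The class and method names (`SpecialDoubleDecoding`, `decode`) are hypothetical and not part of Flink ML.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of Flink ML: mirrors the suggested jsonDecode body.
final class SpecialDoubleDecoding {

    // Strings are parsed with Double.valueOf, which accepts "NaN", "Infinity" and
    // "-Infinity"; any other input is assumed to already be a Double, as in the
    // original cast.
    static Double decode(Object json) {
        if (json instanceof String) {
            return Double.valueOf((String) json);
        }
        return (Double) json;
    }

    public static void main(String[] args) {
        List<Object> inputs = Arrays.asList("NaN", "Infinity", "-Infinity", 1.5);
        for (Object input : inputs) {
            System.out.println(input + " -> " + decode(input));
        }
    }
}
```

Because `Double.valueOf(String)` also parses ordinary decimal strings, this variant needs no per-value special case, which is the point of the suggestion.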



##########
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/imputer/Imputer.java:
##########
@@ -0,0 +1,328 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature.imputer;
+
+import org.apache.flink.api.common.functions.AggregateFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.ml.api.Estimator;
+import org.apache.flink.ml.common.datastream.DataStreamUtils;
+import org.apache.flink.ml.common.util.QuantileSummary;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * The Imputer estimator completes missing values in a dataset. Missing values can be imputed using

Review Comment:
   nit: it might be better to use "bounded stream" instead of "dataset", as "dataset" has a specific meaning in Flink.



##########
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/imputer/ImputerModel.java:
##########
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature.imputer;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.ml.api.Model;
+import org.apache.flink.ml.common.broadcast.BroadcastUtils;
+import org.apache.flink.ml.common.datastream.TableUtils;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.ArrayUtils;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/** A Model which replace the missing values using the model data computed by {@link Imputer}. */
+public class ImputerModel implements Model<ImputerModel>, ImputerModelParams<ImputerModel> {
+
+    private final Map<Param<?>, Object> paramMap = new HashMap<>();
+    private Table modelDataTable;
+
+    public ImputerModel() {
+        ParamUtils.initializeMapWithDefaultValues(paramMap, this);
+    }
+
+    @Override
+    public ImputerModel setModelData(Table... inputs) {
+        Preconditions.checkArgument(inputs.length == 1);
+        modelDataTable = inputs[0];
+        return this;
+    }
+
+    @Override
+    public Table[] getModelData() {
+        return new Table[] {modelDataTable};
+    }
+
+    @Override
+    public void save(String path) throws IOException {
+        ReadWriteUtils.saveMetadata(this, path);
+        ReadWriteUtils.saveModelData(
+                ImputerModelData.getModelDataStream(modelDataTable),
+                path,
+                new ImputerModelData.ModelDataEncoder());
+    }
+
+    public static ImputerModel load(StreamTableEnvironment tEnv, String path) throws IOException {
+        ImputerModel model = ReadWriteUtils.loadStageParam(path);
+        Table modelDataTable =
+                ReadWriteUtils.loadModelData(tEnv, path, new ImputerModelData.ModelDataDecoder());
+        return model.setModelData(modelDataTable);
+    }
+
+    @Override
+    public Map<Param<?>, Object> getParamMap() {
+        return paramMap;
+    }
+
+    @Override
+    public Table[] transform(Table... inputs) {
+        Preconditions.checkArgument(inputs.length == 1);
+        String[] inputCols = getInputCols();
+        String[] outputCols = getOutputCols();
+        Preconditions.checkArgument(inputCols.length == outputCols.length);
+
+        StreamTableEnvironment tEnv =
+                (StreamTableEnvironment) ((TableImpl) inputs[0]).getTableEnvironment();
+        DataStream<Row> dataStream = tEnv.toDataStream(inputs[0]);
+        DataStream<ImputerModelData> imputerModel =
+                ImputerModelData.getModelDataStream(modelDataTable);
+
+        final String broadcastModelKey = "broadcastModelKey";
+        RowTypeInfo inputTypeInfo = TableUtils.getRowTypeInfo(inputs[0].getResolvedSchema());
+        TypeInformation<?>[] outputTypes = new TypeInformation[outputCols.length];
+        Arrays.fill(outputTypes, BasicTypeInfo.DOUBLE_TYPE_INFO);
+        RowTypeInfo outputTypeInfo =
+                new RowTypeInfo(
+                        ArrayUtils.addAll(inputTypeInfo.getFieldTypes(), outputTypes),
+                        ArrayUtils.addAll(inputTypeInfo.getFieldNames(), outputCols));
+
+        DataStream<Row> output =
+                BroadcastUtils.withBroadcastStream(
+                        Collections.singletonList(dataStream),
+                        Collections.singletonMap(broadcastModelKey, imputerModel),
+                        inputList -> {
+                            DataStream input = inputList.get(0);
+                            return input.map(
+                                    new PredictOutputFunction(
+                                            getMissingValue(), inputCols, broadcastModelKey),
+                                    outputTypeInfo);
+                        });
+        return new Table[] {tEnv.fromDataStream(output)};
+    }
+
+    /** This operator loads model data and predicts result. */
+    private static class PredictOutputFunction extends RichMapFunction<Row, Row> {
+
+        private final String[] inputCols;
+        private final String broadcastKey;
+        private final double missingValue;
+        private Map<String, Double> surrogates;
+
+        public PredictOutputFunction(double missingValue, String[] inputCols, String broadcastKey) {
+            this.missingValue = missingValue;
+            this.inputCols = inputCols;
+            this.broadcastKey = broadcastKey;
+        }
+
+        @Override
+        public Row map(Row row) throws Exception {
+            if (surrogates == null) {
+                ImputerModelData imputerModelData =
+                        (ImputerModelData)
+                                getRuntimeContext().getBroadcastVariable(broadcastKey).get(0);
+                surrogates = imputerModelData.surrogates;
+                Preconditions.checkArgument(
+                        surrogates.size() == inputCols.length,
+                        "Imputer is expecting %s input columns as input but only got %s.",
+                        surrogates.size(),
+                        inputCols.length);
+            }
+
+            Row outputRow = new Row(inputCols.length);
+            for (int i = 0; i < inputCols.length; i++) {
+                Object value = row.getField(i);

Review Comment:
   nit: `Double value = row.getFieldAs(i)` might be better.



##########
flink-ml-core/src/main/java/org/apache/flink/ml/param/DoubleParam.java:
##########
@@ -32,4 +34,12 @@ public DoubleParam(
     public DoubleParam(String name, String description, Double defaultValue) {
         this(name, description, defaultValue, ParamValidators.alwaysTrue());
     }
+
+    @Override
+    public Double jsonDecode(Object json) throws IOException {

Review Comment:
   IOException can be removed.



##########
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/imputer/ImputerModel.java:
##########
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature.imputer;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.ml.api.Model;
+import org.apache.flink.ml.common.broadcast.BroadcastUtils;
+import org.apache.flink.ml.common.datastream.TableUtils;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.ArrayUtils;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/** A Model which replace the missing values using the model data computed by {@link Imputer}. */
+public class ImputerModel implements Model<ImputerModel>, ImputerModelParams<ImputerModel> {
+
+    private final Map<Param<?>, Object> paramMap = new HashMap<>();
+    private Table modelDataTable;
+
+    public ImputerModel() {
+        ParamUtils.initializeMapWithDefaultValues(paramMap, this);
+    }
+
+    @Override
+    public ImputerModel setModelData(Table... inputs) {
+        Preconditions.checkArgument(inputs.length == 1);
+        modelDataTable = inputs[0];
+        return this;
+    }
+
+    @Override
+    public Table[] getModelData() {
+        return new Table[] {modelDataTable};
+    }
+
+    @Override
+    public void save(String path) throws IOException {
+        ReadWriteUtils.saveMetadata(this, path);
+        ReadWriteUtils.saveModelData(
+                ImputerModelData.getModelDataStream(modelDataTable),
+                path,
+                new ImputerModelData.ModelDataEncoder());
+    }
+
+    public static ImputerModel load(StreamTableEnvironment tEnv, String path) throws IOException {
+        ImputerModel model = ReadWriteUtils.loadStageParam(path);
+        Table modelDataTable =
+                ReadWriteUtils.loadModelData(tEnv, path, new ImputerModelData.ModelDataDecoder());
+        return model.setModelData(modelDataTable);
+    }
+
+    @Override
+    public Map<Param<?>, Object> getParamMap() {
+        return paramMap;
+    }
+
+    @Override
+    public Table[] transform(Table... inputs) {
+        Preconditions.checkArgument(inputs.length == 1);
+        String[] inputCols = getInputCols();
+        String[] outputCols = getOutputCols();
+        Preconditions.checkArgument(inputCols.length == outputCols.length);
+
+        StreamTableEnvironment tEnv =
+                (StreamTableEnvironment) ((TableImpl) inputs[0]).getTableEnvironment();
+        DataStream<Row> dataStream = tEnv.toDataStream(inputs[0]);
+        DataStream<ImputerModelData> imputerModel =
+                ImputerModelData.getModelDataStream(modelDataTable);
+
+        final String broadcastModelKey = "broadcastModelKey";
+        RowTypeInfo inputTypeInfo = TableUtils.getRowTypeInfo(inputs[0].getResolvedSchema());
+        TypeInformation<?>[] outputTypes = new TypeInformation[outputCols.length];
+        Arrays.fill(outputTypes, BasicTypeInfo.DOUBLE_TYPE_INFO);
+        RowTypeInfo outputTypeInfo =
+                new RowTypeInfo(
+                        ArrayUtils.addAll(inputTypeInfo.getFieldTypes(), outputTypes),
+                        ArrayUtils.addAll(inputTypeInfo.getFieldNames(), outputCols));
+
+        DataStream<Row> output =
+                BroadcastUtils.withBroadcastStream(
+                        Collections.singletonList(dataStream),
+                        Collections.singletonMap(broadcastModelKey, imputerModel),
+                        inputList -> {
+                            DataStream input = inputList.get(0);
+                            return input.map(
+                                    new PredictOutputFunction(
+                                            getMissingValue(), inputCols, broadcastModelKey),
+                                    outputTypeInfo);
+                        });
+        return new Table[] {tEnv.fromDataStream(output)};
+    }
+
+    /** This operator loads model data and predicts result. */
+    private static class PredictOutputFunction extends RichMapFunction<Row, Row> {
+
+        private final String[] inputCols;
+        private final String broadcastKey;
+        private final double missingValue;
+        private Map<String, Double> surrogates;
+
+        public PredictOutputFunction(double missingValue, String[] inputCols, String broadcastKey) {
+            this.missingValue = missingValue;
+            this.inputCols = inputCols;
+            this.broadcastKey = broadcastKey;
+        }
+
+        @Override
+        public Row map(Row row) throws Exception {
+            if (surrogates == null) {
+                ImputerModelData imputerModelData =
+                        (ImputerModelData)
+                                getRuntimeContext().getBroadcastVariable(broadcastKey).get(0);
+                surrogates = imputerModelData.surrogates;
+                Preconditions.checkArgument(
+                        surrogates.size() == inputCols.length,
+                        "Imputer is expecting %s input columns as input but only got %s.",

Review Comment:
   I think it should be acceptable so long as `inputCols` is a subset of `imputerModelData.surrogates`. They do not need to be equal.
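
A minimal sketch of the relaxed check described above, assuming the `surrogates` map is keyed by column name (the diff only shows its type, `Map<String, Double>`). The class and method names (`SurrogateCheck`, `checkInputCols`) are hypothetical, and the sketch throws a plain `IllegalArgumentException` rather than using Flink's `Preconditions`.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch, not the PR's code: accepts any inputCols that are a
// subset of the columns the model data has surrogates for.
final class SurrogateCheck {

    static void checkInputCols(Map<String, Double> surrogates, String[] inputCols) {
        if (!surrogates.keySet().containsAll(Arrays.asList(inputCols))) {
            throw new IllegalArgumentException(
                    "Model data has no surrogate for some of the input columns "
                            + Arrays.toString(inputCols)
                            + "; known columns: "
                            + surrogates.keySet());
        }
    }

    public static void main(String[] args) {
        Map<String, Double> surrogates = new HashMap<>();
        surrogates.put("f1", 1.0);
        surrogates.put("f2", 2.0);

        // A strict subset of the fitted columns is accepted.
        checkInputCols(surrogates, new String[] {"f1"});

        // A column the model never saw is rejected.
        try {
            checkInputCols(surrogates, new String[] {"f1", "f3"});
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```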





[GitHub] [flink-ml] jiangxin369 commented on a diff in pull request #166: [FLINK-29598] Add Estimator and Transformer for Imputer

Posted by GitBox <gi...@apache.org>.
jiangxin369 commented on code in PR #166:
URL: https://github.com/apache/flink-ml/pull/166#discussion_r1008977825


##########
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/imputer/Imputer.java:
##########
@@ -0,0 +1,328 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature.imputer;
+
+import org.apache.flink.api.common.functions.AggregateFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.ml.api.Estimator;
+import org.apache.flink.ml.common.datastream.DataStreamUtils;
+import org.apache.flink.ml.common.util.QuantileSummary;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * The Imputer estimator completes missing values in a dataset. Missing values can be imputed using

Review Comment:
   How about `The Imputer estimator completes missing values of the input columns.`, since the model can also transform an unbounded data stream?





[GitHub] [flink-ml] yunfengzhou-hub commented on a diff in pull request #166: [FLINK-29598] Add Estimator and Transformer for Imputer

Posted by GitBox <gi...@apache.org>.
yunfengzhou-hub commented on code in PR #166:
URL: https://github.com/apache/flink-ml/pull/166#discussion_r1013518990


##########
flink-ml-core/src/main/java/org/apache/flink/ml/param/DoubleParam.java:
##########
@@ -32,4 +34,12 @@ public DoubleParam(
     public DoubleParam(String name, String description, Double defaultValue) {
         this(name, description, defaultValue, ParamValidators.alwaysTrue());
     }
+
+    @Override
+    public Double jsonDecode(Object json) throws IOException {
+        if (json instanceof String && json.equals(String.valueOf(Double.NaN))) {
+            return Double.NaN;
+        }
+        return (Double) json;

Review Comment:
   Other special values include `Double.POSITIVE_INFINITY` and `Double.NEGATIVE_INFINITY`. These might also be the invalid values that Imputer needs to replace.
   
   I agree that we should also update `FloatParam`. By the way, algorithms like Bucketizer already use these special values in their parameters, and Flink ML's `DoubleArrayParam` already supports them, so it might be enough to update just `DoubleParam` and `FloatParam`.





[GitHub] [flink-ml] yunfengzhou-hub commented on a diff in pull request #166: [FLINK-29598] Add Estimator and Transformer for Imputer

Posted by GitBox <gi...@apache.org>.
yunfengzhou-hub commented on code in PR #166:
URL: https://github.com/apache/flink-ml/pull/166#discussion_r1009085375


##########
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/imputer/Imputer.java:
##########
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature.imputer;
+
+import org.apache.flink.api.common.functions.AggregateFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.ml.api.Estimator;
+import org.apache.flink.ml.common.datastream.DataStreamUtils;
+import org.apache.flink.ml.common.util.QuantileSummary;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * The imputer for completing missing values of the input columns.
+ *
+ * <p>Missing values can be imputed using the statistics(mean, median or most frequent) of each
+ * column in which the missing values are located. The input columns should be of numeric type.

Review Comment:
   Could you please add test cases to verify that this algorithm can work on numeric values other than doubles, like integers or floats?
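
A minimal sketch of one way such inputs could be handled, to show what the requested tests would exercise: each boxed numeric field is widened through `Number.doubleValue()` before it is compared with the missing value. This is not the PR's implementation. The names (`NumericWidening`, `imputeOrKeep`) are hypothetical, and treating `null` as missing is an assumption.

```java
// Hypothetical sketch, not the PR's code: widens Integer, Long, Float, etc. to
// double so that one comparison against missingValue covers all numeric types.
final class NumericWidening {

    static double imputeOrKeep(Object value, double missingValue, double surrogate) {
        if (value == null) {
            // Assumption: a null field counts as missing.
            return surrogate;
        }
        double d = ((Number) value).doubleValue();
        // NaN never equals itself under ==, so it needs an explicit check.
        boolean missing = Double.isNaN(missingValue) ? Double.isNaN(d) : d == missingValue;
        return missing ? surrogate : d;
    }

    public static void main(String[] args) {
        System.out.println(imputeOrKeep(Integer.valueOf(3), Double.NaN, 9.0));
        System.out.println(imputeOrKeep(Float.valueOf(Float.NaN), Double.NaN, 9.0));
        System.out.println(imputeOrKeep(Long.valueOf(-1L), -1.0, 9.0));
    }
}
```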



##########
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/imputer/ImputerModel.java:
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature.imputer;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.ml.api.Model;
+import org.apache.flink.ml.common.broadcast.BroadcastUtils;
+import org.apache.flink.ml.common.datastream.TableUtils;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.ArrayUtils;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/** A Model which replace the missing values using the model data computed by {@link Imputer}. */
+public class ImputerModel implements Model<ImputerModel>, ImputerModelParams<ImputerModel> {
+
+    private final Map<Param<?>, Object> paramMap = new HashMap<>();
+    private Table modelDataTable;
+
+    public ImputerModel() {
+        ParamUtils.initializeMapWithDefaultValues(paramMap, this);
+    }
+
+    @Override
+    public ImputerModel setModelData(Table... inputs) {
+        Preconditions.checkArgument(inputs.length == 1);
+        modelDataTable = inputs[0];
+        return this;
+    }
+
+    @Override
+    public Table[] getModelData() {
+        return new Table[] {modelDataTable};
+    }
+
+    @Override
+    public void save(String path) throws IOException {
+        ReadWriteUtils.saveMetadata(this, path);
+        ReadWriteUtils.saveModelData(
+                ImputerModelData.getModelDataStream(modelDataTable),
+                path,
+                new ImputerModelData.ModelDataEncoder());
+    }
+
+    public static ImputerModel load(StreamTableEnvironment tEnv, String path) throws IOException {
+        ImputerModel model = ReadWriteUtils.loadStageParam(path);
+        Table modelDataTable =
+                ReadWriteUtils.loadModelData(tEnv, path, new ImputerModelData.ModelDataDecoder());
+        return model.setModelData(modelDataTable);
+    }
+
+    @Override
+    public Map<Param<?>, Object> getParamMap() {
+        return paramMap;
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public Table[] transform(Table... inputs) {
+        Preconditions.checkArgument(inputs.length == 1);
+        String[] inputCols = getInputCols();
+        String[] outputCols = getOutputCols();
+        Preconditions.checkArgument(inputCols.length == outputCols.length);
+
+        StreamTableEnvironment tEnv =
+                (StreamTableEnvironment) ((TableImpl) inputs[0]).getTableEnvironment();
+        DataStream<Row> dataStream = tEnv.toDataStream(inputs[0]);
+        DataStream<ImputerModelData> imputerModel =
+                ImputerModelData.getModelDataStream(modelDataTable);
+
+        final String broadcastModelKey = "broadcastModelKey";
+        RowTypeInfo inputTypeInfo = TableUtils.getRowTypeInfo(inputs[0].getResolvedSchema());
+        TypeInformation<?>[] outputTypes = new TypeInformation[outputCols.length];
+        Arrays.fill(outputTypes, BasicTypeInfo.DOUBLE_TYPE_INFO);
+        RowTypeInfo outputTypeInfo =
+                new RowTypeInfo(
+                        ArrayUtils.addAll(inputTypeInfo.getFieldTypes(), outputTypes),
+                        ArrayUtils.addAll(inputTypeInfo.getFieldNames(), outputCols));
+
+        DataStream<Row> output =
+                BroadcastUtils.withBroadcastStream(
+                        Collections.singletonList(dataStream),
+                        Collections.singletonMap(broadcastModelKey, imputerModel),
+                        inputList -> {
+                            DataStream input = inputList.get(0);
+                            return input.map(
+                                    new PredictOutputFunction(
+                                            getMissingValue(), inputCols, broadcastModelKey),
+                                    outputTypeInfo);
+                        });
+        return new Table[] {tEnv.fromDataStream(output)};
+    }
+
+    /** This operator loads model data and predicts result. */
+    private static class PredictOutputFunction extends RichMapFunction<Row, Row> {
+
+        private final String[] inputCols;
+        private final String broadcastKey;
+        private final double missingValue;
+        private Map<String, Double> surrogates;
+
+        public PredictOutputFunction(double missingValue, String[] inputCols, String broadcastKey) {
+            this.missingValue = missingValue;
+            this.inputCols = inputCols;
+            this.broadcastKey = broadcastKey;
+        }
+
+        @Override
+        public Row map(Row row) throws Exception {
+            if (surrogates == null) {
+                ImputerModelData imputerModelData =
+                        (ImputerModelData)
+                                getRuntimeContext().getBroadcastVariable(broadcastKey).get(0);
+                surrogates = imputerModelData.surrogates;
+                Arrays.stream(inputCols)
+                        .forEach(
+                                col ->
+                                        Preconditions.checkArgument(
+                                                surrogates.containsKey(col),
+                                                "Column %s is unacceptable for the Imputer model.",
+                                                col));
+            }
+
+            Row outputRow = new Row(inputCols.length);
+            for (int i = 0; i < inputCols.length; i++) {
+                Double value = (Double) row.getField(i);
+                boolean shouldReplace;
+                if (Double.isNaN(missingValue)) {
+                    shouldReplace = value == null || Double.isNaN(value);
+                } else {
+                    shouldReplace = value == null || value == missingValue;
+                }

Review Comment:
   `boolean shouldReplace = value == null || value.equals(missingValue);` should be enough. `Double.equals` treats NaN as equal to NaN, so Double.NaN is not a corner case here.
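
A minimal, self-contained sketch of the point above, assuming the value arrives as a boxed `Double`. The class and method names (`NanEqualsSketch`, `shouldReplace`) are hypothetical and are not part of the PR. It shows that `Double.equals` matches NaN against NaN, which the primitive `==` comparison never does.

```java
final class NanEqualsSketch {
    // Hypothetical helper mirroring the suggested one-liner.
    static boolean shouldReplace(Double value, double missingValue) {
        // Double.equals compares the canonical bit patterns, so
        // NaN.equals(NaN) is true, whereas NaN == NaN is always false.
        return value == null || value.equals(missingValue);
    }

    public static void main(String[] args) {
        System.out.println(shouldReplace(null, Double.NaN));       // null is always missing
        System.out.println(shouldReplace(Double.NaN, Double.NaN)); // NaN matches NaN
        System.out.println(shouldReplace(1.0, Double.NaN));        // ordinary value is kept
        System.out.println(shouldReplace(-1.0, -1.0));             // custom missing value
        // Caveat: equals distinguishes -0.0 from 0.0, unlike ==.
        System.out.println(shouldReplace(-0.0, 0.0));
    }
}
```

One behavioural difference to keep in mind: with `equals`, a `missingValue` of `0.0` would not match `-0.0`, while the original `==` branch would.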



##########
flink-ml-lib/src/main/java/org/apache/flink/ml/common/param/HasRelativeError.java:
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.common.param;
+
+import org.apache.flink.ml.param.DoubleParam;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.param.ParamValidators;
+import org.apache.flink.ml.param.WithParams;
+
+/** Interface for shared param relativeError. */
+public interface HasRelativeError<T> extends WithParams<T> {
+    Param<Double> RELATIVE_ERROR =
+            new DoubleParam(
+                    "relativeError",
+                    "The relative target precision for the approximate quantile algorithm. Must be in the range (0, 1).",
+                    0.001,
+                    ParamValidators.inRange(0, 1, false, false));

Review Comment:
   In Spark, 0 and 1 are also valid values for this parameter. Should we also support them in Flink ML?
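
To make the difference concrete, here is a rough sketch of an exclusive versus inclusive range check. `RangeCheckSketch.inRange` is a hypothetical stand-in written for illustration only; it is not the Flink ML `ParamValidators` API, although its four arguments are modelled on the call in the diff above.

```java
final class RangeCheckSketch {
    // Hypothetical range check; the two flags choose open or closed bounds.
    static boolean inRange(
            double value,
            double lower,
            double upper,
            boolean lowerInclusive,
            boolean upperInclusive) {
        boolean aboveLower = lowerInclusive ? value >= lower : value > lower;
        boolean belowUpper = upperInclusive ? value <= upper : value < upper;
        return aboveLower && belowUpper;
    }

    public static void main(String[] args) {
        // Open interval (0, 1): the boundary values are rejected.
        System.out.println(inRange(0.0, 0, 1, false, false));
        System.out.println(inRange(1.0, 0, 1, false, false));
        // Closed interval [0, 1]: the boundary values are accepted.
        System.out.println(inRange(0.0, 0, 1, true, true));
        System.out.println(inRange(1.0, 0, 1, true, true));
    }
}
```

Under this sketch, supporting 0 and 1 would amount to flipping both inclusive flags to `true`.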



##########
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/imputer/Imputer.java:
##########
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature.imputer;
+
+import org.apache.flink.api.common.functions.AggregateFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.ml.api.Estimator;
+import org.apache.flink.ml.common.datastream.DataStreamUtils;
+import org.apache.flink.ml.common.util.QuantileSummary;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * The imputer for completing missing values of the input columns.
+ *
+ * <p>Missing values can be imputed using the statistics(mean, median or most frequent) of each
+ * column in which the missing values are located. The input columns should be of numeric type.
+ *
+ * <p>Note that the mean/median/most_frequent value is computed after filtering out missing values.
+ * All null values in the input columns are also treated as missing, and so are imputed.
+ */
+public class Imputer implements Estimator<Imputer, ImputerModel>, ImputerParams<Imputer> {
+    private final Map<Param<?>, Object> paramMap = new HashMap<>();
+
+    public Imputer() {
+        ParamUtils.initializeMapWithDefaultValues(paramMap, this);
+    }
+
+    @Override
+    public ImputerModel fit(Table... inputs) {
+        Preconditions.checkArgument(inputs.length == 1);
+        Preconditions.checkArgument(
+                getInputCols().length == getOutputCols().length,
+                "Num of input columns and output columns are inconsistent.");
+        StreamTableEnvironment tEnv =
+                (StreamTableEnvironment) ((TableImpl) inputs[0]).getTableEnvironment();
+        DataStream<Row> inputData = tEnv.toDataStream(inputs[0]);
+
+        DataStream<ImputerModelData> modelData;
+        switch (getStrategy()) {
+            case MEAN:
+                modelData =
+                        DataStreamUtils.aggregate(
+                                inputData,
+                                new MeanStrategyAggregator(getInputCols(), getMissingValue()));
+                break;
+            case MEDIAN:
+                modelData =
+                        DataStreamUtils.aggregate(
+                                inputData,
+                                new MedianStrategyAggregator(
+                                        getInputCols(), getMissingValue(), getRelativeError()));
+                break;
+            case MOST_FREQUENT:
+                modelData =
+                        DataStreamUtils.aggregate(
+                                inputData,
+                                new MostFrequentStrategyAggregator(
+                                        getInputCols(), getMissingValue()));
+                break;
+            default:
+                throw new RuntimeException("Unsupported strategy of Imputer: " + getStrategy());
+        }
+        ImputerModel model = new ImputerModel().setModelData(tEnv.fromDataStream(modelData));
+        ReadWriteUtils.updateExistingParams(model, getParamMap());
+        return model;
+    }
+
+    /**
+     * A stream operator to compute the mean value of all input columns of the input bounded data
+     * stream.
+     */
+    private static class MeanStrategyAggregator
+            implements AggregateFunction<Row, Map<String, Tuple2<Double, Long>>, ImputerModelData> {
+
+        private final String[] columnNames;
+        private final double missingValue;
+
+        public MeanStrategyAggregator(String[] columnNames, double missingValue) {
+            this.columnNames = columnNames;
+            this.missingValue = missingValue;
+        }
+
+        @Override
+        public Map<String, Tuple2<Double, Long>> createAccumulator() {
+            Map<String, Tuple2<Double, Long>> accumulators = new HashMap<>();
+            Arrays.stream(columnNames).forEach(x -> accumulators.put(x, Tuple2.of(0.0, 0L)));
+            return accumulators;
+        }
+
+        @Override
+        public Map<String, Tuple2<Double, Long>> add(
+                Row row, Map<String, Tuple2<Double, Long>> accumulators) {
+            accumulators.forEach(
+                    (col, sumAndNum) -> {
+                        Double rawValue = (Double) row.getField(col);
+                        boolean shouldBypass =
+                                rawValue == null
+                                        || Double.isNaN(rawValue)
+                                        || rawValue == missingValue;
+                        if (!shouldBypass) {
+                            sumAndNum.f0 += rawValue;
+                            sumAndNum.f1 += 1;
+                        }
+                    });
+            return accumulators;
+        }
+
+        @Override
+        public ImputerModelData getResult(Map<String, Tuple2<Double, Long>> map) {
+            long numRows = map.entrySet().stream().findFirst().get().getValue().f1;
+            Preconditions.checkState(
+                    numRows > 0, "The training set is empty or does not contain valid data.");
+
+            Map<String, Double> surrogates = new HashMap<>();
+            map.forEach((col, sumAndNum) -> surrogates.put(col, sumAndNum.f0 / sumAndNum.f1));
+            return new ImputerModelData(surrogates);
+        }
+
+        @Override
+        public Map<String, Tuple2<Double, Long>> merge(
+                Map<String, Tuple2<Double, Long>> acc1, Map<String, Tuple2<Double, Long>> acc2) {
+            Preconditions.checkArgument(acc1.size() == acc2.size());
+
+            acc1.forEach(
+                    (col, numAndSum) -> {
+                        acc2.get(col).f0 += numAndSum.f0;
+                        acc2.get(col).f1 += numAndSum.f1;
+                    });
+            return acc2;
+        }
+    }
+
+    /**
+     * A stream operator to compute the median value of all input columns of the input bounded data
+     * stream.
+     */
+    private static class MedianStrategyAggregator
+            implements AggregateFunction<Row, Map<String, QuantileSummary>, ImputerModelData> {
+        private final String[] columnNames;
+        private final double missingValue;
+        private final double relativeError;
+
+        public MedianStrategyAggregator(
+                String[] columnNames, double missingValue, double relativeError) {
+            this.columnNames = columnNames;
+            this.missingValue = missingValue;
+            this.relativeError = relativeError;
+        }
+
+        @Override
+        public Map<String, QuantileSummary> createAccumulator() {
+            Map<String, QuantileSummary> summaries = new HashMap<>();
+            Arrays.stream(columnNames)
+                    .forEach(x -> summaries.put(x, new QuantileSummary(relativeError)));
+            return summaries;
+        }
+
+        @Override
+        public Map<String, QuantileSummary> add(Row row, Map<String, QuantileSummary> summaries) {
+            summaries.forEach(
+                    (col, summary) -> {
+                        Double rawValue = (Double) row.getField(col);
+                        boolean shouldBypass =
+                                rawValue == null
+                                        || Double.isNaN(rawValue)
+                                        || rawValue == missingValue;
+                        if (!shouldBypass) {
+                            summary.insert(rawValue);
+                        }
+                    });
+            return summaries;
+        }
+
+        @Override
+        public ImputerModelData getResult(Map<String, QuantileSummary> summaries) {
+            Preconditions.checkState(
+                    !summaries.entrySet().stream().findFirst().get().getValue().isEmpty(),
+                    "The training set is empty or does not contain valid data.");
+
+            Map<String, Double> surrogates = new HashMap<>();
+            summaries.forEach(
+                    (col, summary) -> {
+                        QuantileSummary compressed = summary.compress();
+                        double median = compressed.query(0.5);
+                        surrogates.put(col, median);
+                    });
+            return new ImputerModelData(surrogates);
+        }
+
+        @Override
+        public Map<String, QuantileSummary> merge(
+                Map<String, QuantileSummary> acc1, Map<String, QuantileSummary> acc2) {
+            Preconditions.checkArgument(acc1.size() == acc2.size());
+
+            acc1.forEach(
+                    (col, summary1) -> {
+                        QuantileSummary summary2 = acc2.get(col).compress();
+                        acc2.put(col, summary2.merge(summary1.compress()));
+                    });
+            return acc2;
+        }
+    }
+
+    /**
+     * A stream operator to compute the most frequent value of all input columns of the input
+     * bounded data stream.
+     */
+    private static class MostFrequentStrategyAggregator
+            implements AggregateFunction<Row, Map<String, Map<Double, Long>>, ImputerModelData> {
+        private final String[] columnNames;
+        private final double missingValue;
+
+        public MostFrequentStrategyAggregator(String[] columnNames, double missingValue) {
+            this.columnNames = columnNames;
+            this.missingValue = missingValue;
+        }
+
+        @Override
+        public Map<String, Map<Double, Long>> createAccumulator() {
+            Map<String, Map<Double, Long>> accumulators = new HashMap<>();
+            Arrays.stream(columnNames).forEach(x -> accumulators.put(x, new HashMap<>()));
+            return accumulators;
+        }
+
+        @Override
+        public Map<String, Map<Double, Long>> add(
+                Row row, Map<String, Map<Double, Long>> accumulators) {
+            accumulators.forEach(
+                    (col, counts) -> {
+                        Double rawValue = (Double) row.getField(col);
+                        boolean shouldBypass =
+                                rawValue == null
+                                        || Double.isNaN(rawValue)
+                                        || rawValue == missingValue;
+                        if (!shouldBypass) {
+                            double value = rawValue;
+                            if (counts.containsKey(value)) {
+                                counts.put(value, counts.get(value) + 1);
+                            } else {
+                                counts.put(value, 1L);
+                            }
+                        }
+                    });
+            return accumulators;
+        }
+
+        @Override
+        public ImputerModelData getResult(Map<String, Map<Double, Long>> map) {
+            long validColumns =
+                    map.entrySet().stream().filter(x -> x.getValue().size() > 0).count();
+            Preconditions.checkState(
+                    validColumns > 0, "The training set is empty or does not contain valid data.");
+
+            Map<String, Double> surrogates = new HashMap<>();
+            map.forEach(
+                    (col, counts) -> {
+                        long maxCnt = Long.MIN_VALUE;
+                        double value = Double.NaN;
+                        for (Map.Entry<Double, Long> entry : counts.entrySet()) {
+                            if (maxCnt < entry.getValue()) {
+                                maxCnt = entry.getValue();
+                                value = entry.getKey();

Review Comment:
   Let's guarantee that when there are multiple max counts, the smallest value would be selected.
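
One possible way to get that guarantee is sketched below, assuming the per-column counts are held in a `Map<Double, Long>` as in the diff. `MostFrequentSketch` and `mostFrequent` are hypothetical names used for illustration. The extra tie-break condition makes the result independent of the `HashMap` iteration order.

```java
import java.util.HashMap;
import java.util.Map;

final class MostFrequentSketch {
    // Returns the most frequent key; on equal counts the smallest key wins.
    // Returns NaN for an empty map.
    static double mostFrequent(Map<Double, Long> counts) {
        long maxCnt = Long.MIN_VALUE;
        double value = Double.NaN;
        for (Map.Entry<Double, Long> entry : counts.entrySet()) {
            long cnt = entry.getValue();
            double candidate = entry.getKey();
            // Take a strictly larger count, or an equal count with a smaller key.
            if (cnt > maxCnt || (cnt == maxCnt && candidate < value)) {
                maxCnt = cnt;
                value = candidate;
            }
        }
        return value;
    }

    public static void main(String[] args) {
        Map<Double, Long> counts = new HashMap<>();
        counts.put(3.0, 2L);
        counts.put(1.0, 2L);
        counts.put(2.0, 1L);
        // 3.0 and 1.0 tie on count, so the smaller key is selected.
        System.out.println(mostFrequent(counts));
    }
}
```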



##########
flink-ml-lib/src/main/java/org/apache/flink/ml/common/param/HasRelativeError.java:
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.common.param;
+
+import org.apache.flink.ml.param.DoubleParam;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.param.ParamValidators;
+import org.apache.flink.ml.param.WithParams;
+
+/** Interface for shared param relativeError. */
+public interface HasRelativeError<T> extends WithParams<T> {
+    Param<Double> RELATIVE_ERROR =
+            new DoubleParam(
+                    "relativeError",
+                    "The relative target precision for the approximate quantile algorithm. Must be in the range (0, 1).",

Review Comment:
   I understand that Spark also has the description "Must be in the range...", but since it is not common practice in Flink ML to repeat the ParamValidator's constraint in the Param's description, it might be better to remove the last sentence.



##########
docs/content/docs/operators/feature/imputer.md:
##########
@@ -0,0 +1,188 @@
+---
+title: "Imputer"
+weight: 1
+type: docs
+aliases:
+- /operators/feature/imputer.html
+---
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+## Imputer
+The imputer for completing missing values of the input columns.
+Missing values can be imputed using the statistics(mean, median or most frequent) of each column in which the missing values are located. The input columns should be of numeric type.
+
+__Note__ The mean/median/most_frequent value is computed after filtering out missing values and null values, null values are always treated as missing, and so are also imputed.
+
+### Input Columns
+
+| Param name | Type   | Default | Description             |
+|:-----------|:-------|:--------|:------------------------|
+| inputCols  | Number | `null`  | Features to be imputed. |
+
+### Output Columns
+
+| Param name | Type   | Default | Description       |
+|:-----------|:-------|:--------|:------------------|
+| outputCols | Double | `null`  | Imputed features. |
+
+### Parameters
+
+Below are the parameters required by `ImputerModel`.
+
+| Key           | Default   | Type        | Required | Description                                                                    |
+|---------------|-----------|-------------|----------|--------------------------------------------------------------------------------|
+| inputCols     | `null`    | String[]    | yes      | Input column names.                                                            |
+| outputCols    | `null`    | String[]    | yes      | Output column names.                                                           |
+| strategy      | `"mean"`  | String      | no       | The imputation strategy. Supported values: 'mean', 'median', 'most_frequent'.  |
+| relativeError | `0.001`   | Double      | no       | The relative target precision, only effective when the strategy is 'median'.     |

Review Comment:
   Let's keep the description the same across JavaDoc and markdown documents. 
   
   If we want to make the point that `relativeError` only takes effect with a certain imputation strategy, we can move that point into the Javadoc and the Markdown description of the whole class.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-ml] lindong28 merged pull request #166: [FLINK-29598] Add Estimator and Transformer for Imputer

Posted by GitBox <gi...@apache.org>.
lindong28 merged PR #166:
URL: https://github.com/apache/flink-ml/pull/166




[GitHub] [flink-ml] yunfengzhou-hub commented on a diff in pull request #166: [FLINK-29598] Add Estimator and Transformer for Imputer

Posted by GitBox <gi...@apache.org>.
yunfengzhou-hub commented on code in PR #166:
URL: https://github.com/apache/flink-ml/pull/166#discussion_r1012433599


##########
flink-ml-lib/src/test/java/org/apache/flink/ml/feature/ImputerTest.java:
##########
@@ -0,0 +1,361 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.ml.feature.imputer.Imputer;
+import org.apache.flink.ml.feature.imputer.ImputerModel;
+import org.apache.flink.ml.util.TestUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.types.Row;
+
+import org.apache.commons.collections.IteratorUtils;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.ml.feature.imputer.ImputerParams.MEAN;
+import static org.apache.flink.ml.feature.imputer.ImputerParams.MEDIAN;
+import static org.apache.flink.ml.feature.imputer.ImputerParams.MOST_FREQUENT;
+import static org.apache.flink.test.util.TestBaseUtils.compareResultCollections;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/** Tests {@link Imputer} and {@link ImputerModel}. */
+public class ImputerTest {

Review Comment:
   Let's make this class extend `AbstractTestBase`. That starts one mini-cluster and reuses it across all test cases, which improves test efficiency.



##########
docs/content/docs/operators/feature/imputer.md:
##########
@@ -0,0 +1,190 @@
+---
+title: "Imputer"
+weight: 1
+type: docs
+aliases:
+- /operators/feature/imputer.html
+---
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+## Imputer
+The imputer for completing missing values of the input columns.
+Missing values can be imputed using the statistics(mean, median or most frequent) of each column in which the missing values are located. The input columns should be of numeric type.

Review Comment:
   Let's add a blank line above this, or markdown might treat these two paragraphs as one.



##########
docs/content/docs/operators/feature/imputer.md:
##########
@@ -0,0 +1,190 @@
+---
+title: "Imputer"
+weight: 1
+type: docs
+aliases:
+- /operators/feature/imputer.html
+---
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+## Imputer
+The imputer for completing missing values of the input columns.
+Missing values can be imputed using the statistics(mean, median or most frequent) of each column in which the missing values are located. The input columns should be of numeric type.
+
+__Note__ The `mean`/`median`/`most frequent` value is computed after filtering out missing values and null values, null values are always treated as missing, and so are also imputed.

Review Comment:
   Let's reformat the file so that no line exceeds 80 characters.





[GitHub] [flink-ml] jiangxin369 commented on a diff in pull request #166: [FLINK-29598] Add Estimator and Transformer for Imputer

Posted by GitBox <gi...@apache.org>.
jiangxin369 commented on code in PR #166:
URL: https://github.com/apache/flink-ml/pull/166#discussion_r1013581529


##########
flink-ml-core/src/main/java/org/apache/flink/ml/param/DoubleParam.java:
##########
@@ -32,4 +34,12 @@ public DoubleParam(
     public DoubleParam(String name, String description, Double defaultValue) {
         this(name, description, defaultValue, ParamValidators.alwaysTrue());
     }
+
+    @Override
+    public Double jsonDecode(Object json) throws IOException {
+        if (json instanceof String && json.equals(String.valueOf(Double.NaN))) {
+            return Double.NaN;
+        }
+        return (Double) json;

Review Comment:
   Sure, I'll add



##########
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/imputer/Imputer.java:
##########
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature.imputer;
+
+import org.apache.flink.api.common.functions.AggregateFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.ml.api.Estimator;
+import org.apache.flink.ml.common.datastream.DataStreamUtils;
+import org.apache.flink.ml.common.util.QuantileSummary;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * The imputer for completing missing values of the input columns.
+ *
+ * <p>Missing values can be imputed using the statistics(mean, median or most frequent) of each
+ * column in which the missing values are located. The input columns should be of numeric type.

Review Comment:
   Yes, it is addressed by adding Integer and Float columns to the training data.




[GitHub] [flink-ml] lindong28 commented on a diff in pull request #166: [FLINK-29598] Add Estimator and Transformer for Imputer

Posted by GitBox <gi...@apache.org>.
lindong28 commented on code in PR #166:
URL: https://github.com/apache/flink-ml/pull/166#discussion_r1012805730


##########
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/imputer/ImputerModel.java:
##########
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature.imputer;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.ml.api.Model;
+import org.apache.flink.ml.common.broadcast.BroadcastUtils;
+import org.apache.flink.ml.common.datastream.TableUtils;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.ArrayUtils;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/** A Model which replace the missing values using the model data computed by {@link Imputer}. */

Review Comment:
   replace -> replaces
   
   Same for similar docs in this PR.



##########
flink-ml-core/src/main/java/org/apache/flink/ml/param/DoubleParam.java:
##########
@@ -32,4 +34,12 @@ public DoubleParam(
     public DoubleParam(String name, String description, Double defaultValue) {
         this(name, description, defaultValue, ParamValidators.alwaysTrue());
     }
+
+    @Override
+    public Double jsonDecode(Object json) throws IOException {
+        if (json instanceof String && json.equals(String.valueOf(Double.NaN))) {
+            return Double.NaN;
+        }
+        return (Double) json;

Review Comment:
   What other special values do we expect to handle?
   
   If it is just `NaN`, it seems simpler to do the following:
   ```
   if (json.equals("NaN")) {
       return Double.NaN;
   }
   ```
   
   Depending on which other special values we plan to handle, we might need to update e.g. `FloatParam` for consistency.



##########
docs/content/docs/operators/feature/imputer.md:
##########
@@ -0,0 +1,196 @@
+---
+title: "Imputer"
+weight: 1
+type: docs
+aliases:
+- /operators/feature/imputer.html
+---
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions dand limitations
+under the License.
+-->
+
+## Imputer
+The imputer for completing missing values of the input columns.
+
+Missing values can be imputed using the statistics(mean, median or 
+most frequent) of each column in which the missing values are located.
+The input columns should be of numeric type.
+
+__Note__ The `mean`/`median`/`most frequent` value is computed after 
+filtering out missing values and null values, null values are always 
+treated as missing, and so are also imputed.
+
+__Note__ The parameter `relativeError` is only effective when the strategy
+ is `median`.
+
+### Input Columns
+
+| Param name | Type   | Default | Description             |
+|:-----------|:-------|:--------|:------------------------|
+| inputCols  | Number | `null`  | Features to be imputed. |
+
+### Output Columns
+
+| Param name | Type   | Default | Description       |
+|:-----------|:-------|:--------|:------------------|
+| outputCols | Double | `null`  | Imputed features. |
+
+### Parameters
+
+Below are the parameters required by `ImputerModel`.
+
+| Key           | Default   | Type        | Required | Description                                                                    |
+|---------------|-----------|-------------|----------|--------------------------------------------------------------------------------|
+| inputCols     | `null`    | String[]    | yes      | Input column names.                                                            |
+| outputCols    | `null`    | String[]    | yes      | Output column names.                                                           |
+| strategy      | `"mean"`  | String      | no       | The imputation strategy. Supported values: 'mean', 'median', 'most_frequent'.  |
+| relativeError | `0.001`   | Double      | no       | The relative target precision for the approximate quantile algorithm.          |
+
+`Imputer` needs parameters above and also below.
+
+| Key           | Default      | Type        | Required | Description                                                                                |
+|---------------|--------------|-------------|----------|--------------------------------------------------------------------------------------------|
+| missingValue  | `Double.NaN` | Double      | no       | The placeholder for the missing values. All occurrences of missing values will be imputed. |
+
+### Examples
+
+{{< tabs examples >}}
+
+{{< tab "Java">}}
+
+```java
+import org.apache.flink.ml.feature.imputer.Imputer;
+import org.apache.flink.ml.feature.imputer.ImputerModel;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+
+import java.util.Arrays;
+
+/** Simple program that trains a {@link Imputer} model and uses it for feature engineering. */
+public class ImputerExample {
+
+    public static void main(String[] args) {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+        // Generates input training and prediction data.
+        DataStream<Row> trainStream =
+                env.fromElements(
+                        Row.of(Double.NaN, 9.0),
+                        Row.of(1.0, 9.0),
+                        Row.of(1.5, 9.0),
+                        Row.of(2.5, Double.NaN),
+                        Row.of(5.0, 5.0),
+                        Row.of(5.0, 4.0));
+        Table trainTable = tEnv.fromDataStream(trainStream).as("input1", "input2");
+
+        // Create a Imputer object and initialize its parameters

Review Comment:
   a -> an
   
   Same for other comments in this PR.



##########
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/imputer/Imputer.java:
##########
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature.imputer;
+
+import org.apache.flink.api.common.functions.AggregateFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.ml.api.Estimator;
+import org.apache.flink.ml.common.datastream.DataStreamUtils;
+import org.apache.flink.ml.common.util.QuantileSummary;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * The imputer for completing missing values of the input columns.
+ *
+ * <p>Missing values can be imputed using the statistics(mean, median or most frequent) of each
+ * column in which the missing values are located. The input columns should be of numeric type.

Review Comment:
   Is this issue addressed?



##########
docs/content/docs/operators/feature/imputer.md:
##########
@@ -0,0 +1,196 @@
+---
+title: "Imputer"
+weight: 1
+type: docs
+aliases:
+- /operators/feature/imputer.html
+---
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions dand limitations
+under the License.
+-->
+
+## Imputer
+The imputer for completing missing values of the input columns.
+
+Missing values can be imputed using the statistics(mean, median or 
+most frequent) of each column in which the missing values are located.
+The input columns should be of numeric type.
+
+__Note__ The `mean`/`median`/`most frequent` value is computed after 
+filtering out missing values and null values, null values are always 
+treated as missing, and so are also imputed.
+
+__Note__ The parameter `relativeError` is only effective when the strategy
+ is `median`.
+
+### Input Columns
+
+| Param name | Type   | Default | Description             |
+|:-----------|:-------|:--------|:------------------------|
+| inputCols  | Number | `null`  | Features to be imputed. |
+
+### Output Columns
+
+| Param name | Type   | Default | Description       |
+|:-----------|:-------|:--------|:------------------|
+| outputCols | Double | `null`  | Imputed features. |
+
+### Parameters
+
+Below are the parameters required by `ImputerModel`.
+
+| Key           | Default   | Type        | Required | Description                                                                    |
+|---------------|-----------|-------------|----------|--------------------------------------------------------------------------------|
+| inputCols     | `null`    | String[]    | yes      | Input column names.                                                            |
+| outputCols    | `null`    | String[]    | yes      | Output column names.                                                           |
+| strategy      | `"mean"`  | String      | no       | The imputation strategy. Supported values: 'mean', 'median', 'most_frequent'.  |
+| relativeError | `0.001`   | Double      | no       | The relative target precision for the approximate quantile algorithm.          |
+
+`Imputer` needs parameters above and also below.
+
+| Key           | Default      | Type        | Required | Description                                                                                |
+|---------------|--------------|-------------|----------|--------------------------------------------------------------------------------------------|
+| missingValue  | `Double.NaN` | Double      | no       | The placeholder for the missing values. All occurrences of missing values will be imputed. |

Review Comment:
   Is the parameter documented here consistent with the actual code?



##########
docs/content/docs/operators/feature/imputer.md:
##########
@@ -0,0 +1,196 @@
+---
+title: "Imputer"
+weight: 1
+type: docs
+aliases:
+- /operators/feature/imputer.html
+---
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions dand limitations
+under the License.
+-->
+
+## Imputer
+The imputer for completing missing values of the input columns.
+
+Missing values can be imputed using the statistics(mean, median or 
+most frequent) of each column in which the missing values are located.
+The input columns should be of numeric type.
+
+__Note__ The `mean`/`median`/`most frequent` value is computed after 
+filtering out missing values and null values, null values are always 
+treated as missing, and so are also imputed.
+
+__Note__ The parameter `relativeError` is only effective when the strategy
+ is `median`.
+
+### Input Columns
+
+| Param name | Type   | Default | Description             |
+|:-----------|:-------|:--------|:------------------------|
+| inputCols  | Number | `null`  | Features to be imputed. |
+
+### Output Columns
+
+| Param name | Type   | Default | Description       |
+|:-----------|:-------|:--------|:------------------|
+| outputCols | Double | `null`  | Imputed features. |
+
+### Parameters
+
+Below are the parameters required by `ImputerModel`.
+
+| Key           | Default   | Type        | Required | Description                                                                    |
+|---------------|-----------|-------------|----------|--------------------------------------------------------------------------------|
+| inputCols     | `null`    | String[]    | yes      | Input column names.                                                            |
+| outputCols    | `null`    | String[]    | yes      | Output column names.                                                           |
+| strategy      | `"mean"`  | String      | no       | The imputation strategy. Supported values: 'mean', 'median', 'most_frequent'.  |

Review Comment:
   Should we describe the behavior for each option, similar to what is described in the Java doc?
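
For illustration, the three strategies can be sketched in plain Java as below. This is not the Flink ML implementation: `ImputationStrategies` and its methods are hypothetical names, the median here is exact (the documentation above describes an approximate quantile algorithm controlled by `relativeError`), and breaking ties toward the smaller value in `mostFrequent` is an assumption. The sample column is `input1` from the quoted example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper that mimics the documented strategies; not part of Flink ML. */
final class ImputationStrategies {

    /** Drops nulls and values equal to the missing-value placeholder. */
    static List<Double> present(List<Double> column, double missingValue) {
        List<Double> kept = new ArrayList<>();
        for (Double v : column) {
            // Double.compare treats NaN as equal to NaN, so a NaN placeholder works too.
            if (v != null && Double.compare(v, missingValue) != 0) {
                kept.add(v);
            }
        }
        return kept;
    }

    /** Arithmetic mean of the remaining values. */
    static double mean(List<Double> values) {
        double sum = 0.0;
        for (double v : values) {
            sum += v;
        }
        return sum / values.size();
    }

    /** Exact median; for an even count this returns the lower middle value. */
    static double median(List<Double> values) {
        List<Double> sorted = new ArrayList<>(values);
        Collections.sort(sorted);
        return sorted.get((sorted.size() - 1) / 2);
    }

    /** Most frequent value; ties go to the smaller value (an assumption). */
    static double mostFrequent(List<Double> values) {
        Map<Double, Integer> counts = new HashMap<>();
        for (Double v : values) {
            counts.merge(v, 1, Integer::sum);
        }
        double best = Double.NaN;
        int bestCount = 0;
        for (Map.Entry<Double, Integer> e : counts.entrySet()) {
            boolean moreFrequent = e.getValue() > bestCount;
            boolean tieButSmaller = e.getValue() == bestCount && e.getKey() < best;
            if (moreFrequent || tieButSmaller) {
                best = e.getKey();
                bestCount = e.getValue();
            }
        }
        return best;
    }

    public static void main(String[] args) {
        List<Double> input1 = Arrays.asList(Double.NaN, 1.0, 1.5, 2.5, 5.0, 5.0);
        List<Double> kept = present(input1, Double.NaN);
        System.out.println("mean          = " + mean(kept));         // 3.0
        System.out.println("median        = " + median(kept));       // 2.5
        System.out.println("most_frequent = " + mostFrequent(kept)); // 5.0
    }
}
```

Whichever statistic is selected would then replace every null or placeholder entry in that column.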



##########
docs/content/docs/operators/feature/imputer.md:
##########
@@ -0,0 +1,196 @@
+---
+title: "Imputer"
+weight: 1
+type: docs
+aliases:
+- /operators/feature/imputer.html
+---
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions dand limitations
+under the License.
+-->
+
+## Imputer
+The imputer for completing missing values of the input columns.
+
+Missing values can be imputed using the statistics(mean, median or 
+most frequent) of each column in which the missing values are located.
+The input columns should be of numeric type.
+
+__Note__ The `mean`/`median`/`most frequent` value is computed after 
+filtering out missing values and null values, null values are always 
+treated as missing, and so are also imputed.
+
+__Note__ The parameter `relativeError` is only effective when the strategy
+ is `median`.
+
+### Input Columns
+
+| Param name | Type   | Default | Description             |
+|:-----------|:-------|:--------|:------------------------|
+| inputCols  | Number | `null`  | Features to be imputed. |
+
+### Output Columns
+
+| Param name | Type   | Default | Description       |
+|:-----------|:-------|:--------|:------------------|
+| outputCols | Double | `null`  | Imputed features. |
+
+### Parameters
+
+Below are the parameters required by `ImputerModel`.
+
+| Key           | Default   | Type        | Required | Description                                                                    |
+|---------------|-----------|-------------|----------|--------------------------------------------------------------------------------|
+| inputCols     | `null`    | String[]    | yes      | Input column names.                                                            |
+| outputCols    | `null`    | String[]    | yes      | Output column names.                                                           |
+| strategy      | `"mean"`  | String      | no       | The imputation strategy. Supported values: 'mean', 'median', 'most_frequent'.  |
+| relativeError | `0.001`   | Double      | no       | The relative target precision for the approximate quantile algorithm.          |
+
+`Imputer` needs parameters above and also below.
+
+| Key           | Default      | Type        | Required | Description                                                                                |
+|---------------|--------------|-------------|----------|--------------------------------------------------------------------------------------------|
+| missingValue  | `Double.NaN` | Double      | no       | The placeholder for the missing values. All occurrences of missing values will be imputed. |
+
+### Examples
+
+{{< tabs examples >}}
+
+{{< tab "Java">}}
+
+```java
+import org.apache.flink.ml.feature.imputer.Imputer;
+import org.apache.flink.ml.feature.imputer.ImputerModel;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+
+import java.util.Arrays;
+
+/** Simple program that trains a {@link Imputer} model and uses it for feature engineering. */
+public class ImputerExample {
+
+    public static void main(String[] args) {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+        // Generates input training and prediction data.
+        DataStream<Row> trainStream =
+                env.fromElements(
+                        Row.of(Double.NaN, 9.0),
+                        Row.of(1.0, 9.0),
+                        Row.of(1.5, 9.0),
+                        Row.of(2.5, Double.NaN),
+                        Row.of(5.0, 5.0),
+                        Row.of(5.0, 4.0));
+        Table trainTable = tEnv.fromDataStream(trainStream).as("input1", "input2");
+
+        // Create a Imputer object and initialize its parameters
+        Imputer imputer =
+                new Imputer()
+                        .setInputCols("input1", "input2")
+                        .setOutputCols("output1", "output2")
+                        .setStrategy("mean")
+                        .setMissingValue(Double.NaN);
+
+        // Train the Imputer model.
+        ImputerModel model = imputer.fit(trainTable);
+
+        // Uses the Imputer model for predictions.

Review Comment:
   It seems inconsistent to start the comment with `Uses` here but use `Train` above. Can you make the comment style consistent with the existing doc?




[GitHub] [flink-ml] lindong28 commented on a diff in pull request #166: [FLINK-29598] Add Estimator and Transformer for Imputer

Posted by GitBox <gi...@apache.org>.
lindong28 commented on code in PR #166:
URL: https://github.com/apache/flink-ml/pull/166#discussion_r1013532619


##########
flink-ml-core/src/main/java/org/apache/flink/ml/param/DoubleParam.java:
##########
@@ -32,4 +34,12 @@ public DoubleParam(
     public DoubleParam(String name, String description, Double defaultValue) {
         this(name, description, defaultValue, ParamValidators.alwaysTrue());
     }
+
+    @Override
+    public Double jsonDecode(Object json) throws IOException {
+        if (json instanceof String && json.equals(String.valueOf(Double.NaN))) {
+            return Double.NaN;
+        }
+        return (Double) json;

Review Comment:
   Can you add unit tests in StageTest to cover the save/load of these values for each of the parameter types affected by this PR?
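
The kind of round-trip check being asked for could look roughly like this. It is a standalone sketch that does not use `StageTest` or any Flink ML class: `SpecialValueRoundTrip`, `encode` and `decode` are hypothetical stand-ins for a parameter's JSON encode/decode, and it assumes non-finite values are stored as strings. The main point is that the comparison must not use `==`, because `NaN == NaN` is false in Java.

```java
/** Hypothetical stand-in for a param's JSON round trip; not part of Flink ML. */
final class SpecialValueRoundTrip {

    /** Encodes a double for JSON: finite values stay numbers, others become strings. */
    static Object encode(double value) {
        if (Double.isFinite(value)) {
            return Double.valueOf(value);
        }
        // Yields "NaN", "Infinity" or "-Infinity".
        return String.valueOf(value);
    }

    /** Decodes what encode produced. */
    static double decode(Object json) {
        if (json instanceof String) {
            return Double.parseDouble((String) json);
        }
        return ((Number) json).doubleValue();
    }

    /** Returns true if the value is unchanged after encode followed by decode. */
    static boolean survivesRoundTrip(double value) {
        // Double.compare treats NaN as equal to NaN, unlike the == operator.
        return Double.compare(value, decode(encode(value))) == 0;
    }

    public static void main(String[] args) {
        double[] values = {
            1.5, Double.NaN, Double.POSITIVE_INFINITY, Double.NEGATIVE_INFINITY
        };
        for (double v : values) {
            System.out.println(v + " -> " + survivesRoundTrip(v));
        }
    }
}
```

A real test would run the same assertion through the stage's save and load path for each affected parameter type.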




[GitHub] [flink-ml] lindong28 commented on pull request #166: [FLINK-29598] Add Estimator and Transformer for Imputer

Posted by GitBox <gi...@apache.org>.
lindong28 commented on PR #166:
URL: https://github.com/apache/flink-ml/pull/166#issuecomment-1303222285

   Thanks for the update. LGTM.



[GitHub] [flink-ml] yunfengzhou-hub commented on a diff in pull request #166: [FLINK-29598] Add Estimator and Transformer for Imputer

Posted by GitBox <gi...@apache.org>.
yunfengzhou-hub commented on code in PR #166:
URL: https://github.com/apache/flink-ml/pull/166#discussion_r1013518990


##########
flink-ml-core/src/main/java/org/apache/flink/ml/param/DoubleParam.java:
##########
@@ -32,4 +34,12 @@ public DoubleParam(
     public DoubleParam(String name, String description, Double defaultValue) {
         this(name, description, defaultValue, ParamValidators.alwaysTrue());
     }
+
+    @Override
+    public Double jsonDecode(Object json) throws IOException {
+        if (json instanceof String && json.equals(String.valueOf(Double.NaN))) {
+            return Double.NaN;
+        }
+        return (Double) json;

Review Comment:
   Other special values include `Double.POSITIVE_INFINITY` and `Double.NEGATIVE_INFINITY`. These special values might be used in algorithms like Bucketizer.
   
   I agree that we should also update `FloatParam`, as well as `DoubleArrayParam`, `FloatArrayParam` and `DoubleArrayArrayParam`.
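
As a rough sketch of what handling all three special values might involve, for both a scalar and an array parameter: this is not the code merged in the PR. `SpecialDoubleDecoder`, `decode` and `decodeArray` are hypothetical names, and it assumes the JSON layer returns either a `Number` or one of the strings produced by `String.valueOf(double)`.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical decoding helper; not part of Flink ML. */
final class SpecialDoubleDecoder {

    /** Decodes one JSON value, accepting the textual forms of the special values. */
    static Double decode(Object json) {
        if (json == null) {
            return null;
        }
        if (json instanceof String) {
            switch ((String) json) {
                case "NaN":
                    return Double.NaN;
                case "Infinity":
                    return Double.POSITIVE_INFINITY;
                case "-Infinity":
                    return Double.NEGATIVE_INFINITY;
                default:
                    throw new IllegalArgumentException("Unsupported double value: " + json);
            }
        }
        // Assumption: plain numbers may arrive as Integer, Long or Double.
        return ((Number) json).doubleValue();
    }

    /** Applies decode to every element, as an array parameter would need. */
    static Double[] decodeArray(List<?> json) {
        Double[] result = new Double[json.size()];
        for (int i = 0; i < result.length; i++) {
            result[i] = decode(json.get(i));
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(decode("NaN"));
        System.out.println(decode("-Infinity"));
        List<Object> mixed = Arrays.<Object>asList(1, "Infinity", 2.5);
        System.out.println(Arrays.toString(decodeArray(mixed)));
    }
}
```

Going through `Number` rather than casting to `Double` avoids a `ClassCastException` if a whole number is parsed as an `Integer`; whether that can happen depends on the JSON library in use.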




