You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/11/03 13:14:18 UTC

[GitHub] [flink-ml] lindong28 commented on a diff in pull request #166: [FLINK-29598] Add Estimator and Transformer for Imputer

lindong28 commented on code in PR #166:
URL: https://github.com/apache/flink-ml/pull/166#discussion_r1012805730


##########
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/imputer/ImputerModel.java:
##########
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature.imputer;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.ml.api.Model;
+import org.apache.flink.ml.common.broadcast.BroadcastUtils;
+import org.apache.flink.ml.common.datastream.TableUtils;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.ArrayUtils;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/** A Model which replace the missing values using the model data computed by {@link Imputer}. */

Review Comment:
   replace ->replaces
   
   Same for similar docs in this PR.



##########
flink-ml-core/src/main/java/org/apache/flink/ml/param/DoubleParam.java:
##########
@@ -32,4 +34,12 @@ public DoubleParam(
     public DoubleParam(String name, String description, Double defaultValue) {
         this(name, description, defaultValue, ParamValidators.alwaysTrue());
     }
+
+    @Override
+    public Double jsonDecode(Object json) throws IOException {
+        if (json instanceof String && json.equals(String.valueOf(Double.NaN))) {
+            return Double.NaN;
+        }
+        return (Double) json;

Review Comment:
   What other special value do we expect to handle?
   
   If it is just `NaN`, it seems simpler to just do the following
   ```
   if (json.equals("NaN")) {
       return Double.NaN;
   }
   ```
   
   Depending on what other special value we plan to handle, we might need to update e.g. `FloatParam` for consistency.



##########
docs/content/docs/operators/feature/imputer.md:
##########
@@ -0,0 +1,196 @@
+---
+title: "Imputer"
+weight: 1
+type: docs
+aliases:
+- /operators/feature/imputer.html
+---
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions dand limitations
+under the License.
+-->
+
+## Imputer
+The imputer for completing missing values of the input columns.
+
+Missing values can be imputed using the statistics(mean, median or 
+most frequent) of each column in which the missing values are located.
+The input columns should be of numeric type.
+
+__Note__ The `mean`/`median`/`most frequent` value is computed after 
+filtering out missing values and null values, null values are always 
+treated as missing, and so are also imputed.
+
+__Note__ The parameter `relativeError` is only effective when the strategy
+ is `median`.
+
+### Input Columns
+
+| Param name | Type   | Default | Description             |
+|:-----------|:-------|:--------|:------------------------|
+| inputCols  | Number | `null`  | Features to be imputed. |
+
+### Output Columns
+
+| Param name | Type   | Default | Description       |
+|:-----------|:-------|:--------|:------------------|
+| outputCols | Double | `null`  | Imputed features. |
+
+### Parameters
+
+Below are the parameters required by `ImputerModel`.
+
+| Key           | Default   | Type        | Required | Description                                                                    |
+|---------------|-----------|-------------|----------|--------------------------------------------------------------------------------|
+| inputCols     | `null`    | String[]    | yes      | Input column names.                                                            |
+| outputCols    | `null`    | String[]    | yes      | Output column names.                                                           |
+| strategy      | `"mean"`  | String      | no       | The imputation strategy. Supported values: 'mean', 'median', 'most_frequent'.  |
+| relativeError | `0.001`   | Double      | no       | The relative target precision for the approximate quantile algorithm.          |
+
+`Imputer` needs parameters above and also below.
+
+| Key           | Default      | Type        | Required | Description                                                                                |
+|---------------|--------------|-------------|----------|--------------------------------------------------------------------------------------------|
+| missingValue  | `Double.NaN` | Double      | no       | The placeholder for the missing values. All occurrences of missing values will be imputed. |
+
+### Examples
+
+{{< tabs examples >}}
+
+{{< tab "Java">}}
+
+```java
+import org.apache.flink.ml.feature.imputer.Imputer;
+import org.apache.flink.ml.feature.imputer.ImputerModel;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+
+import java.util.Arrays;
+
+/** Simple program that trains a {@link Imputer} model and uses it for feature engineering. */
+public class ImputerExample {
+
+    public static void main(String[] args) {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+        // Generates input training and prediction data.
+        DataStream<Row> trainStream =
+                env.fromElements(
+                        Row.of(Double.NaN, 9.0),
+                        Row.of(1.0, 9.0),
+                        Row.of(1.5, 9.0),
+                        Row.of(2.5, Double.NaN),
+                        Row.of(5.0, 5.0),
+                        Row.of(5.0, 4.0));
+        Table trainTable = tEnv.fromDataStream(trainStream).as("input1", "input2");
+
+        // Create a Imputer object and initialize its parameters

Review Comment:
   a -> an
   
   Same for other comments in this PR.



##########
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/imputer/Imputer.java:
##########
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature.imputer;
+
+import org.apache.flink.api.common.functions.AggregateFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.ml.api.Estimator;
+import org.apache.flink.ml.common.datastream.DataStreamUtils;
+import org.apache.flink.ml.common.util.QuantileSummary;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * The imputer for completing missing values of the input columns.
+ *
+ * <p>Missing values can be imputed using the statistics(mean, median or most frequent) of each
+ * column in which the missing values are located. The input columns should be of numeric type.

Review Comment:
   Is this issue addressed?



##########
docs/content/docs/operators/feature/imputer.md:
##########
@@ -0,0 +1,196 @@
+---
+title: "Imputer"
+weight: 1
+type: docs
+aliases:
+- /operators/feature/imputer.html
+---
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions dand limitations
+under the License.
+-->
+
+## Imputer
+The imputer for completing missing values of the input columns.
+
+Missing values can be imputed using the statistics(mean, median or 
+most frequent) of each column in which the missing values are located.
+The input columns should be of numeric type.
+
+__Note__ The `mean`/`median`/`most frequent` value is computed after 
+filtering out missing values and null values, null values are always 
+treated as missing, and so are also imputed.
+
+__Note__ The parameter `relativeError` is only effective when the strategy
+ is `median`.
+
+### Input Columns
+
+| Param name | Type   | Default | Description             |
+|:-----------|:-------|:--------|:------------------------|
+| inputCols  | Number | `null`  | Features to be imputed. |
+
+### Output Columns
+
+| Param name | Type   | Default | Description       |
+|:-----------|:-------|:--------|:------------------|
+| outputCols | Double | `null`  | Imputed features. |
+
+### Parameters
+
+Below are the parameters required by `ImputerModel`.
+
+| Key           | Default   | Type        | Required | Description                                                                    |
+|---------------|-----------|-------------|----------|--------------------------------------------------------------------------------|
+| inputCols     | `null`    | String[]    | yes      | Input column names.                                                            |
+| outputCols    | `null`    | String[]    | yes      | Output column names.                                                           |
+| strategy      | `"mean"`  | String      | no       | The imputation strategy. Supported values: 'mean', 'median', 'most_frequent'.  |
+| relativeError | `0.001`   | Double      | no       | The relative target precision for the approximate quantile algorithm.          |
+
+`Imputer` needs parameters above and also below.
+
+| Key           | Default      | Type        | Required | Description                                                                                |
+|---------------|--------------|-------------|----------|--------------------------------------------------------------------------------------------|
+| missingValue  | `Double.NaN` | Double      | no       | The placeholder for the missing values. All occurrences of missing values will be imputed. |

Review Comment:
   Is the parameter documented here consistent with the actual code?



##########
docs/content/docs/operators/feature/imputer.md:
##########
@@ -0,0 +1,196 @@
+---
+title: "Imputer"
+weight: 1
+type: docs
+aliases:
+- /operators/feature/imputer.html
+---
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions dand limitations
+under the License.
+-->
+
+## Imputer
+The imputer for completing missing values of the input columns.
+
+Missing values can be imputed using the statistics(mean, median or 
+most frequent) of each column in which the missing values are located.
+The input columns should be of numeric type.
+
+__Note__ The `mean`/`median`/`most frequent` value is computed after 
+filtering out missing values and null values, null values are always 
+treated as missing, and so are also imputed.
+
+__Note__ The parameter `relativeError` is only effective when the strategy
+ is `median`.
+
+### Input Columns
+
+| Param name | Type   | Default | Description             |
+|:-----------|:-------|:--------|:------------------------|
+| inputCols  | Number | `null`  | Features to be imputed. |
+
+### Output Columns
+
+| Param name | Type   | Default | Description       |
+|:-----------|:-------|:--------|:------------------|
+| outputCols | Double | `null`  | Imputed features. |
+
+### Parameters
+
+Below are the parameters required by `ImputerModel`.
+
+| Key           | Default   | Type        | Required | Description                                                                    |
+|---------------|-----------|-------------|----------|--------------------------------------------------------------------------------|
+| inputCols     | `null`    | String[]    | yes      | Input column names.                                                            |
+| outputCols    | `null`    | String[]    | yes      | Output column names.                                                           |
+| strategy      | `"mean"`  | String      | no       | The imputation strategy. Supported values: 'mean', 'median', 'most_frequent'.  |

Review Comment:
   Should we describe the behavior for each option, similar to what is described in the Java doc?



##########
docs/content/docs/operators/feature/imputer.md:
##########
@@ -0,0 +1,196 @@
+---
+title: "Imputer"
+weight: 1
+type: docs
+aliases:
+- /operators/feature/imputer.html
+---
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions dand limitations
+under the License.
+-->
+
+## Imputer
+The imputer for completing missing values of the input columns.
+
+Missing values can be imputed using the statistics(mean, median or 
+most frequent) of each column in which the missing values are located.
+The input columns should be of numeric type.
+
+__Note__ The `mean`/`median`/`most frequent` value is computed after 
+filtering out missing values and null values, null values are always 
+treated as missing, and so are also imputed.
+
+__Note__ The parameter `relativeError` is only effective when the strategy
+ is `median`.
+
+### Input Columns
+
+| Param name | Type   | Default | Description             |
+|:-----------|:-------|:--------|:------------------------|
+| inputCols  | Number | `null`  | Features to be imputed. |
+
+### Output Columns
+
+| Param name | Type   | Default | Description       |
+|:-----------|:-------|:--------|:------------------|
+| outputCols | Double | `null`  | Imputed features. |
+
+### Parameters
+
+Below are the parameters required by `ImputerModel`.
+
+| Key           | Default   | Type        | Required | Description                                                                    |
+|---------------|-----------|-------------|----------|--------------------------------------------------------------------------------|
+| inputCols     | `null`    | String[]    | yes      | Input column names.                                                            |
+| outputCols    | `null`    | String[]    | yes      | Output column names.                                                           |
+| strategy      | `"mean"`  | String      | no       | The imputation strategy. Supported values: 'mean', 'median', 'most_frequent'.  |
+| relativeError | `0.001`   | Double      | no       | The relative target precision for the approximate quantile algorithm.          |
+
+`Imputer` needs parameters above and also below.
+
+| Key           | Default      | Type        | Required | Description                                                                                |
+|---------------|--------------|-------------|----------|--------------------------------------------------------------------------------------------|
+| missingValue  | `Double.NaN` | Double      | no       | The placeholder for the missing values. All occurrences of missing values will be imputed. |
+
+### Examples
+
+{{< tabs examples >}}
+
+{{< tab "Java">}}
+
+```java
+import org.apache.flink.ml.feature.imputer.Imputer;
+import org.apache.flink.ml.feature.imputer.ImputerModel;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+
+import java.util.Arrays;
+
+/** Simple program that trains a {@link Imputer} model and uses it for feature engineering. */
+public class ImputerExample {
+
+    public static void main(String[] args) {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+        // Generates input training and prediction data.
+        DataStream<Row> trainStream =
+                env.fromElements(
+                        Row.of(Double.NaN, 9.0),
+                        Row.of(1.0, 9.0),
+                        Row.of(1.5, 9.0),
+                        Row.of(2.5, Double.NaN),
+                        Row.of(5.0, 5.0),
+                        Row.of(5.0, 4.0));
+        Table trainTable = tEnv.fromDataStream(trainStream).as("input1", "input2");
+
+        // Create a Imputer object and initialize its parameters
+        Imputer imputer =
+                new Imputer()
+                        .setInputCols("input1", "input2")
+                        .setOutputCols("output1", "output2")
+                        .setStrategy("mean")
+                        .setMissingValue(Double.NaN);
+
+        // Train the Imputer model.
+        ImputerModel model = imputer.fit(trainTable);
+
+        // Uses the Imputer model for predictions.

Review Comment:
   It seems inconsistent to start the coment with `Uses` here but use `Train` above. Can you make the comment style consistent with the exiting doc?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org