Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/04/07 09:56:39 UTC

[GitHub] [flink-ml] zhipeng93 opened a new pull request, #82: [FLINK-27072] Add Transformer of Bucketizer

zhipeng93 opened a new pull request, #82:
URL: https://github.com/apache/flink-ml/pull/82

   ## What is the purpose of the change
   - Add Transformer of Bucketizer in FlinkML. 
   
   ## Brief change log
   - Reorganized the HasHandleInvalid param in StringIndexer, OnehotEncoder and VectorAssembler.
   - Add Bucketizer.
   - Add unit test for Bucketizer
   
   ## Does this pull request potentially affect one of the following parts:
   - Dependencies (does it add or upgrade a dependency): (no)
   - The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
   - Does this pull request introduce a new feature? (no)
   - If yes, how is the feature documented? (Java doc)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-ml] zhipeng93 commented on a diff in pull request #82: [FLINK-27072] Add Transformer of Bucketizer

Posted by GitBox <gi...@apache.org>.
zhipeng93 commented on code in PR #82:
URL: https://github.com/apache/flink-ml/pull/82#discussion_r846890813


##########
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/onehotencoder/OneHotEncoderParams.java:
##########
@@ -27,6 +27,8 @@
 /**
  * Params of OneHotEncoderModel.
  *
+ * <p>The `keep` option of {@link HasHandleInvalid} is not supported in {@link OneHotEncoderParams}.

Review Comment:
   Sounds good. 





[GitHub] [flink-ml] yunfengzhou-hub commented on a diff in pull request #82: [FLINK-27072] Add Transformer of Bucketizer

Posted by GitBox <gi...@apache.org>.
yunfengzhou-hub commented on code in PR #82:
URL: https://github.com/apache/flink-ml/pull/82#discussion_r846882710


##########
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/onehotencoder/OneHotEncoderParams.java:
##########
@@ -27,6 +27,8 @@
 /**
  * Params of OneHotEncoderModel.
  *
+ * <p>The `keep` option of {@link HasHandleInvalid} is not supported in {@link OneHotEncoderParams}.

Review Comment:
   I think it would be better to make the JavaDoc match the actual behavior. We can update the JavaDoc when we add support for `SKIP` in the future. For now, it might be better for the JavaDoc to say that `OneHotEncoder` only supports `ERROR`.





[GitHub] [flink-ml] lindong28 commented on pull request #82: [FLINK-27072] Add Transformer of Bucketizer

Posted by GitBox <gi...@apache.org>.
lindong28 commented on PR #82:
URL: https://github.com/apache/flink-ml/pull/82#issuecomment-1099101396

   Thanks for the update! LGTM.




[GitHub] [flink-ml] lindong28 commented on a diff in pull request #82: [FLINK-27072] Add Transformer of Bucketizer

Posted by GitBox <gi...@apache.org>.
lindong28 commented on code in PR #82:
URL: https://github.com/apache/flink-ml/pull/82#discussion_r849438742


##########
flink-ml-core/src/main/java/org/apache/flink/ml/param/ArrayArrayParam.java:
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.param;
+
+import org.apache.flink.ml.util.ReadWriteUtils;
+
+import java.io.IOException;
+
+/** Class for the arrayOfArray-type parameters. */
+public class ArrayArrayParam<T> extends Param<T[][]> {

Review Comment:
   I am not sure the alternative approach is simpler than the existing approach.
   
   The alternative approach does remove the need for DoubleArrayArrayParam. But for every parameter of type `double[][]`, it requires setXXX() and getXXX() to have non-trivial implementations as described above. We could add two public static methods to be shared by those setXXX() and getXXX() methods, but adding these two methods does not seem simpler than the existing `DoubleArrayArrayParam`.





[GitHub] [flink-ml] zhipeng93 commented on a diff in pull request #82: [FLINK-27072] Add Transformer of Bucketizer

Posted by GitBox <gi...@apache.org>.
zhipeng93 commented on code in PR #82:
URL: https://github.com/apache/flink-ml/pull/82#discussion_r850118629


##########
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/onehotencoder/OneHotEncoderParams.java:
##########
@@ -27,6 +27,9 @@
 /**
  * Params of OneHotEncoderModel.
  *
+ * <p>The `keep` and `skip` option of {@link HasHandleInvalid} is not supported in {@link

Review Comment:
   Thanks for the comment. I prefer putting the description in the XXXParams class for the following two reasons:
   
   - Putting the description in concrete classes may lead to duplicate comments, for example in `OneHotEncoder` and `OneHotEncoderModel`.
   - I don't think that having to check the Javadoc of XXXParams is unfriendly to users. They are public interfaces, after all.
   
   What do you think?





[GitHub] [flink-ml] yunfengzhou-hub commented on a diff in pull request #82: [FLINK-27072] Add Transformer of Bucketizer

Posted by GitBox <gi...@apache.org>.
yunfengzhou-hub commented on code in PR #82:
URL: https://github.com/apache/flink-ml/pull/82#discussion_r845668376


##########
flink-ml-core/src/main/java/org/apache/flink/ml/param/ArrayArrayParam.java:
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.param;
+
+import org.apache.flink.ml.util.ReadWriteUtils;
+
+import java.io.IOException;
+
+/** Class for the arrayOfArray-type parameters. */
+public class ArrayArrayParam<T> extends Param<T[][]> {

Review Comment:
   Shall we just use a 1D array to represent arrays of two or more dimensions, or figure out another way to represent arrays with an arbitrary number of dimensions? I'm not sure we would want to create a `Param` subclass for each of 3D, 4D, and higher-dimensional parameters.
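   
   As a rough illustration of the 1D idea, here is a minimal sketch that stores a rectangular 2D array as flat row-major data plus a shape. `FlatArray`, `fromMatrix` and `toMatrix` are hypothetical names made up for this example; nothing here is part of Flink ML.

```java
/** Hypothetical helper (not a Flink ML class): flat row-major data plus a shape. */
final class FlatArray {
    final double[] data;
    final int[] shape;

    FlatArray(double[] data, int[] shape) {
        this.data = data;
        this.shape = shape;
    }

    /** Flattens a rectangular 2D array in row-major order. */
    static FlatArray fromMatrix(double[][] matrix) {
        int rows = matrix.length;
        int cols = rows == 0 ? 0 : matrix[0].length;
        double[] data = new double[rows * cols];
        for (int i = 0; i < rows; i++) {
            if (matrix[i].length != cols) {
                throw new IllegalArgumentException("Ragged arrays are not supported by this sketch.");
            }
            System.arraycopy(matrix[i], 0, data, i * cols, cols);
        }
        return new FlatArray(data, new int[] {rows, cols});
    }

    /** Rebuilds the 2D array; only valid when the shape has exactly two dimensions. */
    double[][] toMatrix() {
        if (shape.length != 2) {
            throw new IllegalStateException("Expected a 2D shape.");
        }
        int rows = shape[0];
        int cols = shape[1];
        double[][] matrix = new double[rows][cols];
        for (int i = 0; i < rows; i++) {
            System.arraycopy(data, i * cols, matrix[i], 0, cols);
        }
        return matrix;
    }
}
```

   One caveat with this representation: a flat array plus a shape only covers rectangular data. If each column may have a different number of split points, per-row lengths or offsets would have to be stored as well.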



##########
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/onehotencoder/OneHotEncoderParams.java:
##########
@@ -27,6 +27,8 @@
 /**
  * Params of OneHotEncoderModel.
  *
+ * <p>The `keep` option of {@link HasHandleInvalid} is not supported in {@link OneHotEncoderParams}.

Review Comment:
   It seems that `OneHotEncoder` only supports the `ERROR` option.



##########
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/bucketizer/BucketizerParams.java:
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature.bucketizer;
+
+import org.apache.flink.ml.common.param.HasHandleInvalid;
+import org.apache.flink.ml.common.param.HasInputCols;
+import org.apache.flink.ml.common.param.HasOutputCols;
+import org.apache.flink.ml.param.DoubleArrayArrayParam;
+import org.apache.flink.ml.param.ParamValidators;
+
+/**
+ * Params for {@link Bucketizer}.
+ *
+ * <p>The `keep` option of {@link HasHandleInvalid} means that we put the invalid data in the last
+ * bucket of the splits, whose index is the number of the buckets.
+ *
+ * @param <T> The class type of this instance.
+ */
+public interface BucketizerParams<T>
+        extends HasInputCols<T>, HasOutputCols<T>, HasHandleInvalid<T> {
+    /**
+     * The array of split points for mapping continuous features into buckets for multiple columns.
+     *
+     * <p>Each input column is supposed to be mapped into {numberOfSplitPoints - 1} buckets. A
+     * bucket is defined by two split points. For example, bucket(x,y) contains values in the range
+     * [x,y). An exception is that the last bucket also contains y. The array should contain at
+     * least three split points and be strictly increasing.
+     */
+    DoubleArrayArrayParam SPLITS_ARRAY =
+            new DoubleArrayArrayParam(
+                    "splitsArray", "Array of split points.", null, ParamValidators.nonEmptyArray());

Review Comment:
   Shall we have a more detailed description for this parameter, like the first line in Javadoc?
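   
   For reference, the bucket semantics described in the Javadoc quoted above (half-open buckets `[x,y)`, with the last bucket also containing its upper bound) can be sketched as follows. `BucketLookup` and `findBucket` are hypothetical names, the sketch uses primitive `double[]` rather than `Double[]`, and it returns -1 for values that the quoted code would treat as invalid. Its boundary checks compare against `splits.length`, the length of the per-column split array.

```java
import java.util.Arrays;

/** Hypothetical sketch of the bucket lookup; not the code from the pull request. */
final class BucketLookup {
    /**
     * Returns the bucket index in [0, splits.length - 2], or -1 when the feature is NaN
     * or lies outside [splits[0], splits[splits.length - 1]].
     * Assumes splits is strictly increasing and has at least three elements.
     */
    static int findBucket(double[] splits, double feature) {
        if (Double.isNaN(feature)) {
            return -1;
        }
        int index = Arrays.binarySearch(splits, feature);
        if (index >= 0) {
            // Exact hit on a split point: the last split point belongs to the last bucket.
            return index == splits.length - 1 ? index - 1 : index;
        }
        // binarySearch returns (-(insertion point) - 1) when the key is not found.
        int insertionPoint = -index - 1;
        if (insertionPoint == 0 || insertionPoint == splits.length) {
            return -1; // below the first or above the last split point
        }
        return insertionPoint - 1;
    }
}
```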



##########
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/onehotencoder/OneHotEncoder.java:
##########
@@ -59,7 +58,7 @@ public OneHotEncoder() {
     @Override
     public OneHotEncoderModel fit(Table... inputs) {
         Preconditions.checkArgument(inputs.length == 1);
-        Preconditions.checkArgument(getHandleInvalid().equals(HasHandleInvalid.ERROR_INVALID));
+        Preconditions.checkArgument(getHandleInvalid().equals(OneHotEncoderParams.ERROR_INVALID));

Review Comment:
   Shall we just use `ERROR_INVALID`? `OneHotEncoder` is a subclass of `OneHotEncoderParams`. Same for other classes like VectorAssembler.
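   
   The point about inheritance can be shown with a small self-contained sketch: a constant declared in an interface is visible by its simple name in any class that implements that interface, directly or through a sub-interface. The types below are hypothetical stand-ins, and the string values are illustrative assumptions rather than values taken from the Flink ML sources.

```java
/** Hypothetical stand-in for an interface that declares the option constants. */
interface HandleInvalidOptions {
    String ERROR_INVALID = "error"; // illustrative value
    String SKIP_INVALID = "skip"; // illustrative value
}

/** Hypothetical stand-in for a params interface that inherits the constants. */
interface EncoderParams extends HandleInvalidOptions {}

/** Hypothetical stand-in for a stage class implementing the params interface. */
final class Encoder implements EncoderParams {
    // The inherited constant can be referenced by its simple name.
    private final String handleInvalid = ERROR_INVALID;

    boolean usesErrorOption() {
        return ERROR_INVALID.equals(handleInvalid);
    }
}
```

   From outside the class, `HandleInvalidOptions.ERROR_INVALID`, `EncoderParams.ERROR_INVALID` and `Encoder.ERROR_INVALID` all refer to the same field, so the choice between them is a matter of readability rather than behavior.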



##########
flink-ml-lib/src/test/java/org/apache/flink/ml/feature/OneHotEncoderTest.java:
##########
@@ -205,7 +205,7 @@ public void testInputDataType() {
 
     @Test
     public void testNotSupportedHandleInvalidOptions() {
-        estimator.setHandleInvalid(HasHandleInvalid.SKIP_INVALID);
+        estimator.setHandleInvalid(OneHotEncoderParams.SKIP_INVALID);

Review Comment:
   I'm not sure `OneHotEncoderParams.SKIP_INVALID` would be a better choice than `HasHandleInvalid.SKIP_INVALID`. Could you please explain why you would like to make this change?
   
   I think `HasHandleInvalid` might be easier for users. If they want to `setHandleInvalid`, they know that the available options are at `HasHandleInvalid`. When they use `setBatchStrategy`, the options are at `HasBatchStrategy`. This is more straightforward. `xxxParams`, on the other hand, would provide all public fields it has defined or inherited, and users have to depend on more information to judge which ones are valid options.



##########
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/bucketizer/Bucketizer.java:
##########
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature.bucketizer;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.ml.api.Transformer;
+import org.apache.flink.ml.common.datastream.TableUtils;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.ArrayUtils;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Bucketizer is a transformer that maps multiple columns of continuous features to multiple columns
+ * of discrete features, i.e., buckets IDs.
+ */
+public class Bucketizer implements Transformer<Bucketizer>, BucketizerParams<Bucketizer> {
+    private final Map<Param<?>, Object> paramMap = new HashMap<>();
+
+    public Bucketizer() {
+        ParamUtils.initializeMapWithDefaultValues(paramMap, this);
+    }
+
+    @Override
+    public Table[] transform(Table... inputs) {
+        Preconditions.checkArgument(inputs.length == 1);
+        String[] inputCols = getInputCols();
+        String[] outputCols = getOutputCols();
+        Double[][] splitsArray = getSplitsArray();
+
+        Preconditions.checkArgument(inputCols.length == outputCols.length);
+        Preconditions.checkArgument(inputCols.length == splitsArray.length);
+        for (Double[] splits : splitsArray) {
+            Preconditions.checkArgument(
+                    splits.length >= 3,
+                    "Illegal value for "
+                            + BucketizerParams.SPLITS_ARRAY
+                            + ". See param "
+                            + BucketizerParams.SPLITS_ARRAY
+                            + " for details.");
+            for (int j = 1; j < splits.length; j++) {
+                Preconditions.checkArgument(
+                        splits[j] > splits[j - 1],
+                        "Illegal value for "
+                                + BucketizerParams.SPLITS_ARRAY
+                                + ". See param "
+                                + BucketizerParams.SPLITS_ARRAY
+                                + " for details.");
+            }
+        }

Review Comment:
   Shall we move this part of the logic to a `ParamValidator` class? This class can be placed inside `BucketizerParams`.
   
   Moreover, we will always need to do some checks in a ParamValidator and other checks in `fit()`/`transform()`. I think a validation belongs in the ParamValidator if it applies to all subclasses that inherit this parameter. If the validation spans multiple parameters, or depends on the input Tables, it should be placed in `fit()`/`transform()`.
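   
   A minimal sketch of what such a per-parameter check might look like, written as plain static methods so that it stays self-contained. `SplitsValidator` and its methods are hypothetical names, the sketch uses primitive `double[][]` instead of `Double[][]`, and it deliberately does not implement Flink ML's own validator interface.

```java
/** Hypothetical sketch of the split-point checks; not wired into Flink ML's Param classes. */
final class SplitsValidator {
    /** A single splits array is valid if it has at least 3 strictly increasing values. */
    static boolean isValidSplits(double[] splits) {
        if (splits == null || splits.length < 3) {
            return false;
        }
        for (int j = 1; j < splits.length; j++) {
            // The negated comparison also rejects NaN, since any comparison with NaN is false.
            if (!(splits[j] > splits[j - 1])) {
                return false;
            }
        }
        return true;
    }

    /** The whole parameter is valid if it is non-empty and every row is valid. */
    static boolean isValidSplitsArray(double[][] splitsArray) {
        if (splitsArray == null || splitsArray.length == 0) {
            return false;
        }
        for (double[] splits : splitsArray) {
            if (!isValidSplits(splits)) {
                return false;
            }
        }
        return true;
    }
}
```

   Checks that involve other parameters, such as comparing the number of input columns with the number of splits arrays, would still live in `transform()` under the rule proposed above.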



##########
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/bucketizer/Bucketizer.java:
##########
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature.bucketizer;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.ml.api.Transformer;
+import org.apache.flink.ml.common.datastream.TableUtils;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.ArrayUtils;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Bucketizer is a transformer that maps multiple columns of continuous features to multiple columns
+ * of discrete features, i.e., buckets IDs.
+ */
+public class Bucketizer implements Transformer<Bucketizer>, BucketizerParams<Bucketizer> {
+    private final Map<Param<?>, Object> paramMap = new HashMap<>();
+
+    public Bucketizer() {
+        ParamUtils.initializeMapWithDefaultValues(paramMap, this);
+    }
+
+    @Override
+    public Table[] transform(Table... inputs) {
+        Preconditions.checkArgument(inputs.length == 1);
+        String[] inputCols = getInputCols();
+        String[] outputCols = getOutputCols();
+        Double[][] splitsArray = getSplitsArray();
+
+        Preconditions.checkArgument(inputCols.length == outputCols.length);
+        Preconditions.checkArgument(inputCols.length == splitsArray.length);
+        for (Double[] splits : splitsArray) {
+            Preconditions.checkArgument(
+                    splits.length >= 3,
+                    "Illegal value for "
+                            + BucketizerParams.SPLITS_ARRAY
+                            + ". See param "
+                            + BucketizerParams.SPLITS_ARRAY
+                            + " for details.");
+            for (int j = 1; j < splits.length; j++) {
+                Preconditions.checkArgument(
+                        splits[j] > splits[j - 1],
+                        "Illegal value for "
+                                + BucketizerParams.SPLITS_ARRAY
+                                + ". See param "
+                                + BucketizerParams.SPLITS_ARRAY
+                                + " for details.");
+            }
+        }
+
+        StreamTableEnvironment tEnv =
+                (StreamTableEnvironment) ((TableImpl) inputs[0]).getTableEnvironment();
+
+        RowTypeInfo inputTypeInfo = TableUtils.getRowTypeInfo(inputs[0].getResolvedSchema());
+        TypeInformation<?>[] outputTypes = new TypeInformation[outputCols.length];
+        Arrays.fill(outputTypes, BasicTypeInfo.INT_TYPE_INFO);
+        RowTypeInfo outputTypeInfo =
+                new RowTypeInfo(
+                        ArrayUtils.addAll(inputTypeInfo.getFieldTypes(), outputTypes),
+                        ArrayUtils.addAll(inputTypeInfo.getFieldNames(), getOutputCols()));
+
+        DataStream<Row> result =
+                tEnv.toDataStream(inputs[0])
+                        .flatMap(
+                                new FindBucketFunction(inputCols, splitsArray, getHandleInvalid()),
+                                outputTypeInfo);
+        return new Table[] {tEnv.fromDataStream(result)};
+    }
+
+    /** Finds the bucket index for each continuous feature of an input data point. */
+    private static class FindBucketFunction implements FlatMapFunction<Row, Row> {
+        private final String[] inputCols;
+        private final String handleInvalid;
+        private final Double[][] splitsArray;
+
+        public FindBucketFunction(
+                String[] inputCols, Double[][] splitsArray, String handleInvalid) {
+            this.inputCols = inputCols;
+            this.splitsArray = splitsArray;
+            this.handleInvalid = handleInvalid;
+        }
+
+        @Override
+        public void flatMap(Row value, Collector<Row> out) {
+            Row outputRow = new Row(inputCols.length);
+
+            for (int i = 0; i < inputCols.length; i++) {
+                double feature = ((Number) value.getField(inputCols[i])).doubleValue();
+                Double[] splits = splitsArray[i];
+                boolean isInvalid = false;
+
+                if (!Double.isNaN(feature)) {
+                    int index = Arrays.binarySearch(splits, feature);
+                    if (index >= 0) {
+                        if (index == inputCols.length - 1) {
+                            index--;
+                        }
+                        outputRow.setField(i, index);
+                    } else {
+                        index = -index - 1;
+                        if (index == 0 || index == inputCols.length) {
+                            isInvalid = true;
+                        } else {
+                            outputRow.setField(i, index - 1);
+                        }
+                    }
+                } else {
+                    isInvalid = true;
+                }
+
+                if (isInvalid) {
+                    switch (handleInvalid) {
+                        case BucketizerParams.ERROR_INVALID:
+                            throw new RuntimeException(
+                                    "The input contains invalid value. See "
+                                            + BucketizerParams.HANDLE_INVALID
+                                            + " parameter for more options.");
+                        case BucketizerParams.SKIP_INVALID:
+                            return;
+                        case BucketizerParams.KEEP_INVALID:
+                            outputRow.setField(i, splits.length - 1);
+                            break;

Review Comment:
   This should be `continue`.
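   
   The quoted hunk ends at the `break`, so the code that follows the `switch` is not visible here. Purely as a reminder of the language semantics involved, the sketch below shows that inside a `for` loop a `break` in a `switch` leaves only the `switch`, while a `continue` skips the rest of the current loop iteration. `SwitchInLoopDemo` is a hypothetical class written for this illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical demo of break vs. continue inside a switch that is nested in a loop. */
final class SwitchInLoopDemo {
    /** Returns the loop indices for which the code after the switch was executed. */
    static List<Integer> reachedAfterSwitch(String[] actions) {
        List<Integer> reached = new ArrayList<>();
        for (int i = 0; i < actions.length; i++) {
            switch (actions[i]) {
                case "break":
                    break; // leaves only the switch; execution resumes after it
                case "continue":
                    continue; // jumps to the next loop iteration
                default:
                    break;
            }
            reached.add(i); // runs for "break" and default, but not for "continue"
        }
        return reached;
    }
}
```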



##########
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/bucketizer/Bucketizer.java:
##########
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature.bucketizer;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.ml.api.Transformer;
+import org.apache.flink.ml.common.datastream.TableUtils;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.ArrayUtils;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Bucketizer is a transformer that maps multiple columns of continuous features to multiple columns
+ * of discrete features, i.e., buckets IDs.
+ */
+public class Bucketizer implements Transformer<Bucketizer>, BucketizerParams<Bucketizer> {
+    private final Map<Param<?>, Object> paramMap = new HashMap<>();
+
+    public Bucketizer() {
+        ParamUtils.initializeMapWithDefaultValues(paramMap, this);
+    }
+
+    @Override
+    public Table[] transform(Table... inputs) {
+        Preconditions.checkArgument(inputs.length == 1);
+        String[] inputCols = getInputCols();
+        String[] outputCols = getOutputCols();
+        Double[][] splitsArray = getSplitsArray();
+
+        Preconditions.checkArgument(inputCols.length == outputCols.length);
+        Preconditions.checkArgument(inputCols.length == splitsArray.length);
+        for (Double[] splits : splitsArray) {
+            Preconditions.checkArgument(
+                    splits.length >= 3,
+                    "Illegal value for "
+                            + BucketizerParams.SPLITS_ARRAY
+                            + ". See param "
+                            + BucketizerParams.SPLITS_ARRAY
+                            + " for details.");
+            for (int j = 1; j < splits.length; j++) {
+                Preconditions.checkArgument(
+                        splits[j] > splits[j - 1],
+                        "Illegal value for "
+                                + BucketizerParams.SPLITS_ARRAY
+                                + ". See param "
+                                + BucketizerParams.SPLITS_ARRAY
+                                + " for details.");
+            }
+        }
+
+        StreamTableEnvironment tEnv =
+                (StreamTableEnvironment) ((TableImpl) inputs[0]).getTableEnvironment();
+
+        RowTypeInfo inputTypeInfo = TableUtils.getRowTypeInfo(inputs[0].getResolvedSchema());
+        TypeInformation<?>[] outputTypes = new TypeInformation[outputCols.length];
+        Arrays.fill(outputTypes, BasicTypeInfo.INT_TYPE_INFO);
+        RowTypeInfo outputTypeInfo =
+                new RowTypeInfo(
+                        ArrayUtils.addAll(inputTypeInfo.getFieldTypes(), outputTypes),
+                        ArrayUtils.addAll(inputTypeInfo.getFieldNames(), getOutputCols()));
+
+        DataStream<Row> result =
+                tEnv.toDataStream(inputs[0])
+                        .flatMap(
+                                new FindBucketFunction(inputCols, splitsArray, getHandleInvalid()),
+                                outputTypeInfo);
+        return new Table[] {tEnv.fromDataStream(result)};
+    }
+
+    /** Finds the bucket index for each continuous feature of an input data point. */
+    private static class FindBucketFunction implements FlatMapFunction<Row, Row> {
+        private final String[] inputCols;
+        private final String handleInvalid;
+        private final Double[][] splitsArray;
+
+        public FindBucketFunction(
+                String[] inputCols, Double[][] splitsArray, String handleInvalid) {
+            this.inputCols = inputCols;
+            this.splitsArray = splitsArray;
+            this.handleInvalid = handleInvalid;
+        }
+
+        @Override
+        public void flatMap(Row value, Collector<Row> out) {
+            Row outputRow = new Row(inputCols.length);
+
+            for (int i = 0; i < inputCols.length; i++) {
+                double feature = ((Number) value.getField(inputCols[i])).doubleValue();

Review Comment:
   If the field value is not a `Number`, the cast makes the operator throw an exception, which means that the behavior is always `ERROR` in this case, regardless of the `HasHandleInvalid` setting.
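   
   One possible way to route such values through the same invalid-handling path, assuming non-numeric and null fields should be treated like NaN (that is an assumption of this sketch, not something decided in this thread). `FeatureParser` and `toFeature` are hypothetical names.

```java
/** Hypothetical helper: converts a row field to a double without throwing on bad types. */
final class FeatureParser {
    /** Returns the numeric value, or NaN when the field is null or not a Number. */
    static double toFeature(Object field) {
        if (field instanceof Number) {
            return ((Number) field).doubleValue();
        }
        // instanceof is false for null, so null also ends up here.
        return Double.NaN;
    }
}
```

   With a helper like this, the existing NaN check in the quoted code would then decide between the error, skip and keep behaviors for these values too.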



##########
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/bucketizer/Bucketizer.java:
##########
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature.bucketizer;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.ml.api.Transformer;
+import org.apache.flink.ml.common.datastream.TableUtils;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.ArrayUtils;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Bucketizer is a transformer that maps multiple columns of continuous features to multiple columns
+ * of discrete features, i.e., buckets IDs.
+ */
+public class Bucketizer implements Transformer<Bucketizer>, BucketizerParams<Bucketizer> {
+    private final Map<Param<?>, Object> paramMap = new HashMap<>();
+
+    public Bucketizer() {
+        ParamUtils.initializeMapWithDefaultValues(paramMap, this);
+    }
+
+    @Override
+    public Table[] transform(Table... inputs) {
+        Preconditions.checkArgument(inputs.length == 1);
+        String[] inputCols = getInputCols();
+        String[] outputCols = getOutputCols();
+        Double[][] splitsArray = getSplitsArray();
+
+        Preconditions.checkArgument(inputCols.length == outputCols.length);
+        Preconditions.checkArgument(inputCols.length == splitsArray.length);
+        for (Double[] splits : splitsArray) {
+            Preconditions.checkArgument(
+                    splits.length >= 3,
+                    "Illegal value for "
+                            + BucketizerParams.SPLITS_ARRAY
+                            + ". See param "
+                            + BucketizerParams.SPLITS_ARRAY
+                            + " for details.");
+            for (int j = 1; j < splits.length; j++) {
+                Preconditions.checkArgument(
+                        splits[j] > splits[j - 1],
+                        "Illegal value for "
+                                + BucketizerParams.SPLITS_ARRAY
+                                + ". See param "
+                                + BucketizerParams.SPLITS_ARRAY
+                                + " for details.");
+            }
+        }
+
+        StreamTableEnvironment tEnv =
+                (StreamTableEnvironment) ((TableImpl) inputs[0]).getTableEnvironment();
+
+        RowTypeInfo inputTypeInfo = TableUtils.getRowTypeInfo(inputs[0].getResolvedSchema());
+        TypeInformation<?>[] outputTypes = new TypeInformation[outputCols.length];
+        Arrays.fill(outputTypes, BasicTypeInfo.INT_TYPE_INFO);
+        RowTypeInfo outputTypeInfo =
+                new RowTypeInfo(
+                        ArrayUtils.addAll(inputTypeInfo.getFieldTypes(), outputTypes),
+                        ArrayUtils.addAll(inputTypeInfo.getFieldNames(), getOutputCols()));
+
+        DataStream<Row> result =
+                tEnv.toDataStream(inputs[0])
+                        .flatMap(
+                                new FindBucketFunction(inputCols, splitsArray, getHandleInvalid()),
+                                outputTypeInfo);
+        return new Table[] {tEnv.fromDataStream(result)};
+    }
+
+    /** Finds the bucket index for each continuous feature of an input data point. */
+    private static class FindBucketFunction implements FlatMapFunction<Row, Row> {
+        private final String[] inputCols;
+        private final String handleInvalid;
+        private final Double[][] splitsArray;
+
+        public FindBucketFunction(
+                String[] inputCols, Double[][] splitsArray, String handleInvalid) {
+            this.inputCols = inputCols;
+            this.splitsArray = splitsArray;
+            this.handleInvalid = handleInvalid;
+        }
+
+        @Override
+        public void flatMap(Row value, Collector<Row> out) {
+            Row outputRow = new Row(inputCols.length);
+
+            for (int i = 0; i < inputCols.length; i++) {
+                double feature = ((Number) value.getField(inputCols[i])).doubleValue();
+                Double[] splits = splitsArray[i];
+                boolean isInvalid = false;
+
+                if (!Double.isNaN(feature)) {
+                    int index = Arrays.binarySearch(splits, feature);
+                    if (index >= 0) {
+                        if (index == inputCols.length - 1) {
+                            index--;
+                        }
+                        outputRow.setField(i, index);
+                    } else {
+                        index = -index - 1;
+                        if (index == 0 || index == inputCols.length) {
+                            isInvalid = true;
+                        } else {
+                            outputRow.setField(i, index - 1);
+                        }
+                    }
+                } else {
+                    isInvalid = true;
+                }
+
+                if (isInvalid) {
+                    switch (handleInvalid) {
+                        case BucketizerParams.ERROR_INVALID:
+                            throw new RuntimeException(
+                                    "The input contains invalid value. See "
+                                            + BucketizerParams.HANDLE_INVALID
+                                            + " parameter for more options.");
+                        case BucketizerParams.SKIP_INVALID:
+                            return;
+                        case BucketizerParams.KEEP_INVALID:
+                            outputRow.setField(i, splits.length - 1);
+                            break;
+                        default:
+                            throw new IllegalStateException(
+                                    "Unsupported handleInvalid type: " + handleInvalid);

Review Comment:
   Given that we used `BucketizerParams.SPLITS_ARRAY` in other error messages, shall we also use `BucketizerParams.HANDLE_INVALID` here?





[GitHub] [flink-ml] zhipeng93 commented on a diff in pull request #82: [FLINK-27072] Add Transformer of Bucketizer

Posted by GitBox <gi...@apache.org>.
zhipeng93 commented on code in PR #82:
URL: https://github.com/apache/flink-ml/pull/82#discussion_r845965487


##########
flink-ml-lib/src/test/java/org/apache/flink/ml/feature/OneHotEncoderTest.java:
##########
@@ -205,7 +205,7 @@ public void testInputDataType() {
 
     @Test
     public void testNotSupportedHandleInvalidOptions() {
-        estimator.setHandleInvalid(HasHandleInvalid.SKIP_INVALID);
+        estimator.setHandleInvalid(OneHotEncoderParams.SKIP_INVALID);

Review Comment:
   Thanks for pointing this out. I also think using `HasHandleInvalid` would be better. 





[GitHub] [flink-ml] yunfengzhou-hub commented on a diff in pull request #82: [FLINK-27072] Add Transformer of Bucketizer

Posted by GitBox <gi...@apache.org>.
yunfengzhou-hub commented on code in PR #82:
URL: https://github.com/apache/flink-ml/pull/82#discussion_r846884071


##########
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/bucketizer/Bucketizer.java:
##########
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature.bucketizer;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.ml.api.Transformer;
+import org.apache.flink.ml.common.datastream.TableUtils;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.ArrayUtils;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Bucketizer is a transformer that maps multiple columns of continuous features to multiple columns
+ * of discrete features, i.e., buckets IDs.
+ */
+public class Bucketizer implements Transformer<Bucketizer>, BucketizerParams<Bucketizer> {
+    private final Map<Param<?>, Object> paramMap = new HashMap<>();
+
+    public Bucketizer() {
+        ParamUtils.initializeMapWithDefaultValues(paramMap, this);
+    }
+
+    @Override
+    public Table[] transform(Table... inputs) {
+        Preconditions.checkArgument(inputs.length == 1);
+        String[] inputCols = getInputCols();
+        String[] outputCols = getOutputCols();
+        Double[][] splitsArray = getSplitsArray();
+
+        Preconditions.checkArgument(inputCols.length == outputCols.length);
+        Preconditions.checkArgument(inputCols.length == splitsArray.length);
+        for (Double[] splits : splitsArray) {
+            Preconditions.checkArgument(
+                    splits.length >= 3,
+                    "Illegal value for "
+                            + BucketizerParams.SPLITS_ARRAY
+                            + ". See param "
+                            + BucketizerParams.SPLITS_ARRAY
+                            + " for details.");
+            for (int j = 1; j < splits.length; j++) {
+                Preconditions.checkArgument(
+                        splits[j] > splits[j - 1],
+                        "Illegal value for "
+                                + BucketizerParams.SPLITS_ARRAY
+                                + ". See param "
+                                + BucketizerParams.SPLITS_ARRAY
+                                + " for details.");
+            }
+        }
+
+        StreamTableEnvironment tEnv =
+                (StreamTableEnvironment) ((TableImpl) inputs[0]).getTableEnvironment();
+
+        RowTypeInfo inputTypeInfo = TableUtils.getRowTypeInfo(inputs[0].getResolvedSchema());
+        TypeInformation<?>[] outputTypes = new TypeInformation[outputCols.length];
+        Arrays.fill(outputTypes, BasicTypeInfo.INT_TYPE_INFO);
+        RowTypeInfo outputTypeInfo =
+                new RowTypeInfo(
+                        ArrayUtils.addAll(inputTypeInfo.getFieldTypes(), outputTypes),
+                        ArrayUtils.addAll(inputTypeInfo.getFieldNames(), getOutputCols()));
+
+        DataStream<Row> result =
+                tEnv.toDataStream(inputs[0])
+                        .flatMap(
+                                new FindBucketFunction(inputCols, splitsArray, getHandleInvalid()),
+                                outputTypeInfo);
+        return new Table[] {tEnv.fromDataStream(result)};
+    }
+
+    /** Finds the bucket index for each continuous feature of an input data point. */
+    private static class FindBucketFunction implements FlatMapFunction<Row, Row> {
+        private final String[] inputCols;
+        private final String handleInvalid;
+        private final Double[][] splitsArray;
+
+        public FindBucketFunction(
+                String[] inputCols, Double[][] splitsArray, String handleInvalid) {
+            this.inputCols = inputCols;
+            this.splitsArray = splitsArray;
+            this.handleInvalid = handleInvalid;
+        }
+
+        @Override
+        public void flatMap(Row value, Collector<Row> out) {
+            Row outputRow = new Row(inputCols.length);
+
+            for (int i = 0; i < inputCols.length; i++) {
+                double feature = ((Number) value.getField(inputCols[i])).doubleValue();
+                Double[] splits = splitsArray[i];
+                boolean isInvalid = false;
+
+                if (!Double.isNaN(feature)) {
+                    int index = Arrays.binarySearch(splits, feature);
+                    if (index >= 0) {
+                        if (index == inputCols.length - 1) {
+                            index--;
+                        }
+                        outputRow.setField(i, index);
+                    } else {
+                        index = -index - 1;
+                        if (index == 0 || index == inputCols.length) {
+                            isInvalid = true;
+                        } else {
+                            outputRow.setField(i, index - 1);
+                        }
+                    }
+                } else {
+                    isInvalid = true;
+                }
+
+                if (isInvalid) {
+                    switch (handleInvalid) {
+                        case BucketizerParams.ERROR_INVALID:
+                            throw new RuntimeException(
+                                    "The input contains invalid value. See "
+                                            + BucketizerParams.HANDLE_INVALID
+                                            + " parameter for more options.");
+                        case BucketizerParams.SKIP_INVALID:
+                            return;
+                        case BucketizerParams.KEEP_INVALID:
+                            outputRow.setField(i, splits.length - 1);
+                            break;

Review Comment:
   For example, suppose the input has three columns, `[a, b, c]`, where `a` and `c` are valid values mapping to `a*` and `c*`, and `b` is an invalid value. In this case the operator should output `[a*, splits.length-1, c*]`, but if the code here is `break`, the operator would actually output `[a*, splits.length-1, null]`.





[GitHub] [flink-ml] lindong28 commented on a diff in pull request #82: [FLINK-27072] Add Transformer of Bucketizer

Posted by GitBox <gi...@apache.org>.
lindong28 commented on code in PR #82:
URL: https://github.com/apache/flink-ml/pull/82#discussion_r849333000


##########
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/bucketizer/Bucketizer.java:
##########
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature.bucketizer;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.ml.api.Transformer;
+import org.apache.flink.ml.common.datastream.TableUtils;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.ArrayUtils;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Bucketizer is a transformer that maps multiple columns of continuous features to multiple columns
+ * of discrete features, i.e., buckets IDs.
+ */
+public class Bucketizer implements Transformer<Bucketizer>, BucketizerParams<Bucketizer> {
+    private final Map<Param<?>, Object> paramMap = new HashMap<>();
+
+    public Bucketizer() {
+        ParamUtils.initializeMapWithDefaultValues(paramMap, this);
+    }
+
+    @Override
+    public Table[] transform(Table... inputs) {
+        Preconditions.checkArgument(inputs.length == 1);
+        String[] inputCols = getInputCols();
+        String[] outputCols = getOutputCols();
+        Double[][] splitsArray = getSplitsArray();
+
+        Preconditions.checkArgument(inputCols.length == outputCols.length);
+        Preconditions.checkArgument(inputCols.length == splitsArray.length);
+        for (Double[] splits : splitsArray) {
+            Preconditions.checkArgument(
+                    splits.length >= 3,
+                    "Illegal value for "
+                            + BucketizerParams.SPLITS_ARRAY
+                            + ". See param "
+                            + BucketizerParams.SPLITS_ARRAY
+                            + " for details.");
+            for (int j = 1; j < splits.length; j++) {
+                Preconditions.checkArgument(
+                        splits[j] > splits[j - 1],
+                        "Illegal value for "
+                                + BucketizerParams.SPLITS_ARRAY
+                                + ". See param "
+                                + BucketizerParams.SPLITS_ARRAY
+                                + " for details.");
+            }
+        }
+
+        StreamTableEnvironment tEnv =
+                (StreamTableEnvironment) ((TableImpl) inputs[0]).getTableEnvironment();
+
+        RowTypeInfo inputTypeInfo = TableUtils.getRowTypeInfo(inputs[0].getResolvedSchema());
+        TypeInformation<?>[] outputTypes = new TypeInformation[outputCols.length];
+        Arrays.fill(outputTypes, BasicTypeInfo.INT_TYPE_INFO);
+        RowTypeInfo outputTypeInfo =
+                new RowTypeInfo(
+                        ArrayUtils.addAll(inputTypeInfo.getFieldTypes(), outputTypes),
+                        ArrayUtils.addAll(inputTypeInfo.getFieldNames(), getOutputCols()));
+
+        DataStream<Row> result =
+                tEnv.toDataStream(inputs[0])
+                        .flatMap(
+                                new FindBucketFunction(inputCols, splitsArray, getHandleInvalid()),
+                                outputTypeInfo);
+        return new Table[] {tEnv.fromDataStream(result)};
+    }
+
+    /** Finds the bucket index for each continuous feature of an input data point. */
+    private static class FindBucketFunction implements FlatMapFunction<Row, Row> {
+        private final String[] inputCols;
+        private final String handleInvalid;
+        private final Double[][] splitsArray;
+
+        public FindBucketFunction(
+                String[] inputCols, Double[][] splitsArray, String handleInvalid) {
+            this.inputCols = inputCols;
+            this.splitsArray = splitsArray;
+            this.handleInvalid = handleInvalid;
+        }
+
+        @Override
+        public void flatMap(Row value, Collector<Row> out) {
+            Row outputRow = new Row(inputCols.length);
+
+            for (int i = 0; i < inputCols.length; i++) {
+                double feature = ((Number) value.getField(inputCols[i])).doubleValue();
+                Double[] splits = splitsArray[i];
+                boolean isInvalid = false;
+
+                if (!Double.isNaN(feature)) {
+                    int index = Arrays.binarySearch(splits, feature);
+                    if (index >= 0) {
+                        if (index == inputCols.length - 1) {
+                            index--;
+                        }
+                        outputRow.setField(i, index);
+                    } else {
+                        index = -index - 1;
+                        if (index == 0 || index == inputCols.length) {
+                            isInvalid = true;
+                        } else {
+                            outputRow.setField(i, index - 1);
+                        }
+                    }
+                } else {
+                    isInvalid = true;
+                }
+
+                if (isInvalid) {
+                    switch (handleInvalid) {
+                        case BucketizerParams.ERROR_INVALID:
+                            throw new RuntimeException(
+                                    "The input contains invalid value. See "
+                                            + BucketizerParams.HANDLE_INVALID
+                                            + " parameter for more options.");
+                        case BucketizerParams.SKIP_INVALID:
+                            return;
+                        case BucketizerParams.KEEP_INVALID:
+                            outputRow.setField(i, splits.length - 1);
+                            break;

Review Comment:
   @yunfengzhou-hub the `break` here only exits the `switch` statement; the enclosing `for` loop continues with the next column. So in the case described above, the current implementation would still output `[a*, splits.length-1, c*]`.
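
A small standalone sketch of the same control flow, to make the point concrete. The class, the method and the `INVALID_BUCKET` constant are hypothetical stand-ins (the constant plays the role of `splits.length - 1`); only the `for`/`switch`/`break` structure mirrors the code under review.

```java
import java.util.Arrays;

class SwitchBreakDemo {
    // Hypothetical stand-in for the "keep" bucket index (splits.length - 1 in the PR).
    static final int INVALID_BUCKET = -1;

    /** Maps each input to input * 10; negative inputs count as invalid. */
    static int[] mapAll(int[] inputs, String handleInvalid) {
        int[] out = new int[inputs.length];
        for (int i = 0; i < inputs.length; i++) {
            boolean isInvalid = inputs[i] < 0;
            if (!isInvalid) {
                out[i] = inputs[i] * 10;
            } else {
                switch (handleInvalid) {
                    case "keep":
                        out[i] = INVALID_BUCKET;
                        break; // exits the switch only; the for loop moves on to i + 1
                    default:
                        throw new IllegalStateException("Unsupported: " + handleInvalid);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // The third element is still processed after the invalid second one.
        System.out.println(Arrays.toString(mapAll(new int[] {1, -2, 3}, "keep"))); // prints [10, -1, 30]
    }
}
```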





[GitHub] [flink-ml] lindong28 commented on a diff in pull request #82: [FLINK-27072] Add Transformer of Bucketizer

Posted by GitBox <gi...@apache.org>.
lindong28 commented on code in PR #82:
URL: https://github.com/apache/flink-ml/pull/82#discussion_r849438742


##########
flink-ml-core/src/main/java/org/apache/flink/ml/param/ArrayArrayParam.java:
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.param;
+
+import org.apache.flink.ml.util.ReadWriteUtils;
+
+import java.io.IOException;
+
+/** Class for the arrayOfArray-type parameters. */
+public class ArrayArrayParam<T> extends Param<T[][]> {

Review Comment:
   I am not sure the alternative approach simplifies the overall internal implementation.
   
   The alternative approach does remove the need for `DoubleArrayArrayParam`. But for every parameter of type `double[][]`, it requires setXXX() and getXXX() to have non-trivial implementations, as described above. We could add two public static methods to be shared by those setXXX() and getXXX() methods, but adding these two methods does not seem simpler than the existing `DoubleArrayArrayParam`.





[GitHub] [flink-ml] zhipeng93 commented on a diff in pull request #82: [FLINK-27072] Add Transformer of Bucketizer

Posted by GitBox <gi...@apache.org>.
zhipeng93 commented on code in PR #82:
URL: https://github.com/apache/flink-ml/pull/82#discussion_r845960833


##########
flink-ml-core/src/main/java/org/apache/flink/ml/param/ArrayArrayParam.java:
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.param;
+
+import org.apache.flink.ml.util.ReadWriteUtils;
+
+import java.io.IOException;
+
+/** Class for the arrayOfArray-type parameters. */
+public class ArrayArrayParam<T> extends Param<T[][]> {

Review Comment:
   We should probably follow how users naturally specify the splits here.
   
   In Bucketizer, users specify a split array for each input column. Moreover, the number of split points can differ between columns. If we used a 1D array to represent the 2D array, we would also need to specify the length of each row. This seems a bit complex to me.
   
   Also, using a 2D array is consistent with existing libraries [1].
   
   [1] https://github.com/apache/spark/blob/1018f8b545a74f4fcaf123b5f15d52ac2afa2d3c/mllib/src/main/scala/org/apache/spark/ml/param/params.scala#L567
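
To illustrate the trade-off described above, here is a rough sketch of what a flattened 1D representation would involve. `SplitsLayoutDemo`, `flatten` and `unflatten` are hypothetical names and not flink-ml APIs; the point is only that a ragged 2D array cannot be recovered from a flat array without a second array of row lengths.

```java
import java.util.Arrays;

class SplitsLayoutDemo {
    /** Concatenates all rows of a ragged 2D array into a single 1D array. */
    static double[] flatten(double[][] rows) {
        int total = 0;
        for (double[] row : rows) {
            total += row.length;
        }
        double[] flat = new double[total];
        int pos = 0;
        for (double[] row : rows) {
            System.arraycopy(row, 0, flat, pos, row.length);
            pos += row.length;
        }
        return flat;
    }

    /** Rebuilds the rows; this needs the per-row lengths as extra information. */
    static double[][] unflatten(double[] flat, int[] lengths) {
        double[][] rows = new double[lengths.length][];
        int pos = 0;
        for (int i = 0; i < lengths.length; i++) {
            rows[i] = Arrays.copyOfRange(flat, pos, pos + lengths[i]);
            pos += lengths[i];
        }
        return rows;
    }
}
```

With a 2D parameter the user passes something like `{{0, 1, 2}, {-1, 0, 5, 10}}` directly; with the flat form they would have to pass both the seven values and the lengths `{3, 4}`.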





[GitHub] [flink-ml] zhipeng93 commented on a diff in pull request #82: [FLINK-27072] Add Transformer of Bucketizer

Posted by GitBox <gi...@apache.org>.
zhipeng93 commented on code in PR #82:
URL: https://github.com/apache/flink-ml/pull/82#discussion_r845962894


##########
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/onehotencoder/OneHotEncoderParams.java:
##########
@@ -27,6 +27,8 @@
 /**
  * Params of OneHotEncoderModel.
  *
+ * <p>The `keep` option of {@link HasHandleInvalid} is not supported in {@link OneHotEncoderParams}.

Review Comment:
   Yep. I added the comment here because we will probably support `skip` in the near future, but will never support `keep`.
   
   Are you suggesting that we also mention that `skip` is not supported for now, and remove that note once it is supported?





[GitHub] [flink-ml] yunfengzhou-hub commented on a diff in pull request #82: [FLINK-27072] Add Transformer of Bucketizer

Posted by GitBox <gi...@apache.org>.
yunfengzhou-hub commented on code in PR #82:
URL: https://github.com/apache/flink-ml/pull/82#discussion_r846885979


##########
flink-ml-lib/src/test/java/org/apache/flink/ml/feature/VectorAssemblerTest.java:
##########
@@ -141,10 +140,11 @@ public void testErrorInvalid() {
             Table outputTable = vectorAssembler.transform(inputDataTable)[0];
             outputTable.execute().collect().next();
             Assert.fail("Expected IllegalArgumentException");
-        } catch (Exception e) {
-            assertEquals(
-                    "Input column value should not be null.",
-                    e.getCause().getCause().getCause().getCause().getCause().getMessage());
+        } catch (Throwable e) {
+            while (e.getCause() != null) {
+                e = e.getCause();
+            }
+            assertEquals("Input column value should not be null.", e.getMessage());

Review Comment:
   Just found that we can directly use `org.apache.commons.lang3.exception.ExceptionUtils.getRootCauseMessage()` for such cases.
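
For reference, a dependency-free sketch of what such a root-cause lookup does. `RootCauseDemo` and `rootCause` are hypothetical names. To my understanding, the commons-lang3 `getRootCauseMessage()` prefixes the message with the exception's short class name, so an exact `assertEquals` on the bare message may need the root cause's own `getMessage()` instead; that is worth verifying against the library version in use.

```java
class RootCauseDemo {
    /** Walks the cause chain and returns the innermost throwable. */
    static Throwable rootCause(Throwable t) {
        Throwable current = t;
        while (current.getCause() != null) {
            current = current.getCause();
        }
        return current;
    }

    public static void main(String[] args) {
        Throwable wrapped =
                new RuntimeException(
                        "job failed",
                        new IllegalStateException(
                                "operator failed",
                                new IllegalArgumentException(
                                        "Input column value should not be null.")));
        // The test can assert on the innermost message regardless of nesting depth.
        System.out.println(rootCause(wrapped).getMessage());
    }
}
```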



##########
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/bucketizer/BucketizerParams.java:
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature.bucketizer;
+
+import org.apache.flink.ml.common.param.HasHandleInvalid;
+import org.apache.flink.ml.common.param.HasInputCols;
+import org.apache.flink.ml.common.param.HasOutputCols;
+import org.apache.flink.ml.param.DoubleArrayArrayParam;
+import org.apache.flink.ml.param.ParamValidator;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Params for {@link Bucketizer}.
+ *
+ * <p>The `keep` option of {@link HasHandleInvalid} means that we put the invalid data in the last
+ * bucket of the splits, whose index is the number of the buckets.
+ *
+ * @param <T> The class type of this instance.
+ */
+public interface BucketizerParams<T>
+        extends HasInputCols<T>, HasOutputCols<T>, HasHandleInvalid<T> {
+    /**
+     * The array of split points for mapping continuous features into buckets for multiple columns.
+     *
+     * <p>Each input column is supposed to be mapped into {numberOfSplitPoints - 1} buckets. A
+     * bucket is defined by two split points. For example, bucket(x,y) contains values in the range
+     * [x,y). An exception is that the last bucket also contains y. The array should contain at
+     * least three split points and be strictly increasing.
+     */
+    DoubleArrayArrayParam SPLITS_ARRAY =
+            new DoubleArrayArrayParam(
+                    "splitsArray",
+                    "Array of split points for mapping continuous features into buckets.",
+                    null,
+                    new SplitsArrayValidator());
+
+    default Double[][] getSplitsArray() {
+        return get(SPLITS_ARRAY);
+    }
+
+    default T setSplitsArray(Double[][] value) {
+        set(SPLITS_ARRAY, value);
+        return (T) this;
+    }
+
+    /** Param validator for splitsArray. */
+    class SplitsArrayValidator implements ParamValidator<Double[][]> {
+
+        @Override
+        public boolean validate(Double[][] splitsArray) {
+            Preconditions.checkNotNull(splitsArray);
+            Preconditions.checkArgument(splitsArray.length != 0);
+            for (Double[] splits : splitsArray) {
+                Preconditions.checkArgument(splits.length >= 3);
+                for (int j = 1; j < splits.length; j++) {
+                    Preconditions.checkArgument(splits[j] > splits[j - 1]);

Review Comment:
   Shall we return false so that the external infra can handle it, instead of throwing an exception directly?
   ```suggestion
   if (splits[j] <= splits[j-1]) {
       return false;
   }
   ```
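
Applied to the whole validator, the suggestion above might look roughly like the following. This is a minimal, self-contained sketch: `SimpleValidator` is a hypothetical stand-in for Flink ML's `ParamValidator` so that the snippet compiles on its own, and treating null arrays or null entries as invalid is an assumption of the sketch, not something stated in the PR.

```java
/** Hypothetical stand-in for ParamValidator, declared only to keep the sketch self-contained. */
interface SimpleValidator<T> {
    boolean validate(T value);
}

/** Reports invalid splits by returning false instead of throwing. */
final class SplitsArrayValidatorSketch implements SimpleValidator<Double[][]> {
    @Override
    public boolean validate(Double[][] splitsArray) {
        if (splitsArray == null || splitsArray.length == 0) {
            return false;
        }
        for (Double[] splits : splitsArray) {
            // Each column needs at least three split points, i.e. two buckets.
            if (splits == null || splits.length < 3) {
                return false;
            }
            for (int j = 1; j < splits.length; j++) {
                // Reject null entries (assumption) and anything not strictly increasing.
                if (splits[j - 1] == null || splits[j] == null || splits[j] <= splits[j - 1]) {
                    return false;
                }
            }
        }
        return true;
    }
}
```

With this shape, the caller decides how to report the failure, for example by attaching the parameter name to the error message.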



##########
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/bucketizer/Bucketizer.java:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature.bucketizer;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.ml.api.Transformer;
+import org.apache.flink.ml.common.datastream.TableUtils;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.ArrayUtils;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Bucketizer is a transformer that maps multiple columns of continuous features to multiple columns
+ * of discrete features, i.e., buckets IDs.
+ */
+public class Bucketizer implements Transformer<Bucketizer>, BucketizerParams<Bucketizer> {
+    private final Map<Param<?>, Object> paramMap = new HashMap<>();
+
+    public Bucketizer() {
+        ParamUtils.initializeMapWithDefaultValues(paramMap, this);
+    }
+
+    @Override
+    public Table[] transform(Table... inputs) {
+        Preconditions.checkArgument(inputs.length == 1);
+        String[] inputCols = getInputCols();
+        String[] outputCols = getOutputCols();
+        Double[][] splitsArray = getSplitsArray();
+        Preconditions.checkArgument(inputCols.length == outputCols.length);
+        Preconditions.checkArgument(inputCols.length == splitsArray.length);
+
+        StreamTableEnvironment tEnv =
+                (StreamTableEnvironment) ((TableImpl) inputs[0]).getTableEnvironment();
+
+        RowTypeInfo inputTypeInfo = TableUtils.getRowTypeInfo(inputs[0].getResolvedSchema());
+        TypeInformation<?>[] outputTypes = new TypeInformation[outputCols.length];
+        Arrays.fill(outputTypes, BasicTypeInfo.INT_TYPE_INFO);
+        RowTypeInfo outputTypeInfo =
+                new RowTypeInfo(
+                        ArrayUtils.addAll(inputTypeInfo.getFieldTypes(), outputTypes),
+                        ArrayUtils.addAll(inputTypeInfo.getFieldNames(), getOutputCols()));
+
+        DataStream<Row> result =
+                tEnv.toDataStream(inputs[0])
+                        .flatMap(
+                                new FindBucketFunction(inputCols, splitsArray, getHandleInvalid()),
+                                outputTypeInfo);
+        return new Table[] {tEnv.fromDataStream(result)};
+    }
+
+    /** Finds the bucket index for each continuous feature of an input data point. */
+    private static class FindBucketFunction implements FlatMapFunction<Row, Row> {
+        private final String[] inputCols;
+        private final String handleInvalid;
+        private final Double[][] splitsArray;
+
+        public FindBucketFunction(
+                String[] inputCols, Double[][] splitsArray, String handleInvalid) {
+            this.inputCols = inputCols;
+            this.splitsArray = splitsArray;
+            this.handleInvalid = handleInvalid;
+        }
+
+        @Override
+        public void flatMap(Row value, Collector<Row> out) {
+            Row outputRow = new Row(inputCols.length);
+
+            for (int i = 0; i < inputCols.length; i++) {
+                double feature = ((Number) value.getField(inputCols[i])).doubleValue();
+                Double[] splits = splitsArray[i];
+                boolean isInvalid = false;
+
+                if (!Double.isNaN(feature)) {
+                    int index = Arrays.binarySearch(splits, feature);
+                    if (index >= 0) {
+                        if (index == inputCols.length - 1) {
+                            index--;
+                        }
+                        outputRow.setField(i, index);
+                    } else {
+                        index = -index - 1;
+                        if (index == 0 || index == inputCols.length) {
+                            isInvalid = true;
+                        } else {
+                            outputRow.setField(i, index - 1);
+                        }
+                    }
+                } else {
+                    isInvalid = true;
+                }
+
+                if (isInvalid) {
+                    switch (handleInvalid) {
+                        case ERROR_INVALID:
+                            throw new RuntimeException(
+                                    "The input contains invalid value. See "
+                                            + HANDLE_INVALID
+                                            + " parameter for more options.");
+                        case SKIP_INVALID:
+                            return;
+                        case KEEP_INVALID:
+                            outputRow.setField(i, splits.length - 1);
+                            break;
+                        default:
+                            throw new IllegalStateException(

Review Comment:
   nit: `UnsupportedOperationException`





[GitHub] [flink-ml] zhipeng93 commented on a diff in pull request #82: [FLINK-27072] Add Transformer of Bucketizer

Posted by GitBox <gi...@apache.org>.
zhipeng93 commented on code in PR #82:
URL: https://github.com/apache/flink-ml/pull/82#discussion_r846890706


##########
flink-ml-core/src/main/java/org/apache/flink/ml/param/ArrayArrayParam.java:
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.param;
+
+import org.apache.flink.ml.util.ReadWriteUtils;
+
+import java.io.IOException;
+
+/** Class for the arrayOfArray-type parameters. */
+public class ArrayArrayParam<T> extends Param<T[][]> {

Review Comment:
   > we can store the 2D array in a 1D format
   
   I am a bit confused about this. How are we going to store the shape of each dimension? 
   Maintaining another `array` for shapes seems a bit complex to me.





[GitHub] [flink-ml] zhipeng93 commented on a diff in pull request #82: [FLINK-27072] Add Transformer of Bucketizer

Posted by GitBox <gi...@apache.org>.
zhipeng93 commented on code in PR #82:
URL: https://github.com/apache/flink-ml/pull/82#discussion_r845984035


##########
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/bucketizer/Bucketizer.java:
##########
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature.bucketizer;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.ml.api.Transformer;
+import org.apache.flink.ml.common.datastream.TableUtils;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.ArrayUtils;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Bucketizer is a transformer that maps multiple columns of continuous features to multiple columns
+ * of discrete features, i.e., buckets IDs.
+ */
+public class Bucketizer implements Transformer<Bucketizer>, BucketizerParams<Bucketizer> {
+    private final Map<Param<?>, Object> paramMap = new HashMap<>();
+
+    public Bucketizer() {
+        ParamUtils.initializeMapWithDefaultValues(paramMap, this);
+    }
+
+    @Override
+    public Table[] transform(Table... inputs) {
+        Preconditions.checkArgument(inputs.length == 1);
+        String[] inputCols = getInputCols();
+        String[] outputCols = getOutputCols();
+        Double[][] splitsArray = getSplitsArray();
+
+        Preconditions.checkArgument(inputCols.length == outputCols.length);
+        Preconditions.checkArgument(inputCols.length == splitsArray.length);
+        for (Double[] splits : splitsArray) {
+            Preconditions.checkArgument(
+                    splits.length >= 3,
+                    "Illegal value for "
+                            + BucketizerParams.SPLITS_ARRAY
+                            + ". See param "
+                            + BucketizerParams.SPLITS_ARRAY
+                            + " for details.");
+            for (int j = 1; j < splits.length; j++) {
+                Preconditions.checkArgument(
+                        splits[j] > splits[j - 1],
+                        "Illegal value for "
+                                + BucketizerParams.SPLITS_ARRAY
+                                + ". See param "
+                                + BucketizerParams.SPLITS_ARRAY
+                                + " for details.");
+            }
+        }

Review Comment:
   Good point. I have moved the checks to `BucketizerParams`.





[GitHub] [flink-ml] lindong28 closed pull request #82: [FLINK-27072] Add Transformer for Bucketizer

Posted by GitBox <gi...@apache.org>.
lindong28 closed pull request #82: [FLINK-27072] Add Transformer for Bucketizer
URL: https://github.com/apache/flink-ml/pull/82




[GitHub] [flink-ml] zhipeng93 commented on a diff in pull request #82: [FLINK-27072] Add Transformer of Bucketizer

Posted by GitBox <gi...@apache.org>.
zhipeng93 commented on code in PR #82:
URL: https://github.com/apache/flink-ml/pull/82#discussion_r846892103


##########
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/bucketizer/Bucketizer.java:
##########
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature.bucketizer;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.ml.api.Transformer;
+import org.apache.flink.ml.common.datastream.TableUtils;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.ArrayUtils;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Bucketizer is a transformer that maps multiple columns of continuous features to multiple columns
+ * of discrete features, i.e., buckets IDs.
+ */
+public class Bucketizer implements Transformer<Bucketizer>, BucketizerParams<Bucketizer> {
+    private final Map<Param<?>, Object> paramMap = new HashMap<>();
+
+    public Bucketizer() {
+        ParamUtils.initializeMapWithDefaultValues(paramMap, this);
+    }
+
+    @Override
+    public Table[] transform(Table... inputs) {
+        Preconditions.checkArgument(inputs.length == 1);
+        String[] inputCols = getInputCols();
+        String[] outputCols = getOutputCols();
+        Double[][] splitsArray = getSplitsArray();
+
+        Preconditions.checkArgument(inputCols.length == outputCols.length);
+        Preconditions.checkArgument(inputCols.length == splitsArray.length);
+        for (Double[] splits : splitsArray) {
+            Preconditions.checkArgument(
+                    splits.length >= 3,
+                    "Illegal value for "
+                            + BucketizerParams.SPLITS_ARRAY
+                            + ". See param "
+                            + BucketizerParams.SPLITS_ARRAY
+                            + " for details.");
+            for (int j = 1; j < splits.length; j++) {
+                Preconditions.checkArgument(
+                        splits[j] > splits[j - 1],
+                        "Illegal value for "
+                                + BucketizerParams.SPLITS_ARRAY
+                                + ". See param "
+                                + BucketizerParams.SPLITS_ARRAY
+                                + " for details.");
+            }
+        }
+
+        StreamTableEnvironment tEnv =
+                (StreamTableEnvironment) ((TableImpl) inputs[0]).getTableEnvironment();
+
+        RowTypeInfo inputTypeInfo = TableUtils.getRowTypeInfo(inputs[0].getResolvedSchema());
+        TypeInformation<?>[] outputTypes = new TypeInformation[outputCols.length];
+        Arrays.fill(outputTypes, BasicTypeInfo.INT_TYPE_INFO);
+        RowTypeInfo outputTypeInfo =
+                new RowTypeInfo(
+                        ArrayUtils.addAll(inputTypeInfo.getFieldTypes(), outputTypes),
+                        ArrayUtils.addAll(inputTypeInfo.getFieldNames(), getOutputCols()));
+
+        DataStream<Row> result =
+                tEnv.toDataStream(inputs[0])
+                        .flatMap(
+                                new FindBucketFunction(inputCols, splitsArray, getHandleInvalid()),
+                                outputTypeInfo);
+        return new Table[] {tEnv.fromDataStream(result)};
+    }
+
+    /** Finds the bucket index for each continuous feature of an input data point. */
+    private static class FindBucketFunction implements FlatMapFunction<Row, Row> {
+        private final String[] inputCols;
+        private final String handleInvalid;
+        private final Double[][] splitsArray;
+
+        public FindBucketFunction(
+                String[] inputCols, Double[][] splitsArray, String handleInvalid) {
+            this.inputCols = inputCols;
+            this.splitsArray = splitsArray;
+            this.handleInvalid = handleInvalid;
+        }
+
+        @Override
+        public void flatMap(Row value, Collector<Row> out) {
+            Row outputRow = new Row(inputCols.length);
+
+            for (int i = 0; i < inputCols.length; i++) {
+                double feature = ((Number) value.getField(inputCols[i])).doubleValue();

Review Comment:
   The behavior of `non-number input` is different from the `ERROR` option.
   - When using a non-number input, it throws a `ClassCastException`
   - When it is a number or NaN, it throws a runtime exception.
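
The difference in failure modes can be reproduced in isolation. The sketch below is not Flink ML code: `toFeature` is a hypothetical helper that mirrors the cast in `flatMap`, and `failureFor` is a hypothetical helper that only reports which exception type, if any, the cast raises.

```java
final class FeatureCastSketch {
    /** Mirrors the cast used in flatMap: succeeds only for Number instances. */
    static double toFeature(Object field) {
        return ((Number) field).doubleValue();
    }

    /** Returns the simple name of the exception raised for the field, or "none". */
    static String failureFor(Object field) {
        try {
            toFeature(field);
            return "none";
        } catch (RuntimeException e) {
            return e.getClass().getSimpleName();
        }
    }
}
```

In this sketch a `String` field fails at the cast with a `ClassCastException`, while `NaN` passes the cast and would only be rejected later by the `handleInvalid` logic. A null field raises a `NullPointerException` when `doubleValue()` is called.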





[GitHub] [flink-ml] yunfengzhou-hub commented on a diff in pull request #82: [FLINK-27072] Add Transformer of Bucketizer

Posted by GitBox <gi...@apache.org>.
yunfengzhou-hub commented on code in PR #82:
URL: https://github.com/apache/flink-ml/pull/82#discussion_r846975117


##########
flink-ml-core/src/main/java/org/apache/flink/ml/param/ArrayArrayParam.java:
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.param;
+
+import org.apache.flink.ml.util.ReadWriteUtils;
+
+import java.io.IOException;
+
+/** Class for the arrayOfArray-type parameters. */
+public class ArrayArrayParam<T> extends Param<T[][]> {

Review Comment:
   How about this?
   ```java
       DoubleArrayParam SPLITS_ARRAY =
               new DoubleArrayParam(
                       "splitsArray", "Array of split points.", null, ParamValidators.nonEmptyArray());
   
       default Double[][] getSplitsArray() {
           Double[] values = get(SPLITS_ARRAY);
           Double[][] result = new Double[values[0].intValue()][];
           int offset = 1;
           for (int i = 0; i < values[0].intValue(); i++) {
               result[i] = new Double[values[offset].intValue()];
               System.arraycopy(values, offset + 1, result[i], 0, values[offset].intValue());
               offset += values[offset].intValue() + 1;
           }
           return result;
       }
   
       default T setSplitsArray(Double[][] value) {
           List<Double> list = new ArrayList<>();
           list.add((double) value.length);
           for (Double[] doubles : value) {
               list.add((double) doubles.length);
               list.addAll(Arrays.asList(doubles));
           }
           set(SPLITS_ARRAY, list.toArray(new Double[0]));
           return (T) this;
       }
   ```
   
   My point is that we should try to simplify the internal implementation. Currently we have about 6 basic type params, including String, Boolean, Int, Long, Double, Float. For array-typed parameters, we introduced another 6 classes. For 2D-array typed parameters, we would also need 6 classes. Same for 3D, 4D cases. I don't think it would be a good idea to have so many classes for parameters, each of which shares a large proportion in common. For this problem, the solution I proposed in comments above is to reuse 1D classes for arrays of larger dimensions. 
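
To make the proposed layout concrete, here is a minimal standalone sketch of the round trip, assuming the encoding `[rowCount, len0, row0..., len1, row1..., ...]`. The class and method names are hypothetical and not part of Flink ML; the decoder advances the offset by each row's own stored length plus one.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class FlattenedSplitsSketch {
    /** Encodes a 2D array as [rowCount, len0, row0..., len1, row1..., ...]. */
    static Double[] flatten(Double[][] value) {
        List<Double> list = new ArrayList<>();
        list.add((double) value.length);
        for (Double[] row : value) {
            list.add((double) row.length);
            list.addAll(Arrays.asList(row));
        }
        return list.toArray(new Double[0]);
    }

    /** Decodes the layout produced by flatten back into a 2D array. */
    static Double[][] unflatten(Double[] values) {
        int rowCount = values[0].intValue();
        Double[][] result = new Double[rowCount][];
        int offset = 1;
        for (int i = 0; i < rowCount; i++) {
            int rowLength = values[offset].intValue();
            result[i] = Arrays.copyOfRange(values, offset + 1, offset + 1 + rowLength);
            // Skip this row's length prefix and its elements.
            offset += rowLength + 1;
        }
        return result;
    }
}
```

The trade-off is that shape information is stored as doubles inside the data array, so the stored value is harder to read than a nested array when a parameter is saved or inspected.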





[GitHub] [flink-ml] lindong28 commented on a diff in pull request #82: [FLINK-27072] Add Transformer of Bucketizer

Posted by GitBox <gi...@apache.org>.
lindong28 commented on code in PR #82:
URL: https://github.com/apache/flink-ml/pull/82#discussion_r849347711


##########
flink-ml-core/src/main/java/org/apache/flink/ml/param/ArrayArrayParam.java:
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.param;
+
+import org.apache.flink.ml.util.ReadWriteUtils;
+
+import java.io.IOException;
+
+/** Class for the arrayOfArray-type parameters. */

Review Comment:
   nits: it is rare to use `arrayOfArray` in comments. How about `Class for the array parameters`, which seems to be more consistent with the comments of most XXXParam classes.
   
   And how about we change the comment of `ArrayParam` to `Class for the array parameters` for consistency?
   
   



##########
flink-ml-lib/src/main/java/org/apache/flink/ml/common/param/HasHandleInvalid.java:
##########
@@ -32,18 +32,21 @@
  * <ul>
  *   <li>error: raise an exception.
  *   <li>skip: filter out rows with bad values.
+ *   <li>keep: keep bad rows according to a specific rule. Check out each algorithm for detail

Review Comment:
   nits: `for detail rule` does not seem readable. How about `for details`?



##########
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/onehotencoder/OneHotEncoderParams.java:
##########
@@ -27,6 +27,9 @@
 /**
  * Params of OneHotEncoderModel.
  *
+ * <p>The `keep` and `skip` option of {@link HasHandleInvalid} is not supported in {@link

Review Comment:
   Most users will check out the Java doc of the stage classes, such as `OneHotEncoder` and `OneHotEncoderModel`, rather than the XXXParam classes, to understand how to use these algorithms. Would it be more user-friendly to move this explanation to the Java doc of `OneHotEncoder` and `OneHotEncoderModel`?
   
   Same for other algorithms.



##########
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/bucketizer/Bucketizer.java:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature.bucketizer;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.ml.api.Transformer;
+import org.apache.flink.ml.common.datastream.TableUtils;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.ArrayUtils;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Bucketizer is a transformer that maps multiple columns of continuous features to multiple columns
+ * of discrete features, i.e., buckets IDs.

Review Comment:
   Would it be useful to replace `buckets IDs` with `bucket indices`, and mention that `the indices are in [0, numSplitsInThisColumn - 1]`, to provide more information regarding the output of this class?
   
   This would also make the comment more consistent with that of `StringIndexer`.
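
For reference, the mapping from a value to its bucket index can be sketched on its own. This is an illustrative sketch rather than the PR's code: it uses a primitive `double[]` instead of `Double[]`, compares the search result against the length of the splits array, and returns a hypothetical `INVALID` marker where the transformer would consult `handleInvalid`.

```java
import java.util.Arrays;

final class BucketLookupSketch {
    /** Marker returned when the value is NaN or lies outside [first split, last split]. */
    static final int INVALID = -1;

    /** Returns the bucket index of feature for strictly increasing splits, or INVALID. */
    static int findBucket(double[] splits, double feature) {
        if (Double.isNaN(feature)) {
            return INVALID;
        }
        int index = Arrays.binarySearch(splits, feature);
        if (index >= 0) {
            // Exact hit on a split point; the last split point belongs to the last bucket.
            return index == splits.length - 1 ? index - 1 : index;
        }
        // A negative result encodes the insertion point as -(insertionPoint) - 1.
        int insertionPoint = -index - 1;
        if (insertionPoint == 0 || insertionPoint == splits.length) {
            return INVALID;
        }
        return insertionPoint - 1;
    }
}
```

With n split points, the sketch returns valid indices from 0 to n - 2. In the diff under review, the `keep` option assigns index n - 1 (`splits.length - 1`) to invalid values.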





[GitHub] [flink-ml] zhipeng93 commented on a diff in pull request #82: [FLINK-27072] Add Transformer of Bucketizer

Posted by GitBox <gi...@apache.org>.
zhipeng93 commented on code in PR #82:
URL: https://github.com/apache/flink-ml/pull/82#discussion_r850118629


##########
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/onehotencoder/OneHotEncoderParams.java:
##########
@@ -27,6 +27,9 @@
 /**
  * Params of OneHotEncoderModel.
  *
+ * <p>The `keep` and `skip` option of {@link HasHandleInvalid} is not supported in {@link

Review Comment:
   Thanks for the comment. I prefer putting the description in the XXXParam class for the following two reasons:
   
   - Putting the description in the concrete classes may lead to duplicate comments, for example in `OneHotEncoder` and `OneHotEncoderModel`.
   - I don't really think checking the Java doc of XXXParams is bad or user-unfriendly. They are public interfaces, after all. For documents on the FlinkML website, we can put all Java docs related to a certain algorithm together.
   
   
   What do you think?





[GitHub] [flink-ml] zhipeng93 commented on a diff in pull request #82: [FLINK-27072] Add Transformer of Bucketizer

Posted by GitBox <gi...@apache.org>.
zhipeng93 commented on code in PR #82:
URL: https://github.com/apache/flink-ml/pull/82#discussion_r845968866


##########
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/bucketizer/Bucketizer.java:
##########
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature.bucketizer;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.ml.api.Transformer;
+import org.apache.flink.ml.common.datastream.TableUtils;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.ArrayUtils;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Bucketizer is a transformer that maps multiple columns of continuous features to multiple columns
+ * of discrete features, i.e., buckets IDs.
+ */
+public class Bucketizer implements Transformer<Bucketizer>, BucketizerParams<Bucketizer> {
+    private final Map<Param<?>, Object> paramMap = new HashMap<>();
+
+    public Bucketizer() {
+        ParamUtils.initializeMapWithDefaultValues(paramMap, this);
+    }
+
+    @Override
+    public Table[] transform(Table... inputs) {
+        Preconditions.checkArgument(inputs.length == 1);
+        String[] inputCols = getInputCols();
+        String[] outputCols = getOutputCols();
+        Double[][] splitsArray = getSplitsArray();
+
+        Preconditions.checkArgument(inputCols.length == outputCols.length);
+        Preconditions.checkArgument(inputCols.length == splitsArray.length);
+        for (Double[] splits : splitsArray) {
+            Preconditions.checkArgument(
+                    splits.length >= 3,
+                    "Illegal value for "
+                            + BucketizerParams.SPLITS_ARRAY
+                            + ". See param "
+                            + BucketizerParams.SPLITS_ARRAY
+                            + " for details.");
+            for (int j = 1; j < splits.length; j++) {
+                Preconditions.checkArgument(
+                        splits[j] > splits[j - 1],
+                        "Illegal value for "
+                                + BucketizerParams.SPLITS_ARRAY
+                                + ". See param "
+                                + BucketizerParams.SPLITS_ARRAY
+                                + " for details.");
+            }
+        }
+
+        StreamTableEnvironment tEnv =
+                (StreamTableEnvironment) ((TableImpl) inputs[0]).getTableEnvironment();
+
+        RowTypeInfo inputTypeInfo = TableUtils.getRowTypeInfo(inputs[0].getResolvedSchema());
+        TypeInformation<?>[] outputTypes = new TypeInformation[outputCols.length];
+        Arrays.fill(outputTypes, BasicTypeInfo.INT_TYPE_INFO);
+        RowTypeInfo outputTypeInfo =
+                new RowTypeInfo(
+                        ArrayUtils.addAll(inputTypeInfo.getFieldTypes(), outputTypes),
+                        ArrayUtils.addAll(inputTypeInfo.getFieldNames(), getOutputCols()));
+
+        DataStream<Row> result =
+                tEnv.toDataStream(inputs[0])
+                        .flatMap(
+                                new FindBucketFunction(inputCols, splitsArray, getHandleInvalid()),
+                                outputTypeInfo);
+        return new Table[] {tEnv.fromDataStream(result)};
+    }
+
+    /** Finds the bucket index for each continuous feature of an input data point. */
+    private static class FindBucketFunction implements FlatMapFunction<Row, Row> {
+        private final String[] inputCols;
+        private final String handleInvalid;
+        private final Double[][] splitsArray;
+
+        public FindBucketFunction(
+                String[] inputCols, Double[][] splitsArray, String handleInvalid) {
+            this.inputCols = inputCols;
+            this.splitsArray = splitsArray;
+            this.handleInvalid = handleInvalid;
+        }
+
+        @Override
+        public void flatMap(Row value, Collector<Row> out) {
+            Row outputRow = new Row(inputCols.length);
+
+            for (int i = 0; i < inputCols.length; i++) {
+                double feature = ((Number) value.getField(inputCols[i])).doubleValue();

Review Comment:
   `Bucketizer` assumes that the input is always a `Number`. I think the exception thrown by default is enough for debugging.
   
   The `ERROR` option covers two cases:
   - `NaN` values.
   - Input values that do not fit in the given splits arrays.
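
As a hedged illustration of the two cases above, the following minimal sketch classifies a single feature value against one strictly increasing splits array. The class name `BucketLookupSketch`, the method `findBucket`, and the `INVALID` sentinel are hypothetical and are not part of the PR. The sketch bounds-checks against `splits.length`, whereas the quoted diff compares against `inputCols.length`; the two agree only when those lengths happen to coincide.

```java
import java.util.Arrays;

/** Hypothetical helper for illustration only; not part of the PR. */
final class BucketLookupSketch {
    /** Sentinel for invalid inputs (an assumption of this sketch). */
    static final int INVALID = -1;

    /**
     * Returns the bucket index of {@code feature} within the strictly increasing
     * {@code splits}, or INVALID for NaN and for values outside
     * [splits[0], splits[splits.length - 1]].
     */
    static int findBucket(double[] splits, double feature) {
        if (Double.isNaN(feature)) {
            return INVALID; // Case 1: NaN.
        }
        int index = Arrays.binarySearch(splits, feature);
        if (index >= 0) {
            // Exact match on a split point; the last split closes the last bucket.
            return index == splits.length - 1 ? index - 1 : index;
        }
        int insertionPoint = -index - 1;
        if (insertionPoint == 0 || insertionPoint == splits.length) {
            return INVALID; // Case 2: the value does not fit in the splits.
        }
        return insertionPoint - 1;
    }

    public static void main(String[] args) {
        double[] splits = {-0.5, 0.0, 0.5};
        System.out.println(findBucket(splits, -0.5));       // 0
        System.out.println(findBucket(splits, 0.25));       // 1
        System.out.println(findBucket(splits, 0.5));        // 1
        System.out.println(findBucket(splits, Double.NaN)); // -1
        System.out.println(findBucket(splits, 1.0));        // -1
    }
}
```

With `handleInvalid` set to `keep`, the quoted diff maps such invalid values to the extra bucket `splits.length - 1`; the sketch leaves that policy to the caller.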





[GitHub] [flink-ml] zhipeng93 commented on a diff in pull request #82: [FLINK-27072] Add Transformer of Bucketizer

Posted by GitBox <gi...@apache.org>.
zhipeng93 commented on code in PR #82:
URL: https://github.com/apache/flink-ml/pull/82#discussion_r845977149


##########
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/bucketizer/Bucketizer.java:
##########
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature.bucketizer;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.ml.api.Transformer;
+import org.apache.flink.ml.common.datastream.TableUtils;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.ArrayUtils;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Bucketizer is a transformer that maps multiple columns of continuous features to multiple columns
+ * of discrete features, i.e., buckets IDs.
+ */
+public class Bucketizer implements Transformer<Bucketizer>, BucketizerParams<Bucketizer> {
+    private final Map<Param<?>, Object> paramMap = new HashMap<>();
+
+    public Bucketizer() {
+        ParamUtils.initializeMapWithDefaultValues(paramMap, this);
+    }
+
+    @Override
+    public Table[] transform(Table... inputs) {
+        Preconditions.checkArgument(inputs.length == 1);
+        String[] inputCols = getInputCols();
+        String[] outputCols = getOutputCols();
+        Double[][] splitsArray = getSplitsArray();
+
+        Preconditions.checkArgument(inputCols.length == outputCols.length);
+        Preconditions.checkArgument(inputCols.length == splitsArray.length);
+        for (Double[] splits : splitsArray) {
+            Preconditions.checkArgument(
+                    splits.length >= 3,
+                    "Illegal value for "
+                            + BucketizerParams.SPLITS_ARRAY
+                            + ". See param "
+                            + BucketizerParams.SPLITS_ARRAY
+                            + " for details.");
+            for (int j = 1; j < splits.length; j++) {
+                Preconditions.checkArgument(
+                        splits[j] > splits[j - 1],
+                        "Illegal value for "
+                                + BucketizerParams.SPLITS_ARRAY
+                                + ". See param "
+                                + BucketizerParams.SPLITS_ARRAY
+                                + " for details.");
+            }
+        }
+
+        StreamTableEnvironment tEnv =
+                (StreamTableEnvironment) ((TableImpl) inputs[0]).getTableEnvironment();
+
+        RowTypeInfo inputTypeInfo = TableUtils.getRowTypeInfo(inputs[0].getResolvedSchema());
+        TypeInformation<?>[] outputTypes = new TypeInformation[outputCols.length];
+        Arrays.fill(outputTypes, BasicTypeInfo.INT_TYPE_INFO);
+        RowTypeInfo outputTypeInfo =
+                new RowTypeInfo(
+                        ArrayUtils.addAll(inputTypeInfo.getFieldTypes(), outputTypes),
+                        ArrayUtils.addAll(inputTypeInfo.getFieldNames(), getOutputCols()));
+
+        DataStream<Row> result =
+                tEnv.toDataStream(inputs[0])
+                        .flatMap(
+                                new FindBucketFunction(inputCols, splitsArray, getHandleInvalid()),
+                                outputTypeInfo);
+        return new Table[] {tEnv.fromDataStream(result)};
+    }
+
+    /** Finds the bucket index for each continuous feature of an input data point. */
+    private static class FindBucketFunction implements FlatMapFunction<Row, Row> {
+        private final String[] inputCols;
+        private final String handleInvalid;
+        private final Double[][] splitsArray;
+
+        public FindBucketFunction(
+                String[] inputCols, Double[][] splitsArray, String handleInvalid) {
+            this.inputCols = inputCols;
+            this.splitsArray = splitsArray;
+            this.handleInvalid = handleInvalid;
+        }
+
+        @Override
+        public void flatMap(Row value, Collector<Row> out) {
+            Row outputRow = new Row(inputCols.length);
+
+            for (int i = 0; i < inputCols.length; i++) {
+                double feature = ((Number) value.getField(inputCols[i])).doubleValue();
+                Double[] splits = splitsArray[i];
+                boolean isInvalid = false;
+
+                if (!Double.isNaN(feature)) {
+                    int index = Arrays.binarySearch(splits, feature);
+                    if (index >= 0) {
+                        if (index == inputCols.length - 1) {
+                            index--;
+                        }
+                        outputRow.setField(i, index);
+                    } else {
+                        index = -index - 1;
+                        if (index == 0 || index == inputCols.length) {
+                            isInvalid = true;
+                        } else {
+                            outputRow.setField(i, index - 1);
+                        }
+                    }
+                } else {
+                    isInvalid = true;
+                }
+
+                if (isInvalid) {
+                    switch (handleInvalid) {
+                        case BucketizerParams.ERROR_INVALID:
+                            throw new RuntimeException(
+                                    "The input contains invalid value. See "
+                                            + BucketizerParams.HANDLE_INVALID
+                                            + " parameter for more options.");
+                        case BucketizerParams.SKIP_INVALID:
+                            return;
+                        case BucketizerParams.KEEP_INVALID:
+                            outputRow.setField(i, splits.length - 1);
+                            break;

Review Comment:
   Hmm, could you explain a bit more about this?





[GitHub] [flink-ml] yunfengzhou-hub commented on a diff in pull request #82: [FLINK-27072] Add Transformer of Bucketizer

Posted by GitBox <gi...@apache.org>.
yunfengzhou-hub commented on code in PR #82:
URL: https://github.com/apache/flink-ml/pull/82#discussion_r846882100


##########
flink-ml-core/src/main/java/org/apache/flink/ml/param/ArrayArrayParam.java:
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.param;
+
+import org.apache.flink.ml.util.ReadWriteUtils;
+
+import java.io.IOException;
+
+/** Class for the arrayOfArray-type parameters. */
+public class ArrayArrayParam<T> extends Param<T[][]> {

Review Comment:
   I agree that the API needs to expose getters/setters for 2D arrays, but for the internal implementation I believe we can store the 2D array in a 1D format. This does not affect the user experience and lets us reuse the existing `ArrayParams`.
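
One possible reading of the 1D idea, sketched under assumptions: the flat array starts with the row count, followed by each row's length, followed by the values. The class `FlattenedSplitsSketch`, its methods, and this layout are hypothetical; the comment above does not specify an encoding, and the layout actually used (if any) may differ.

```java
import java.util.Arrays;

/** Hypothetical illustration of storing a ragged 2D array in a 1D array. */
final class FlattenedSplitsSketch {
    /** Encodes rows as [rowCount, len_0, ..., len_{n-1}, values...]. */
    static double[] flatten(double[][] rows) {
        int total = 1 + rows.length;
        for (double[] row : rows) {
            total += row.length;
        }
        double[] flat = new double[total];
        flat[0] = rows.length;
        int pos = 1 + rows.length; // Values start after the header.
        for (int i = 0; i < rows.length; i++) {
            flat[1 + i] = rows[i].length;
            System.arraycopy(rows[i], 0, flat, pos, rows[i].length);
            pos += rows[i].length;
        }
        return flat;
    }

    /** Restores the 2D array produced by {@link #flatten(double[][])}. */
    static double[][] unflatten(double[] flat) {
        int rowCount = (int) flat[0];
        double[][] rows = new double[rowCount][];
        int pos = 1 + rowCount;
        for (int i = 0; i < rowCount; i++) {
            int len = (int) flat[1 + i];
            rows[i] = Arrays.copyOfRange(flat, pos, pos + len);
            pos += len;
        }
        return rows;
    }

    public static void main(String[] args) {
        double[][] splitsArray = {{-0.5, 0.0, 0.5}, {-1.0, 0.0, 2.0, 4.0}};
        double[][] restored = unflatten(flatten(splitsArray));
        System.out.println(Arrays.deepEquals(splitsArray, restored)); // true
    }
}
```

The trade-off is that the public getter and setter still need to convert on every call, and the header encoding becomes part of the persisted format.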





[GitHub] [flink-ml] yunfengzhou-hub commented on a diff in pull request #82: [FLINK-27072] Add Transformer of Bucketizer

Posted by GitBox <gi...@apache.org>.
yunfengzhou-hub commented on code in PR #82:
URL: https://github.com/apache/flink-ml/pull/82#discussion_r846977954


##########
flink-ml-lib/src/test/java/org/apache/flink/ml/feature/BucketizerTest.java:
##########
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.ml.common.param.HasHandleInvalid;
+import org.apache.flink.ml.feature.bucketizer.Bucketizer;
+import org.apache.flink.ml.util.StageTestUtils;
+import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
+
+import org.apache.commons.collections.IteratorUtils;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/** Tests the {@link Bucketizer}. */
+public class BucketizerTest extends AbstractTestBase {
+    @Rule public final TemporaryFolder tempFolder = new TemporaryFolder();
+    private StreamTableEnvironment tEnv;
+    private Table inputTable;
+
+    private static final List<Row> inputData =
+            Arrays.asList(
+                    Row.of(1, -0.5, 0.0, 1.0),
+                    Row.of(2, Double.NEGATIVE_INFINITY, 1.0, Double.POSITIVE_INFINITY),
+                    Row.of(3, Double.NaN, -0.5, -0.5));
+
+    private static final Double[][] splitsArray =
+            new Double[][] {
+                new Double[] {-0.5, 0.0, 0.5},
+                new Double[] {-1.0, 0.0, 2.0},
+                new Double[] {Double.NEGATIVE_INFINITY, 10.0, Double.POSITIVE_INFINITY}
+            };
+
+    private final List<Row> expectedKeepResult =
+            Arrays.asList(Row.of(1, 0, 1, 0), Row.of(2, 2, 1, 1), Row.of(3, 2, 0, 0));
+
+    private final List<Row> expectedSkipResult = Collections.singletonList(Row.of(1, 0, 1, 0));
+
+    @Before
+    public void before() {
+        Configuration config = new Configuration();
+        config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+        env.setParallelism(4);
+        env.enableCheckpointing(100);
+        env.setRestartStrategy(RestartStrategies.noRestart());
+        tEnv = StreamTableEnvironment.create(env);
+        inputTable = tEnv.fromDataStream(env.fromCollection(inputData)).as("id", "f1", "f2", "f3");
+    }
+
+    @SuppressWarnings("all")
+    private void verifyOutputResult(Table output, String[] outputCols, List<Row> expectedResult)
+            throws Exception {
+        List<Row> collectedResult =
+                IteratorUtils.toList(tEnv.toDataStream(output).executeAndCollect());
+        List<Row> result = new ArrayList<>(collectedResult.size());
+
+        for (Row r : collectedResult) {
+            Row outRow = new Row(outputCols.length + 1);
+            outRow.setField(0, r.getField("id"));
+            for (int i = 0; i < outputCols.length; i++) {
+                outRow.setField(i + 1, r.getField(outputCols[i]));
+            }
+            result.add(outRow);
+        }
+
+        compareResultCollections(
+                expectedResult, result, Comparator.comparingInt(r -> ((Integer) r.getField(0))));
+    }
+
+    @Test
+    public void testParam() {
+        Bucketizer bucketizer = new Bucketizer();
+        assertEquals(HasHandleInvalid.ERROR_INVALID, bucketizer.getHandleInvalid());
+
+        bucketizer
+                .setInputCols("f1", "f2", "f3")
+                .setOutputCols("o1", "o2", "o3")
+                .setHandleInvalid(HasHandleInvalid.SKIP_INVALID)
+                .setSplitsArray(splitsArray);

Review Comment:
   @lindong28 What do you think of adding APIs to `Bucketizer` so that users can define the column metadata in the following format?
   ```java
   bucketizer
       .addCol("f1", "o1", splitsArray[0])
       .addCol("f2", "o2", splitsArray[1]);
   ```
   It differs from the existing practice in Spark and Alink, but the readability would be better. It would also be OK with me to leave this discussion for the future and focus on functional correctness for now.





[GitHub] [flink-ml] lindong28 commented on pull request #82: [FLINK-27072] Add Transformer for Bucketizer

Posted by GitBox <gi...@apache.org>.
lindong28 commented on PR #82:
URL: https://github.com/apache/flink-ml/pull/82#issuecomment-1099110863

   Merged to the master branch.




[GitHub] [flink-ml] zhipeng93 commented on a diff in pull request #82: [FLINK-27072] Add Transformer of Bucketizer

Posted by GitBox <gi...@apache.org>.
zhipeng93 commented on code in PR #82:
URL: https://github.com/apache/flink-ml/pull/82#discussion_r850118629


##########
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/onehotencoder/OneHotEncoderParams.java:
##########
@@ -27,6 +27,9 @@
 /**
  * Params of OneHotEncoderModel.
  *
+ * <p>The `keep` and `skip` option of {@link HasHandleInvalid} is not supported in {@link

Review Comment:
   Sounds good.





[GitHub] [flink-ml] lindong28 commented on a diff in pull request #82: [FLINK-27072] Add Transformer of Bucketizer

Posted by GitBox <gi...@apache.org>.
lindong28 commented on code in PR #82:
URL: https://github.com/apache/flink-ml/pull/82#discussion_r849335280


##########
flink-ml-lib/src/test/java/org/apache/flink/ml/feature/BucketizerTest.java:
##########
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.ml.common.param.HasHandleInvalid;
+import org.apache.flink.ml.feature.bucketizer.Bucketizer;
+import org.apache.flink.ml.util.StageTestUtils;
+import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
+
+import org.apache.commons.collections.IteratorUtils;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/** Tests the {@link Bucketizer}. */
+public class BucketizerTest extends AbstractTestBase {
+    @Rule public final TemporaryFolder tempFolder = new TemporaryFolder();
+    private StreamTableEnvironment tEnv;
+    private Table inputTable;
+
+    private static final List<Row> inputData =
+            Arrays.asList(
+                    Row.of(1, -0.5, 0.0, 1.0),
+                    Row.of(2, Double.NEGATIVE_INFINITY, 1.0, Double.POSITIVE_INFINITY),
+                    Row.of(3, Double.NaN, -0.5, -0.5));
+
+    private static final Double[][] splitsArray =
+            new Double[][] {
+                new Double[] {-0.5, 0.0, 0.5},
+                new Double[] {-1.0, 0.0, 2.0},
+                new Double[] {Double.NEGATIVE_INFINITY, 10.0, Double.POSITIVE_INFINITY}
+            };
+
+    private final List<Row> expectedKeepResult =
+            Arrays.asList(Row.of(1, 0, 1, 0), Row.of(2, 2, 1, 1), Row.of(3, 2, 0, 0));
+
+    private final List<Row> expectedSkipResult = Collections.singletonList(Row.of(1, 0, 1, 0));
+
+    @Before
+    public void before() {
+        Configuration config = new Configuration();
+        config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+        env.setParallelism(4);
+        env.enableCheckpointing(100);
+        env.setRestartStrategy(RestartStrategies.noRestart());
+        tEnv = StreamTableEnvironment.create(env);
+        inputTable = tEnv.fromDataStream(env.fromCollection(inputData)).as("id", "f1", "f2", "f3");
+    }
+
+    @SuppressWarnings("all")
+    private void verifyOutputResult(Table output, String[] outputCols, List<Row> expectedResult)
+            throws Exception {
+        List<Row> collectedResult =
+                IteratorUtils.toList(tEnv.toDataStream(output).executeAndCollect());
+        List<Row> result = new ArrayList<>(collectedResult.size());
+
+        for (Row r : collectedResult) {
+            Row outRow = new Row(outputCols.length + 1);
+            outRow.setField(0, r.getField("id"));
+            for (int i = 0; i < outputCols.length; i++) {
+                outRow.setField(i + 1, r.getField(outputCols[i]));
+            }
+            result.add(outRow);
+        }
+
+        compareResultCollections(
+                expectedResult, result, Comparator.comparingInt(r -> ((Integer) r.getField(0))));
+    }
+
+    @Test
+    public void testParam() {
+        Bucketizer bucketizer = new Bucketizer();
+        assertEquals(HasHandleInvalid.ERROR_INVALID, bucketizer.getHandleInvalid());
+
+        bucketizer
+                .setInputCols("f1", "f2", "f3")
+                .setOutputCols("o1", "o2", "o3")
+                .setHandleInvalid(HasHandleInvalid.SKIP_INVALID)
+                .setSplitsArray(splitsArray);

Review Comment:
   It seems better to keep the current approach so that the user experience with `setInputCols(...)` is more consistent with `setInputCol(...)`.
   
   IMO, it is not clear that the alternative approach is more readable than the current one.





[GitHub] [flink-ml] yunfengzhou-hub commented on a diff in pull request #82: [FLINK-27072] Add Transformer of Bucketizer

Posted by GitBox <gi...@apache.org>.
yunfengzhou-hub commented on code in PR #82:
URL: https://github.com/apache/flink-ml/pull/82#discussion_r846883248


##########
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/bucketizer/Bucketizer.java:
##########
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature.bucketizer;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.ml.api.Transformer;
+import org.apache.flink.ml.common.datastream.TableUtils;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.ArrayUtils;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Bucketizer is a transformer that maps multiple columns of continuous features to multiple columns
+ * of discrete features, i.e., buckets IDs.
+ */
+public class Bucketizer implements Transformer<Bucketizer>, BucketizerParams<Bucketizer> {
+    private final Map<Param<?>, Object> paramMap = new HashMap<>();
+
+    public Bucketizer() {
+        ParamUtils.initializeMapWithDefaultValues(paramMap, this);
+    }
+
+    @Override
+    public Table[] transform(Table... inputs) {
+        Preconditions.checkArgument(inputs.length == 1);
+        String[] inputCols = getInputCols();
+        String[] outputCols = getOutputCols();
+        Double[][] splitsArray = getSplitsArray();
+
+        Preconditions.checkArgument(inputCols.length == outputCols.length);
+        Preconditions.checkArgument(inputCols.length == splitsArray.length);
+        for (Double[] splits : splitsArray) {
+            Preconditions.checkArgument(
+                    splits.length >= 3,
+                    "Illegal value for "
+                            + BucketizerParams.SPLITS_ARRAY
+                            + ". See param "
+                            + BucketizerParams.SPLITS_ARRAY
+                            + " for details.");
+            for (int j = 1; j < splits.length; j++) {
+                Preconditions.checkArgument(
+                        splits[j] > splits[j - 1],
+                        "Illegal value for "
+                                + BucketizerParams.SPLITS_ARRAY
+                                + ". See param "
+                                + BucketizerParams.SPLITS_ARRAY
+                                + " for details.");
+            }
+        }
+
+        StreamTableEnvironment tEnv =
+                (StreamTableEnvironment) ((TableImpl) inputs[0]).getTableEnvironment();
+
+        RowTypeInfo inputTypeInfo = TableUtils.getRowTypeInfo(inputs[0].getResolvedSchema());
+        TypeInformation<?>[] outputTypes = new TypeInformation[outputCols.length];
+        Arrays.fill(outputTypes, BasicTypeInfo.INT_TYPE_INFO);
+        RowTypeInfo outputTypeInfo =
+                new RowTypeInfo(
+                        ArrayUtils.addAll(inputTypeInfo.getFieldTypes(), outputTypes),
+                        ArrayUtils.addAll(inputTypeInfo.getFieldNames(), getOutputCols()));
+
+        DataStream<Row> result =
+                tEnv.toDataStream(inputs[0])
+                        .flatMap(
+                                new FindBucketFunction(inputCols, splitsArray, getHandleInvalid()),
+                                outputTypeInfo);
+        return new Table[] {tEnv.fromDataStream(result)};
+    }
+
+    /** Finds the bucket index for each continuous feature of an input data point. */
+    private static class FindBucketFunction implements FlatMapFunction<Row, Row> {
+        private final String[] inputCols;
+        private final String handleInvalid;
+        private final Double[][] splitsArray;
+
+        public FindBucketFunction(
+                String[] inputCols, Double[][] splitsArray, String handleInvalid) {
+            this.inputCols = inputCols;
+            this.splitsArray = splitsArray;
+            this.handleInvalid = handleInvalid;
+        }
+
+        @Override
+        public void flatMap(Row value, Collector<Row> out) {
+            Row outputRow = new Row(inputCols.length);
+
+            for (int i = 0; i < inputCols.length; i++) {
+                double feature = ((Number) value.getField(inputCols[i])).doubleValue();

Review Comment:
   I mean that if we set `handleInvalid` to `SKIP` or `KEEP`, a non-number input would still make the operator fail, as if `handleInvalid` were `ERROR`. Is this the expected behavior?
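
To make the concern concrete, here is a minimal sketch of how a non-number field could be routed through `handleInvalid` instead of failing at the cast. It is only an illustration: the class `NonNumberGuardSketch`, the method `readFeature`, and the lowercase option strings `"skip"`, `"keep"` and `"error"` are assumptions, not code from this PR.

```java
import java.util.Optional;

class NonNumberGuardSketch {
    /**
     * Reads one raw field as a numeric feature.
     * Returns Optional.empty() when the row should be skipped.
     * Hypothetical helper; the option strings are assumed values.
     */
    static Optional<Double> readFeature(Object field, String handleInvalid) {
        // Check the type first so that a non-number never reaches the cast.
        if (field instanceof Number) {
            return Optional.of(((Number) field).doubleValue());
        }
        switch (handleInvalid) {
            case "skip":
                // Signal the caller to drop the whole row.
                return Optional.empty();
            case "keep":
                // Treat the value like NaN so later logic can handle it as invalid.
                return Optional.of(Double.NaN);
            default:
                // "error" (or anything unknown): fail explicitly.
                throw new RuntimeException("Non-numeric input: " + field);
        }
    }
}
```

With a guard like this, `SKIP` would drop the row and `KEEP` would treat the value like NaN; whether that is the desired behavior is the open question here.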





[GitHub] [flink-ml] zhipeng93 commented on a diff in pull request #82: [FLINK-27072] Add Transformer of Bucketizer

Posted by GitBox <gi...@apache.org>.
zhipeng93 commented on code in PR #82:
URL: https://github.com/apache/flink-ml/pull/82#discussion_r846892103


##########
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/bucketizer/Bucketizer.java:
##########
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature.bucketizer;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.ml.api.Transformer;
+import org.apache.flink.ml.common.datastream.TableUtils;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.ArrayUtils;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Bucketizer is a transformer that maps multiple columns of continuous features to multiple columns
+ * of discrete features, i.e., buckets IDs.
+ */
+public class Bucketizer implements Transformer<Bucketizer>, BucketizerParams<Bucketizer> {
+    private final Map<Param<?>, Object> paramMap = new HashMap<>();
+
+    public Bucketizer() {
+        ParamUtils.initializeMapWithDefaultValues(paramMap, this);
+    }
+
+    @Override
+    public Table[] transform(Table... inputs) {
+        Preconditions.checkArgument(inputs.length == 1);
+        String[] inputCols = getInputCols();
+        String[] outputCols = getOutputCols();
+        Double[][] splitsArray = getSplitsArray();
+
+        Preconditions.checkArgument(inputCols.length == outputCols.length);
+        Preconditions.checkArgument(inputCols.length == splitsArray.length);
+        for (Double[] splits : splitsArray) {
+            Preconditions.checkArgument(
+                    splits.length >= 3,
+                    "Illegal value for "
+                            + BucketizerParams.SPLITS_ARRAY
+                            + ". See param "
+                            + BucketizerParams.SPLITS_ARRAY
+                            + " for details.");
+            for (int j = 1; j < splits.length; j++) {
+                Preconditions.checkArgument(
+                        splits[j] > splits[j - 1],
+                        "Illegal value for "
+                                + BucketizerParams.SPLITS_ARRAY
+                                + ". See param "
+                                + BucketizerParams.SPLITS_ARRAY
+                                + " for details.");
+            }
+        }
+
+        StreamTableEnvironment tEnv =
+                (StreamTableEnvironment) ((TableImpl) inputs[0]).getTableEnvironment();
+
+        RowTypeInfo inputTypeInfo = TableUtils.getRowTypeInfo(inputs[0].getResolvedSchema());
+        TypeInformation<?>[] outputTypes = new TypeInformation[outputCols.length];
+        Arrays.fill(outputTypes, BasicTypeInfo.INT_TYPE_INFO);
+        RowTypeInfo outputTypeInfo =
+                new RowTypeInfo(
+                        ArrayUtils.addAll(inputTypeInfo.getFieldTypes(), outputTypes),
+                        ArrayUtils.addAll(inputTypeInfo.getFieldNames(), getOutputCols()));
+
+        DataStream<Row> result =
+                tEnv.toDataStream(inputs[0])
+                        .flatMap(
+                                new FindBucketFunction(inputCols, splitsArray, getHandleInvalid()),
+                                outputTypeInfo);
+        return new Table[] {tEnv.fromDataStream(result)};
+    }
+
+    /** Finds the bucket index for each continuous feature of an input data point. */
+    private static class FindBucketFunction implements FlatMapFunction<Row, Row> {
+        private final String[] inputCols;
+        private final String handleInvalid;
+        private final Double[][] splitsArray;
+
+        public FindBucketFunction(
+                String[] inputCols, Double[][] splitsArray, String handleInvalid) {
+            this.inputCols = inputCols;
+            this.splitsArray = splitsArray;
+            this.handleInvalid = handleInvalid;
+        }
+
+        @Override
+        public void flatMap(Row value, Collector<Row> out) {
+            Row outputRow = new Row(inputCols.length);
+
+            for (int i = 0; i < inputCols.length; i++) {
+                double feature = ((Number) value.getField(inputCols[i])).doubleValue();

Review Comment:
   The behavior for non-number input is different from the `ERROR` option:
   - A non-number input throws a cast exception.
   - A number (including NaN) that is invalid under the `ERROR` option throws a runtime exception.
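
The distinction can be reproduced in plain Java. The sketch below is hypothetical: `InvalidInputSketch`, `toFeature`, `findBucketOrThrow` and the bucket boundaries (left-closed buckets, with the last bucket also including the upper bound) are assumptions for illustration, not the code in this PR. Note that `ClassCastException` is itself a subclass of `RuntimeException`, so the two cases differ in the exception type and message rather than in being checked or unchecked.

```java
import java.util.Arrays;

class InvalidInputSketch {
    /** Mirrors the cast in flatMap: a non-Number field fails here with ClassCastException. */
    static double toFeature(Object field) {
        return ((Number) field).doubleValue();
    }

    /** Assumed ERROR-mode lookup: returns the bucket index or throws RuntimeException. */
    static int findBucketOrThrow(double feature, double[] splits) {
        int last = splits.length - 1;
        if (Double.isNaN(feature) || feature < splits[0] || feature > splits[last]) {
            throw new RuntimeException("Invalid value under ERROR: " + feature);
        }
        if (feature == splits[last]) {
            // Assumption: the upper bound belongs to the last bucket.
            return last - 1;
        }
        int idx = Arrays.binarySearch(splits, feature);
        // An exact match gives the bucket directly; otherwise decode the insertion point.
        return idx >= 0 ? idx : -idx - 2;
    }

    /** Reports which of the two failure modes, if any, a field triggers. */
    static String describe(Object field, double[] splits) {
        try {
            return "bucket " + findBucketOrThrow(toFeature(field), splits);
        } catch (ClassCastException e) {
            return "ClassCastException";
        } catch (RuntimeException e) {
            return "RuntimeException";
        }
    }
}
```

Under these assumptions, a string field fails at the cast before any `handleInvalid` logic runs, while NaN or an out-of-range number reaches the lookup and fails there.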





[GitHub] [flink-ml] zhipeng93 commented on a diff in pull request #82: [FLINK-27072] Add Transformer of Bucketizer

Posted by GitBox <gi...@apache.org>.
zhipeng93 commented on code in PR #82:
URL: https://github.com/apache/flink-ml/pull/82#discussion_r845960833


##########
flink-ml-core/src/main/java/org/apache/flink/ml/param/ArrayArrayParam.java:
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.param;
+
+import org.apache.flink.ml.util.ReadWriteUtils;
+
+import java.io.IOException;
+
+/** Class for the arrayOfArray-type parameters. */
+public class ArrayArrayParam<T> extends Param<T[][]> {

Review Comment:
   We should probably follow how users specify the parameter here.
   
   In Bucketizer, users specify a splits array for each input column. Moreover, the number of split points can differ across columns. If we used a 1D array to represent the 2D array, we would also need to specify the length of each column's segment within it. This seems a bit complex to me.
   
   Also, using a 2D array is consistent with existing libraries [1].
   
   [1] https://github.com/apache/spark/blob/1018f8b545a74f4fcaf123b5f15d52ac2afa2d3c/mllib/src/main/scala/org/apache/spark/ml/param/params.scala#L567
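
As a rough comparison of the two layouts (the names `SplitsLayoutSketch` and `unflatten` are hypothetical), the 2D form can be written directly as `Double[][] splitsArray = {{-0.5, 0.0, 0.5}, {-1.0, 0.0, 1.0, 2.0}};`, whereas a flattened 1D form needs a companion array of per-column lengths and a step like the following to recover the per-column splits:

```java
import java.util.Arrays;

class SplitsLayoutSketch {
    /** Rebuilds per-column splits from a flattened array plus the length of each column's segment. */
    static Double[][] unflatten(Double[] flat, int[] lengths) {
        Double[][] result = new Double[lengths.length][];
        int offset = 0;
        for (int i = 0; i < lengths.length; i++) {
            // Copy the i-th column's segment out of the flat array.
            result[i] = Arrays.copyOfRange(flat, offset, offset + lengths[i]);
            offset += lengths[i];
        }
        return result;
    }
}
```

The extra `lengths` argument is the additional piece of configuration that the 2D representation avoids.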


