Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/07/15 07:10:44 UTC

[GitHub] [flink-ml] weibozhao opened a new pull request, #131: [FLINK-28563] Add Transformer for VectorSlicer

weibozhao opened a new pull request, #131:
URL: https://github.com/apache/flink-ml/pull/131

   Add Transformer for VectorSlicer.




[GitHub] [flink-ml] yunfengzhou-hub commented on a diff in pull request #131: [FLINK-28563] Add Transformer for VectorSlicer

Posted by GitBox <gi...@apache.org>.
yunfengzhou-hub commented on code in PR #131:
URL: https://github.com/apache/flink-ml/pull/131#discussion_r925083352


##########
flink-ml-core/src/main/java/org/apache/flink/ml/param/ParamValidators.java:
##########
@@ -38,6 +38,22 @@ public static <T> ParamValidator<T> gtEq(double lowerBound) {
         return (value) -> value != null && ((Number) value).doubleValue() >= lowerBound;
     }
 
+    // Checks if all values in the parameter intArray is greater than or equal to lowerBound.
+    public static <T> ParamValidator<T> intArrayGtEq(double lowerBound) {
+        return value -> {

Review Comment:
   Given that we are making this a public method, I think it might be better to make it more generic so that it is useful in other algorithms as well. For example, instead of only validating integer arrays, we could make it validate any numeric array. Or we could introduce a `forEach` method, so that to validate indices we can use `ParamValidators.forEach(ParamValidators.gtEq(0))`. What do you think of these ideas? We can discuss this topic offline.
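   For illustration, a minimal sketch of such a `forEach`-style validator could look like the following (hypothetical code; it assumes `ParamValidator<T>` stays a serializable functional interface with a single `validate(T value)` method, as in the current `ParamValidators`):
   
   ```java
   // Hypothetical sketch only: validates every element of an array parameter with a given
   // element-level validator, e.g. ParamValidators.forEach(ParamValidators.gtEq(0)).
   public static <T> ParamValidator<T[]> forEach(ParamValidator<T> elementValidator) {
       return array -> {
           if (array == null) {
               return false;
           }
           for (T element : array) {
               if (!elementValidator.validate(element)) {
                   return false;
               }
           }
           return true;
       };
   }
   ```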





[GitHub] [flink-ml] zhipeng93 merged pull request #131: [FLINK-28563] Add Transformer for VectorSlicer

Posted by GitBox <gi...@apache.org>.
zhipeng93 merged PR #131:
URL: https://github.com/apache/flink-ml/pull/131




[GitHub] [flink-ml] weibozhao commented on a diff in pull request #131: [FLINK-28563] Add Transformer for VectorSlicer

Posted by GitBox <gi...@apache.org>.
weibozhao commented on code in PR #131:
URL: https://github.com/apache/flink-ml/pull/131#discussion_r932966469


##########
flink-ml-lib/src/test/java/org/apache/flink/ml/feature/VectorSlicerTest.java:
##########
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.ml.feature.vectorslicer.VectorSlicer;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.SparseVector;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.ml.util.TestUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
+
+import org.apache.commons.collections.IteratorUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+/** Tests {@link VectorSlicer}. */
+public class VectorSlicerTest extends AbstractTestBase {
+
+    private StreamTableEnvironment tEnv;
+    private Table inputDataTable;
+
+    private static final List<Row> INPUT_DATA =
+            Arrays.asList(
+                    Row.of(
+                            0,
+                            Vectors.dense(2.1, 3.1, 2.3, 3.4, 5.3, 5.1),
+                            Vectors.sparse(5, new int[] {1, 3, 4}, new double[] {0.1, 0.2, 0.3})),
+                    Row.of(
+                            1,
+                            Vectors.dense(2.3, 4.1, 1.3, 2.4, 5.1, 4.1),
+                            Vectors.sparse(5, new int[] {1, 2, 4}, new double[] {0.1, 0.2, 0.3})));
+
+    private static final DenseVector EXPECTED_OUTPUT_DATA_1 = Vectors.dense(2.1, 3.1, 2.3);
+    private static final DenseVector EXPECTED_OUTPUT_DATA_2 = Vectors.dense(2.3, 4.1, 1.3);
+
+    private static final SparseVector EXPECTED_OUTPUT_DATA_3 =
+            Vectors.sparse(3, new int[] {1}, new double[] {0.1});
+    private static final SparseVector EXPECTED_OUTPUT_DATA_4 =
+            Vectors.sparse(3, new int[] {1, 2}, new double[] {0.1, 0.2});
+
+    @Before
+    public void before() {
+        Configuration config = new Configuration();
+        config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+        env.setParallelism(4);
+        env.enableCheckpointing(100);
+        env.setRestartStrategy(RestartStrategies.noRestart());
+        tEnv = StreamTableEnvironment.create(env);
+        DataStream<Row> dataStream = env.fromCollection(INPUT_DATA);
+        inputDataTable = tEnv.fromDataStream(dataStream).as("id", "vec", "sparseVec");
+    }
+
+    private void verifyOutputResult(Table output, String outputCol, boolean isSparse)
+            throws Exception {
+        DataStream<Row> dataStream = tEnv.toDataStream(output);
+        List<Row> results = IteratorUtils.toList(dataStream.executeAndCollect());
+        assertEquals(2, results.size());
+        for (Row result : results) {
+            if (result.getField(0) == (Object) 0) {
+                if (isSparse) {
+                    assertEquals(EXPECTED_OUTPUT_DATA_3, result.getField(outputCol));
+                } else {
+                    assertEquals(EXPECTED_OUTPUT_DATA_1, result.getField(outputCol));
+                }
+            } else if (result.getField(0) == (Object) 1) {
+                if (isSparse) {
+                    assertEquals(EXPECTED_OUTPUT_DATA_4, result.getField(outputCol));
+                } else {
+                    assertEquals(EXPECTED_OUTPUT_DATA_2, result.getField(outputCol));
+                }
+            } else {
+                throw new RuntimeException("Result id value is error, it must be 0 or 1.");
+            }
+        }
+    }
+
+    @Test
+    public void testParam() {
+        VectorSlicer vectorSlicer = new VectorSlicer();
+        assertEquals("input", vectorSlicer.getInputCol());
+        assertEquals("output", vectorSlicer.getOutputCol());
+        vectorSlicer.setInputCol("vec").setOutputCol("sliceVec").setIndices(0, 1, 2);
+        assertEquals("vec", vectorSlicer.getInputCol());
+        assertEquals("sliceVec", vectorSlicer.getOutputCol());
+        assertArrayEquals(new Integer[] {0, 1, 2}, vectorSlicer.getIndices());
+    }
+
+    @Test
+    public void testSaveLoadAndTransform() throws Exception {
+        VectorSlicer vectorSlicer =
+                new VectorSlicer().setInputCol("vec").setOutputCol("sliceVec").setIndices(0, 1, 2);
+        VectorSlicer loadedVectorSlicer =
+                TestUtils.saveAndReload(
+                        tEnv, vectorSlicer, TEMPORARY_FOLDER.newFolder().getAbsolutePath());
+        Table output = loadedVectorSlicer.transform(inputDataTable)[0];
+        verifyOutputResult(output, loadedVectorSlicer.getOutputCol(), false);
+    }
+
+    @Test
+    public void testEmptyIndices() {

Review Comment:
   Empty indices mean extracting nothing and returning a vector with no values, which is meaningless.
   I don't think everything that Spark does makes sense.





[GitHub] [flink-ml] zhipeng93 commented on a diff in pull request #131: [FLINK-28563] Add Transformer for VectorSlicer

Posted by GitBox <gi...@apache.org>.
zhipeng93 commented on code in PR #131:
URL: https://github.com/apache/flink-ml/pull/131#discussion_r933021441


##########
flink-ml-lib/src/test/java/org/apache/flink/ml/feature/VectorSlicerTest.java:
##########
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.ml.feature.vectorslicer.VectorSlicer;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.SparseVector;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.ml.util.TestUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
+
+import org.apache.commons.collections.IteratorUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+/** Tests {@link VectorSlicer}. */
+public class VectorSlicerTest extends AbstractTestBase {
+
+    private StreamTableEnvironment tEnv;
+    private Table inputDataTable;
+
+    private static final List<Row> INPUT_DATA =
+            Arrays.asList(
+                    Row.of(
+                            0,
+                            Vectors.dense(2.1, 3.1, 2.3, 3.4, 5.3, 5.1),
+                            Vectors.sparse(5, new int[] {1, 3, 4}, new double[] {0.1, 0.2, 0.3})),
+                    Row.of(
+                            1,
+                            Vectors.dense(2.3, 4.1, 1.3, 2.4, 5.1, 4.1),
+                            Vectors.sparse(5, new int[] {1, 2, 4}, new double[] {0.1, 0.2, 0.3})));
+
+    private static final DenseVector EXPECTED_OUTPUT_DATA_1 = Vectors.dense(2.1, 3.1, 2.3);
+    private static final DenseVector EXPECTED_OUTPUT_DATA_2 = Vectors.dense(2.3, 4.1, 1.3);
+
+    private static final SparseVector EXPECTED_OUTPUT_DATA_3 =
+            Vectors.sparse(3, new int[] {1}, new double[] {0.1});
+    private static final SparseVector EXPECTED_OUTPUT_DATA_4 =
+            Vectors.sparse(3, new int[] {1, 2}, new double[] {0.1, 0.2});
+
+    @Before
+    public void before() {
+        Configuration config = new Configuration();
+        config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+        env.setParallelism(4);
+        env.enableCheckpointing(100);
+        env.setRestartStrategy(RestartStrategies.noRestart());
+        tEnv = StreamTableEnvironment.create(env);
+        DataStream<Row> dataStream = env.fromCollection(INPUT_DATA);
+        inputDataTable = tEnv.fromDataStream(dataStream).as("id", "vec", "sparseVec");
+    }
+
+    private void verifyOutputResult(Table output, String outputCol, boolean isSparse)
+            throws Exception {
+        DataStream<Row> dataStream = tEnv.toDataStream(output);
+        List<Row> results = IteratorUtils.toList(dataStream.executeAndCollect());
+        assertEquals(2, results.size());
+        for (Row result : results) {
+            if (result.getField(0) == (Object) 0) {
+                if (isSparse) {
+                    assertEquals(EXPECTED_OUTPUT_DATA_3, result.getField(outputCol));
+                } else {
+                    assertEquals(EXPECTED_OUTPUT_DATA_1, result.getField(outputCol));
+                }
+            } else if (result.getField(0) == (Object) 1) {
+                if (isSparse) {
+                    assertEquals(EXPECTED_OUTPUT_DATA_4, result.getField(outputCol));
+                } else {
+                    assertEquals(EXPECTED_OUTPUT_DATA_2, result.getField(outputCol));
+                }
+            } else {
+                throw new RuntimeException("Result id value is error, it must be 0 or 1.");
+            }
+        }
+    }
+
+    @Test
+    public void testParam() {
+        VectorSlicer vectorSlicer = new VectorSlicer();
+        assertEquals("input", vectorSlicer.getInputCol());
+        assertEquals("output", vectorSlicer.getOutputCol());
+        vectorSlicer.setInputCol("vec").setOutputCol("sliceVec").setIndices(0, 1, 2);
+        assertEquals("vec", vectorSlicer.getInputCol());
+        assertEquals("sliceVec", vectorSlicer.getOutputCol());
+        assertArrayEquals(new Integer[] {0, 1, 2}, vectorSlicer.getIndices());
+    }
+
+    @Test
+    public void testSaveLoadAndTransform() throws Exception {
+        VectorSlicer vectorSlicer =
+                new VectorSlicer().setInputCol("vec").setOutputCol("sliceVec").setIndices(0, 1, 2);
+        VectorSlicer loadedVectorSlicer =
+                TestUtils.saveAndReload(
+                        tEnv, vectorSlicer, TEMPORARY_FOLDER.newFolder().getAbsolutePath());
+        Table output = loadedVectorSlicer.transform(inputDataTable)[0];
+        verifyOutputResult(output, loadedVectorSlicer.getOutputCol(), false);
+    }
+
+    @Test
+    public void testEmptyIndices() {

Review Comment:
   I agree with @weibozhao. Using an empty array as the indices does not seem like a valid use case --- if the indices are an empty array, the output would be a table in which each data point is a vector with zero elements.





[GitHub] [flink-ml] zhipeng93 commented on a diff in pull request #131: [FLINK-28563] Add Transformer for VectorSlicer

Posted by GitBox <gi...@apache.org>.
zhipeng93 commented on code in PR #131:
URL: https://github.com/apache/flink-ml/pull/131#discussion_r933040781


##########
flink-ml-lib/src/test/java/org/apache/flink/ml/feature/VectorSlicerTest.java:
##########
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.ml.feature.vectorslicer.VectorSlicer;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.SparseVector;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.ml.util.TestUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
+
+import org.apache.commons.collections.IteratorUtils;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+/** Tests {@link VectorSlicer}. */
+public class VectorSlicerTest extends AbstractTestBase {
+
+    private StreamTableEnvironment tEnv;
+    private Table inputDataTable;
+
+    private static final List<Row> INPUT_DATA =
+            Arrays.asList(
+                    Row.of(
+                            0,
+                            Vectors.dense(2.1, 3.1, 2.3, 3.4, 5.3, 5.1),
+                            Vectors.sparse(5, new int[] {1, 3, 4}, new double[] {0.1, 0.2, 0.3})),
+                    Row.of(
+                            1,
+                            Vectors.dense(2.3, 4.1, 1.3, 2.4, 5.1, 4.1),
+                            Vectors.sparse(5, new int[] {1, 2, 4}, new double[] {0.1, 0.2, 0.3})));
+
+    private static final DenseVector EXPECTED_OUTPUT_DATA_1 = Vectors.dense(2.1, 3.1, 2.3);
+    private static final DenseVector EXPECTED_OUTPUT_DATA_2 = Vectors.dense(2.3, 4.1, 1.3);
+
+    private static final SparseVector EXPECTED_OUTPUT_DATA_3 =
+            Vectors.sparse(3, new int[] {1}, new double[] {0.1});
+    private static final SparseVector EXPECTED_OUTPUT_DATA_4 =
+            Vectors.sparse(3, new int[] {1, 2}, new double[] {0.1, 0.2});
+
+    @Before
+    public void before() {
+        Configuration config = new Configuration();
+        config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+        env.setParallelism(4);
+        env.enableCheckpointing(100);
+        env.setRestartStrategy(RestartStrategies.noRestart());
+        tEnv = StreamTableEnvironment.create(env);
+        DataStream<Row> dataStream = env.fromCollection(INPUT_DATA);
+        inputDataTable = tEnv.fromDataStream(dataStream).as("id", "vec", "sparseVec");
+    }
+
+    private void verifyOutputResult(Table output, String outputCol, boolean isSparse)
+            throws Exception {
+        DataStream<Row> dataStream = tEnv.toDataStream(output);
+        List<Row> results = IteratorUtils.toList(dataStream.executeAndCollect());
+        assertEquals(2, results.size());
+        for (Row result : results) {
+            if (result.getField(0) == (Object) 0) {
+                if (isSparse) {
+                    assertEquals(EXPECTED_OUTPUT_DATA_3, result.getField(outputCol));
+                } else {
+                    assertEquals(EXPECTED_OUTPUT_DATA_1, result.getField(outputCol));
+                }
+            } else if (result.getField(0) == (Object) 1) {
+                if (isSparse) {
+                    assertEquals(EXPECTED_OUTPUT_DATA_4, result.getField(outputCol));
+                } else {
+                    assertEquals(EXPECTED_OUTPUT_DATA_2, result.getField(outputCol));
+                }
+            } else {
+                throw new RuntimeException("Result id value is error, it must be 0 or 1.");
+            }
+        }
+    }
+
+    @Test
+    public void testParam() {
+        VectorSlicer vectorSlicer = new VectorSlicer();
+        assertEquals("input", vectorSlicer.getInputCol());
+        assertEquals("output", vectorSlicer.getOutputCol());
+        vectorSlicer.setInputCol("vec").setOutputCol("sliceVec").setIndices(0, 1, 2);
+        assertEquals("vec", vectorSlicer.getInputCol());
+        assertEquals("sliceVec", vectorSlicer.getOutputCol());
+        assertArrayEquals(new Integer[] {0, 1, 2}, vectorSlicer.getIndices());
+    }
+
+    @Test
+    public void testSaveLoadAndTransform() throws Exception {
+        VectorSlicer vectorSlicer =
+                new VectorSlicer().setInputCol("vec").setOutputCol("sliceVec").setIndices(0, 1, 2);
+        VectorSlicer loadedVectorSlicer =
+                TestUtils.saveAndReload(
+                        tEnv, vectorSlicer, TEMPORARY_FOLDER.newFolder().getAbsolutePath());
+        Table output = loadedVectorSlicer.transform(inputDataTable)[0];
+        verifyOutputResult(output, loadedVectorSlicer.getOutputCol(), false);
+    }
+
+    @Test
+    public void testEmptyIndices() {
+        try {
+            VectorSlicer vectorSlicer =
+                    new VectorSlicer().setInputCol("vec").setOutputCol("sliceVec").setIndices();
+            vectorSlicer.transform(inputDataTable);
+            Assert.fail("Expected IllegalArgumentException");
+        } catch (Exception e) {
+            assertEquals("Parameter indices is given an invalid value {}", e.getMessage());
+            assertEquals(IllegalArgumentException.class, ((Throwable) e).getClass());
+        }
+    }
+
+    @Test
+    public void testIndicesLargerThanVectorSize() {
+        try {
+            VectorSlicer vectorSlicer =
+                    new VectorSlicer()
+                            .setInputCol("vec")
+                            .setOutputCol("sliceVec")
+                            .setIndices(1, 2, 10);
+            Table output = vectorSlicer.transform(inputDataTable)[0];
+            DataStream<Row> dataStream = tEnv.toDataStream(output);
+            IteratorUtils.toList(dataStream.executeAndCollect());
+            Assert.fail("Expected RuntimeException");
+        } catch (Exception e) {
+            assertEquals(
+                    "Index value 10 is greater than vector size:6",
+                    ExceptionUtils.getRootCause(e).getMessage());
+            assertEquals(IllegalArgumentException.class, ExceptionUtils.getRootCause(e).getClass());
+        }
+    }
+
+    @Test
+    public void testIndicesSmallerThanZero() {
+        try {
+            new VectorSlicer().setInputCol("vec").setOutputCol("sliceVec").setIndices(1, -2);
+            Assert.fail("Expected IllegalArgumentException");
+        } catch (Exception e) {
+            assertEquals("Parameter indices is given an invalid value {1,-2}", e.getMessage());
+            assertEquals(IllegalArgumentException.class, e.getClass());
+        }
+    }
+
+    @Test
+    public void testDuplicateIndices() {
+        try {
+            new VectorSlicer().setInputCol("vec").setOutputCol("sliceVec").setIndices(1, 1, 3);
+            Assert.fail("Expected IllegalArgumentException");

Review Comment:
   How about we replace this line with `fail();`? We are assuming that the code never gets here, and it is not an `IllegalArgumentException`.
   
   The same applies to the other `Assert.fail` calls.
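   For example, the pattern could look roughly like this (a sketch only, using the statically imported `org.junit.Assert.fail`; the assertions in the catch block are whatever the author decides the expected exception actually is):
   
   ```java
   // Sketch of the suggested pattern: fail() marks the line that we assume is never reached,
   // without implying a particular exception type in the failure message.
   try {
       new VectorSlicer().setInputCol("vec").setOutputCol("sliceVec").setIndices(1, 1, 3);
       fail();
   } catch (Exception e) {
       // Assert on whatever exception type and message the validator is actually expected to throw.
   }
   ```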





[GitHub] [flink-ml] yunfengzhou-hub commented on a diff in pull request #131: [FLINK-28563] Add Transformer for VectorSlicer

Posted by GitBox <gi...@apache.org>.
yunfengzhou-hub commented on code in PR #131:
URL: https://github.com/apache/flink-ml/pull/131#discussion_r925345067


##########
flink-ml-core/src/main/java/org/apache/flink/ml/param/ParamValidators.java:
##########
@@ -38,6 +38,22 @@ public static <T> ParamValidator<T> gtEq(double lowerBound) {
         return (value) -> value != null && ((Number) value).doubleValue() >= lowerBound;
     }
 
+    // Checks if all values in the parameter intArray is greater than or equal to lowerBound.
+    public static <T> ParamValidator<T> intArrayGtEq(double lowerBound) {
+        return value -> {

Review Comment:
   We agreed during an offline discussion that `ParamValidators.numericalArrayGtEq(0)` is a proper solution. For similar discussions we can directly refer to existing practices, such as those in Alink or Spark.
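   For reference, one possible shape of that validator, written in the same style as the existing `gtEq` (a sketch only; the signature actually committed to `ParamValidators` may differ):
   
   ```java
   // Illustrative sketch of numericalArrayGtEq: accepts the parameter only if it is a non-null
   // array whose elements are all numbers greater than or equal to lowerBound.
   public static <T> ParamValidator<T> numericalArrayGtEq(double lowerBound) {
       return value -> {
           if (value == null) {
               return false;
           }
           for (Object element : (Object[]) value) {
               if (element == null || ((Number) element).doubleValue() < lowerBound) {
                   return false;
               }
           }
           return true;
       };
   }
   ```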





[GitHub] [flink-ml] zhipeng93 commented on a diff in pull request #131: [FLINK-28563] Add Transformer for VectorSlicer

Posted by GitBox <gi...@apache.org>.
zhipeng93 commented on code in PR #131:
URL: https://github.com/apache/flink-ml/pull/131#discussion_r933021441


##########
flink-ml-lib/src/test/java/org/apache/flink/ml/feature/VectorSlicerTest.java:
##########
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.ml.feature.vectorslicer.VectorSlicer;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.SparseVector;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.ml.util.TestUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
+
+import org.apache.commons.collections.IteratorUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+/** Tests {@link VectorSlicer}. */
+public class VectorSlicerTest extends AbstractTestBase {
+
+    private StreamTableEnvironment tEnv;
+    private Table inputDataTable;
+
+    private static final List<Row> INPUT_DATA =
+            Arrays.asList(
+                    Row.of(
+                            0,
+                            Vectors.dense(2.1, 3.1, 2.3, 3.4, 5.3, 5.1),
+                            Vectors.sparse(5, new int[] {1, 3, 4}, new double[] {0.1, 0.2, 0.3})),
+                    Row.of(
+                            1,
+                            Vectors.dense(2.3, 4.1, 1.3, 2.4, 5.1, 4.1),
+                            Vectors.sparse(5, new int[] {1, 2, 4}, new double[] {0.1, 0.2, 0.3})));
+
+    private static final DenseVector EXPECTED_OUTPUT_DATA_1 = Vectors.dense(2.1, 3.1, 2.3);
+    private static final DenseVector EXPECTED_OUTPUT_DATA_2 = Vectors.dense(2.3, 4.1, 1.3);
+
+    private static final SparseVector EXPECTED_OUTPUT_DATA_3 =
+            Vectors.sparse(3, new int[] {1}, new double[] {0.1});
+    private static final SparseVector EXPECTED_OUTPUT_DATA_4 =
+            Vectors.sparse(3, new int[] {1, 2}, new double[] {0.1, 0.2});
+
+    @Before
+    public void before() {
+        Configuration config = new Configuration();
+        config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+        env.setParallelism(4);
+        env.enableCheckpointing(100);
+        env.setRestartStrategy(RestartStrategies.noRestart());
+        tEnv = StreamTableEnvironment.create(env);
+        DataStream<Row> dataStream = env.fromCollection(INPUT_DATA);
+        inputDataTable = tEnv.fromDataStream(dataStream).as("id", "vec", "sparseVec");
+    }
+
+    private void verifyOutputResult(Table output, String outputCol, boolean isSparse)
+            throws Exception {
+        DataStream<Row> dataStream = tEnv.toDataStream(output);
+        List<Row> results = IteratorUtils.toList(dataStream.executeAndCollect());
+        assertEquals(2, results.size());
+        for (Row result : results) {
+            if (result.getField(0) == (Object) 0) {
+                if (isSparse) {
+                    assertEquals(EXPECTED_OUTPUT_DATA_3, result.getField(outputCol));
+                } else {
+                    assertEquals(EXPECTED_OUTPUT_DATA_1, result.getField(outputCol));
+                }
+            } else if (result.getField(0) == (Object) 1) {
+                if (isSparse) {
+                    assertEquals(EXPECTED_OUTPUT_DATA_4, result.getField(outputCol));
+                } else {
+                    assertEquals(EXPECTED_OUTPUT_DATA_2, result.getField(outputCol));
+                }
+            } else {
+                throw new RuntimeException("Result id value is error, it must be 0 or 1.");
+            }
+        }
+    }
+
+    @Test
+    public void testParam() {
+        VectorSlicer vectorSlicer = new VectorSlicer();
+        assertEquals("input", vectorSlicer.getInputCol());
+        assertEquals("output", vectorSlicer.getOutputCol());
+        vectorSlicer.setInputCol("vec").setOutputCol("sliceVec").setIndices(0, 1, 2);
+        assertEquals("vec", vectorSlicer.getInputCol());
+        assertEquals("sliceVec", vectorSlicer.getOutputCol());
+        assertArrayEquals(new Integer[] {0, 1, 2}, vectorSlicer.getIndices());
+    }
+
+    @Test
+    public void testSaveLoadAndTransform() throws Exception {
+        VectorSlicer vectorSlicer =
+                new VectorSlicer().setInputCol("vec").setOutputCol("sliceVec").setIndices(0, 1, 2);
+        VectorSlicer loadedVectorSlicer =
+                TestUtils.saveAndReload(
+                        tEnv, vectorSlicer, TEMPORARY_FOLDER.newFolder().getAbsolutePath());
+        Table output = loadedVectorSlicer.transform(inputDataTable)[0];
+        verifyOutputResult(output, loadedVectorSlicer.getOutputCol(), false);
+    }
+
+    @Test
+    public void testEmptyIndices() {

Review Comment:
   I agree with @weibozhao. Using an empty array as the indices does not seem like a valid use case.
   
   If the indices are an empty array, the output would be a table in which each data point is a vector with zero elements. We can achieve this with a simple map.
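   For example, a user who really wanted this behavior could do something along these lines instead of going through VectorSlicer (a sketch only; the variable names are made up):
   
   ```java
   // Hypothetical sketch: map every row to a zero-length vector directly, without VectorSlicer.
   DataStream<DenseVector> emptySliced =
           tEnv.toDataStream(inputDataTable)
                   .map(new MapFunction<Row, DenseVector>() {
                       @Override
                       public DenseVector map(Row row) {
                           return new DenseVector(new double[0]);
                       }
                   });
   ```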





[GitHub] [flink-ml] yunfengzhou-hub commented on a diff in pull request #131: [FLINK-28563] Add Transformer for VectorSlicer

Posted by GitBox <gi...@apache.org>.
yunfengzhou-hub commented on code in PR #131:
URL: https://github.com/apache/flink-ml/pull/131#discussion_r931700123


##########
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/vectorslicer/VectorSlicerParams.java:
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature.vectorslicer;
+
+import org.apache.flink.ml.common.param.HasInputCol;
+import org.apache.flink.ml.common.param.HasOutputCol;
+import org.apache.flink.ml.param.IntArrayParam;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.param.ParamValidators;
+
+/**
+ * Params of {@link VectorSlicer}.
+ *
+ * @param <T> The class type of this instance.
+ */
+public interface VectorSlicerParams<T> extends HasInputCol<T>, HasOutputCol<T> {
+    Param<Integer[]> INDICES =
+            new IntArrayParam(
+                    "indices",
+                    "An array of indices to select features from a vector column.",
+                    null,
+                    ParamValidators.numericalArrayGtEq(0));

Review Comment:
   I just found that, to align the behavior of this algorithm with existing practices, we need to make sure that the input indices are not duplicated. Could you please extend the param validator with this check, add JavaDoc to explain the behavior when there are duplicated input indices, and add corresponding tests?
   
   Given that we need to verify that the input integers are non-negative and non-duplicated, do you think it would be better to add a dedicated private param validator in this class for this specific case, instead of trying to add a common validator?
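   Such a dedicated validator could be as small as the following sketch (the name and placement are only for illustration; `java.util.HashSet` is used to detect duplicates):
   
   ```java
   // Hypothetical sketch of a VectorSlicer-specific validator: the indices array must be
   // non-empty and contain only non-negative, non-duplicated values.
   private static ParamValidator<Integer[]> nonNegativeDistinctIndices() {
       return indices -> {
           if (indices == null || indices.length == 0) {
               return false;
           }
           java.util.Set<Integer> seen = new java.util.HashSet<>();
           for (Integer index : indices) {
               if (index == null || index < 0 || !seen.add(index)) {
                   return false;
               }
           }
           return true;
       };
   }
   ```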



##########
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/vectorslicer/VectorSlicer.java:
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature.vectorslicer;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.ml.api.Transformer;
+import org.apache.flink.ml.common.datastream.TableUtils;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.SparseVector;
+import org.apache.flink.ml.linalg.Vector;
+import org.apache.flink.ml.linalg.typeinfo.VectorTypeInfo;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.ArrayUtils;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * VectorSlicer is a transformer that transforms a vector to a new one with a sub-array of the
+ * original features. It is useful for extracting features from a given vector. If the indices
+ * acquired from setIndices() are not in order, the indices of the result vector will be sorted. If

Review Comment:
   The existing practice in Spark seems to be not to re-order the given indices before slicing: if the provided indices are not in order, the output follows the order in which they are given. Could you please modify the corresponding implementation, unit tests and JavaDocs?
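   For example, using the first input vector of this test (an illustration of the convention only, not code from the PR):
   
   ```java
   // setIndices(2, 0) on Vectors.dense(2.1, 3.1, 2.3, 3.4, 5.3, 5.1)
   // is expected to produce Vectors.dense(2.3, 2.1), i.e. in the given order, not sorted.
   ```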



##########
flink-ml-lib/src/test/java/org/apache/flink/ml/feature/VectorSlicerTest.java:
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.ml.feature.vectorslicer.VectorSlicer;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.SparseVector;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.ml.util.TestUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
+
+import org.apache.commons.collections.IteratorUtils;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+/** Tests {@link VectorSlicer}. */
+public class VectorSlicerTest extends AbstractTestBase {

Review Comment:
   Could you please add unit tests for the following corner cases? (A rough sketch of the last case is given below.)
   - The `indices` is an empty array.
   - The lengths of the vectors are not equal, but the max value in `indices` is smaller than the minimum length.
   - The lengths of the vectors are not equal, and the max value in `indices` is larger than the minimum length.
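   A rough sketch of the last case, just to illustrate the intent (the concrete input data, the expected failure mode and keeping `env` as a field are assumptions; the real test can of course differ):
   
   ```java
   // Hypothetical sketch: vectors of unequal length, with an index larger than the minimum length.
   // Assumes the StreamExecutionEnvironment created in before() is kept in a field named env.
   @Test
   public void testIndexLargerThanMinVectorSize() throws Exception {
       Table unevenInput =
               tEnv.fromDataStream(
                               env.fromCollection(
                                       Arrays.asList(
                                               Row.of(0, Vectors.dense(1.0, 2.0, 3.0, 4.0)),
                                               Row.of(1, Vectors.dense(1.0, 2.0)))))
                       .as("id", "vec");
       VectorSlicer vectorSlicer =
               new VectorSlicer().setInputCol("vec").setOutputCol("sliceVec").setIndices(0, 3);
       Table output = vectorSlicer.transform(unevenInput)[0];
       try {
           IteratorUtils.toList(tEnv.toDataStream(output).executeAndCollect());
           fail();
       } catch (Exception e) {
           // The shorter vector should surface an out-of-range error when the job executes.
       }
   }
   ```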



##########
flink-ml-python/pyflink/ml/lib/feature/vectorslicer.py:
##########
@@ -0,0 +1,71 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from typing import Tuple
+from pyflink.ml.core.wrapper import JavaWithParams
+from pyflink.ml.core.param import IntArrayParam, ParamValidators
+from pyflink.ml.lib.feature.common import JavaFeatureTransformer
+from pyflink.ml.lib.param import HasInputCol, HasOutputCol, Param
+
+
+class _VectorSlicerParams(
+    JavaWithParams,
+    HasInputCol,
+    HasOutputCol
+):
+    """
+    Params for :class:`VectorSlicer`.
+    """
+
+    INDICES: Param[Tuple[int, ...]] = IntArrayParam(
+        "indices",
+        "An array of indices to select features from a vector column.",
+        None,
+        ParamValidators.numerical_array_gt_eq(0))
+
+    def __init__(self, java_params):
+        super(_VectorSlicerParams, self).__init__(java_params)
+
+    def set_indices(self, *ind: int):
+        return self.set(self.INDICES, ind)
+
+    def get_indices(self) -> Tuple[int, ...]:
+        return self.get(self.INDICES)
+
+    @property
+    def indices(self) -> Tuple[int, ...]:
+        return self.get_indices()
+
+
+class VectorSlicer(JavaFeatureTransformer, _VectorSlicerParams):
+    """
+    VectorSlicer is a transformer that transforms a vector to a new one with a sub-array of the
+    original features. It is useful for extracting features from a given vector. If the indices
+    acquired from setIndices() are not in order, the indices of the result vector will be sorted.

Review Comment:
   The comment here is inconsistent with the one in the JavaDoc.





[GitHub] [flink-ml] zhipeng93 commented on a diff in pull request #131: [FLINK-28563] Add Transformer for VectorSlicer

Posted by GitBox <gi...@apache.org>.
zhipeng93 commented on code in PR #131:
URL: https://github.com/apache/flink-ml/pull/131#discussion_r933021441


##########
flink-ml-lib/src/test/java/org/apache/flink/ml/feature/VectorSlicerTest.java:
##########
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.ml.feature.vectorslicer.VectorSlicer;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.SparseVector;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.ml.util.TestUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
+
+import org.apache.commons.collections.IteratorUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+/** Tests {@link VectorSlicer}. */
+public class VectorSlicerTest extends AbstractTestBase {
+
+    private StreamTableEnvironment tEnv;
+    private Table inputDataTable;
+
+    private static final List<Row> INPUT_DATA =
+            Arrays.asList(
+                    Row.of(
+                            0,
+                            Vectors.dense(2.1, 3.1, 2.3, 3.4, 5.3, 5.1),
+                            Vectors.sparse(5, new int[] {1, 3, 4}, new double[] {0.1, 0.2, 0.3})),
+                    Row.of(
+                            1,
+                            Vectors.dense(2.3, 4.1, 1.3, 2.4, 5.1, 4.1),
+                            Vectors.sparse(5, new int[] {1, 2, 4}, new double[] {0.1, 0.2, 0.3})));
+
+    private static final DenseVector EXPECTED_OUTPUT_DATA_1 = Vectors.dense(2.1, 3.1, 2.3);
+    private static final DenseVector EXPECTED_OUTPUT_DATA_2 = Vectors.dense(2.3, 4.1, 1.3);
+
+    private static final SparseVector EXPECTED_OUTPUT_DATA_3 =
+            Vectors.sparse(3, new int[] {1}, new double[] {0.1});
+    private static final SparseVector EXPECTED_OUTPUT_DATA_4 =
+            Vectors.sparse(3, new int[] {1, 2}, new double[] {0.1, 0.2});
+
+    @Before
+    public void before() {
+        Configuration config = new Configuration();
+        config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+        env.setParallelism(4);
+        env.enableCheckpointing(100);
+        env.setRestartStrategy(RestartStrategies.noRestart());
+        tEnv = StreamTableEnvironment.create(env);
+        DataStream<Row> dataStream = env.fromCollection(INPUT_DATA);
+        inputDataTable = tEnv.fromDataStream(dataStream).as("id", "vec", "sparseVec");
+    }
+
+    private void verifyOutputResult(Table output, String outputCol, boolean isSparse)
+            throws Exception {
+        DataStream<Row> dataStream = tEnv.toDataStream(output);
+        List<Row> results = IteratorUtils.toList(dataStream.executeAndCollect());
+        assertEquals(2, results.size());
+        for (Row result : results) {
+            if (result.getField(0) == (Object) 0) {
+                if (isSparse) {
+                    assertEquals(EXPECTED_OUTPUT_DATA_3, result.getField(outputCol));
+                } else {
+                    assertEquals(EXPECTED_OUTPUT_DATA_1, result.getField(outputCol));
+                }
+            } else if (result.getField(0) == (Object) 1) {
+                if (isSparse) {
+                    assertEquals(EXPECTED_OUTPUT_DATA_4, result.getField(outputCol));
+                } else {
+                    assertEquals(EXPECTED_OUTPUT_DATA_2, result.getField(outputCol));
+                }
+            } else {
+                throw new RuntimeException("Result id value is error, it must be 0 or 1.");
+            }
+        }
+    }
+
+    @Test
+    public void testParam() {
+        VectorSlicer vectorSlicer = new VectorSlicer();
+        assertEquals("input", vectorSlicer.getInputCol());
+        assertEquals("output", vectorSlicer.getOutputCol());
+        vectorSlicer.setInputCol("vec").setOutputCol("sliceVec").setIndices(0, 1, 2);
+        assertEquals("vec", vectorSlicer.getInputCol());
+        assertEquals("sliceVec", vectorSlicer.getOutputCol());
+        assertArrayEquals(new Integer[] {0, 1, 2}, vectorSlicer.getIndices());
+    }
+
+    @Test
+    public void testSaveLoadAndTransform() throws Exception {
+        VectorSlicer vectorSlicer =
+                new VectorSlicer().setInputCol("vec").setOutputCol("sliceVec").setIndices(0, 1, 2);
+        VectorSlicer loadedVectorSlicer =
+                TestUtils.saveAndReload(
+                        tEnv, vectorSlicer, TEMPORARY_FOLDER.newFolder().getAbsolutePath());
+        Table output = loadedVectorSlicer.transform(inputDataTable)[0];
+        verifyOutputResult(output, loadedVectorSlicer.getOutputCol(), false);
+    }
+
+    @Test
+    public void testEmptyIndices() {

Review Comment:
   I agree with @weibozhao. Using an empty array as the indices does not seem like a valid use case.
   
   If the indices are an empty array, the output would be a table in which each data point is a vector with zero elements. Even if users need it, they can simply solve it with a map.



##########
flink-ml-lib/src/test/java/org/apache/flink/ml/feature/VectorSlicerTest.java:
##########
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.ml.feature.vectorslicer.VectorSlicer;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.SparseVector;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.ml.util.TestUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
+
+import org.apache.commons.collections.IteratorUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+/** Tests {@link VectorSlicer}. */
+public class VectorSlicerTest extends AbstractTestBase {
+
+    private StreamTableEnvironment tEnv;
+    private Table inputDataTable;
+
+    private static final List<Row> INPUT_DATA =
+            Arrays.asList(
+                    Row.of(
+                            0,
+                            Vectors.dense(2.1, 3.1, 2.3, 3.4, 5.3, 5.1),
+                            Vectors.sparse(5, new int[] {1, 3, 4}, new double[] {0.1, 0.2, 0.3})),
+                    Row.of(
+                            1,
+                            Vectors.dense(2.3, 4.1, 1.3, 2.4, 5.1, 4.1),
+                            Vectors.sparse(5, new int[] {1, 2, 4}, new double[] {0.1, 0.2, 0.3})));
+
+    private static final DenseVector EXPECTED_OUTPUT_DATA_1 = Vectors.dense(2.1, 3.1, 2.3);
+    private static final DenseVector EXPECTED_OUTPUT_DATA_2 = Vectors.dense(2.3, 4.1, 1.3);
+
+    private static final SparseVector EXPECTED_OUTPUT_DATA_3 =
+            Vectors.sparse(3, new int[] {1}, new double[] {0.1});
+    private static final SparseVector EXPECTED_OUTPUT_DATA_4 =
+            Vectors.sparse(3, new int[] {1, 2}, new double[] {0.1, 0.2});
+
+    @Before
+    public void before() {
+        Configuration config = new Configuration();
+        config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+        env.setParallelism(4);
+        env.enableCheckpointing(100);
+        env.setRestartStrategy(RestartStrategies.noRestart());
+        tEnv = StreamTableEnvironment.create(env);
+        DataStream<Row> dataStream = env.fromCollection(INPUT_DATA);
+        inputDataTable = tEnv.fromDataStream(dataStream).as("id", "vec", "sparseVec");
+    }
+
+    private void verifyOutputResult(Table output, String outputCol, boolean isSparse)
+            throws Exception {
+        DataStream<Row> dataStream = tEnv.toDataStream(output);
+        List<Row> results = IteratorUtils.toList(dataStream.executeAndCollect());
+        assertEquals(2, results.size());
+        for (Row result : results) {
+            if (result.getField(0) == (Object) 0) {
+                if (isSparse) {
+                    assertEquals(EXPECTED_OUTPUT_DATA_3, result.getField(outputCol));
+                } else {
+                    assertEquals(EXPECTED_OUTPUT_DATA_1, result.getField(outputCol));
+                }
+            } else if (result.getField(0) == (Object) 1) {
+                if (isSparse) {
+                    assertEquals(EXPECTED_OUTPUT_DATA_4, result.getField(outputCol));
+                } else {
+                    assertEquals(EXPECTED_OUTPUT_DATA_2, result.getField(outputCol));
+                }
+            } else {
+                throw new RuntimeException("Result id value is error, it must be 0 or 1.");
+            }
+        }
+    }
+
+    @Test
+    public void testParam() {
+        VectorSlicer vectorSlicer = new VectorSlicer();
+        assertEquals("input", vectorSlicer.getInputCol());
+        assertEquals("output", vectorSlicer.getOutputCol());
+        vectorSlicer.setInputCol("vec").setOutputCol("sliceVec").setIndices(0, 1, 2);
+        assertEquals("vec", vectorSlicer.getInputCol());
+        assertEquals("sliceVec", vectorSlicer.getOutputCol());
+        assertArrayEquals(new Integer[] {0, 1, 2}, vectorSlicer.getIndices());
+    }
+
+    @Test
+    public void testSaveLoadAndTransform() throws Exception {
+        VectorSlicer vectorSlicer =
+                new VectorSlicer().setInputCol("vec").setOutputCol("sliceVec").setIndices(0, 1, 2);
+        VectorSlicer loadedVectorSlicer =
+                TestUtils.saveAndReload(
+                        tEnv, vectorSlicer, TEMPORARY_FOLDER.newFolder().getAbsolutePath());
+        Table output = loadedVectorSlicer.transform(inputDataTable)[0];
+        verifyOutputResult(output, loadedVectorSlicer.getOutputCol(), false);
+    }
+
+    @Test
+    public void testEmptyIndices() {

Review Comment:
   I agree with @weibozhao. Using an empty array as the indices does not seem like a valid use case.
   
   If the indices were an empty array, the output would be a table in which every data point is a vector with zero elements. Even if users really need that, it can be achieved with a simple map, as sketched below.
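   
   For example, roughly (illustrative only; `tEnv`, `inputDataTable`, `Vectors` and `VectorTypeInfo` are the names already used in the code quoted in this thread):
   
   ```
   // A zero-length slice can be produced with a plain map, without special-casing it in VectorSlicer.
   DataStream<Vector> emptyVectors =
           tEnv.toDataStream(inputDataTable)
                   .map(row -> Vectors.dense(), VectorTypeInfo.INSTANCE);
   ```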



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-ml] zhipeng93 commented on a diff in pull request #131: [FLINK-28563] Add Transformer for VectorSlicer

Posted by GitBox <gi...@apache.org>.
zhipeng93 commented on code in PR #131:
URL: https://github.com/apache/flink-ml/pull/131#discussion_r933035303


##########
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/vectorslicer/VectorSlicer.java:
##########
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature.vectorslicer;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.ml.api.Transformer;
+import org.apache.flink.ml.common.datastream.TableUtils;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.SparseVector;
+import org.apache.flink.ml.linalg.Vector;
+import org.apache.flink.ml.linalg.typeinfo.VectorTypeInfo;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.ArrayUtils;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+/**

Review Comment:
   How about we update the java doc as follows:
   
   ```
   A Transformer that transforms a vector to a new feature, which is a sub-array of the original feature. It is useful for extracting features from a given vector. 
   
   <p>Note that  duplicate features are not allowed, so there can be no overlap between selected indices. If the max value of the indices is greater than the size of the input vector, it throws an IllegalArgumentException.
   
   ```



##########
flink-ml-lib/src/test/java/org/apache/flink/ml/feature/VectorSlicerTest.java:
##########
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.ml.feature.vectorslicer.VectorSlicer;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.SparseVector;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.ml.util.TestUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
+
+import org.apache.commons.collections.IteratorUtils;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+/** Tests {@link VectorSlicer}. */
+public class VectorSlicerTest extends AbstractTestBase {
+
+    private StreamTableEnvironment tEnv;
+    private Table inputDataTable;
+
+    private static final List<Row> INPUT_DATA =
+            Arrays.asList(
+                    Row.of(
+                            0,
+                            Vectors.dense(2.1, 3.1, 2.3, 3.4, 5.3, 5.1),
+                            Vectors.sparse(5, new int[] {1, 3, 4}, new double[] {0.1, 0.2, 0.3})),
+                    Row.of(
+                            1,
+                            Vectors.dense(2.3, 4.1, 1.3, 2.4, 5.1, 4.1),
+                            Vectors.sparse(5, new int[] {1, 2, 4}, new double[] {0.1, 0.2, 0.3})));
+
+    private static final DenseVector EXPECTED_OUTPUT_DATA_1 = Vectors.dense(2.1, 3.1, 2.3);
+    private static final DenseVector EXPECTED_OUTPUT_DATA_2 = Vectors.dense(2.3, 4.1, 1.3);
+
+    private static final SparseVector EXPECTED_OUTPUT_DATA_3 =
+            Vectors.sparse(3, new int[] {1}, new double[] {0.1});
+    private static final SparseVector EXPECTED_OUTPUT_DATA_4 =
+            Vectors.sparse(3, new int[] {1, 2}, new double[] {0.1, 0.2});
+
+    @Before
+    public void before() {
+        Configuration config = new Configuration();
+        config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+        env.setParallelism(4);
+        env.enableCheckpointing(100);
+        env.setRestartStrategy(RestartStrategies.noRestart());
+        tEnv = StreamTableEnvironment.create(env);
+        DataStream<Row> dataStream = env.fromCollection(INPUT_DATA);
+        inputDataTable = tEnv.fromDataStream(dataStream).as("id", "vec", "sparseVec");
+    }
+
+    private void verifyOutputResult(Table output, String outputCol, boolean isSparse)
+            throws Exception {
+        DataStream<Row> dataStream = tEnv.toDataStream(output);
+        List<Row> results = IteratorUtils.toList(dataStream.executeAndCollect());
+        assertEquals(2, results.size());
+        for (Row result : results) {
+            if (result.getField(0) == (Object) 0) {
+                if (isSparse) {
+                    assertEquals(EXPECTED_OUTPUT_DATA_3, result.getField(outputCol));
+                } else {
+                    assertEquals(EXPECTED_OUTPUT_DATA_1, result.getField(outputCol));
+                }
+            } else if (result.getField(0) == (Object) 1) {
+                if (isSparse) {
+                    assertEquals(EXPECTED_OUTPUT_DATA_4, result.getField(outputCol));
+                } else {
+                    assertEquals(EXPECTED_OUTPUT_DATA_2, result.getField(outputCol));
+                }
+            } else {
+                throw new RuntimeException("Result id value is error, it must be 0 or 1.");
+            }
+        }
+    }
+
+    @Test
+    public void testParam() {
+        VectorSlicer vectorSlicer = new VectorSlicer();
+        assertEquals("input", vectorSlicer.getInputCol());
+        assertEquals("output", vectorSlicer.getOutputCol());
+        vectorSlicer.setInputCol("vec").setOutputCol("sliceVec").setIndices(0, 1, 2);
+        assertEquals("vec", vectorSlicer.getInputCol());
+        assertEquals("sliceVec", vectorSlicer.getOutputCol());
+        assertArrayEquals(new Integer[] {0, 1, 2}, vectorSlicer.getIndices());
+    }
+
+    @Test
+    public void testSaveLoadAndTransform() throws Exception {
+        VectorSlicer vectorSlicer =
+                new VectorSlicer().setInputCol("vec").setOutputCol("sliceVec").setIndices(0, 1, 2);
+        VectorSlicer loadedVectorSlicer =
+                TestUtils.saveAndReload(
+                        tEnv, vectorSlicer, TEMPORARY_FOLDER.newFolder().getAbsolutePath());
+        Table output = loadedVectorSlicer.transform(inputDataTable)[0];
+        verifyOutputResult(output, loadedVectorSlicer.getOutputCol(), false);
+    }
+
+    @Test
+    public void testEmptyIndices() {
+        try {
+            VectorSlicer vectorSlicer =
+                    new VectorSlicer().setInputCol("vec").setOutputCol("sliceVec").setIndices();
+            vectorSlicer.transform(inputDataTable);
+            Assert.fail("Expected IllegalArgumentException");
+        } catch (Exception e) {
+            assertEquals("Parameter indices is given an invalid value {}", e.getMessage());
+            assertEquals(IllegalArgumentException.class, ((Throwable) e).getClass());
+        }
+    }
+
+    @Test
+    public void testIndicesLargerThanVectorSize() {
+        try {
+            VectorSlicer vectorSlicer =
+                    new VectorSlicer()
+                            .setInputCol("vec")
+                            .setOutputCol("sliceVec")
+                            .setIndices(1, 2, 10);
+            Table output = vectorSlicer.transform(inputDataTable)[0];
+            DataStream<Row> dataStream = tEnv.toDataStream(output);
+            IteratorUtils.toList(dataStream.executeAndCollect());
+            Assert.fail("Expected RuntimeException");
+        } catch (Exception e) {
+            assertEquals(
+                    "Index value 10 is greater than vector size:6",
+                    ExceptionUtils.getRootCause(e).getMessage());
+            assertEquals(IllegalArgumentException.class, ExceptionUtils.getRootCause(e).getClass());
+        }
+    }
+
+    @Test
+    public void testIndicesSmallerThanZero() {
+        try {
+            new VectorSlicer().setInputCol("vec").setOutputCol("sliceVec").setIndices(1, -2);
+            Assert.fail("Expected IllegalArgumentException");
+        } catch (Exception e) {
+            assertEquals("Parameter indices is given an invalid value {1,-2}", e.getMessage());
+            assertEquals(IllegalArgumentException.class, e.getClass());
+        }
+    }
+
+    @Test
+    public void testDuplicateIndices() {
+        try {
+            new VectorSlicer().setInputCol("vec").setOutputCol("sliceVec").setIndices(1, 1, 3);
+            Assert.fail("Expected IllegalArgumentException");

Review Comment:
   What about replacing this line with `fail();`? We are assuming that the code never gets here, and what `Assert.fail` throws is not an IllegalArgumentException, so the current message is misleading.
   
   Same for the other `Assert.fail` calls.
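   
   For instance, something along these lines (illustrative only, assuming a static import of `org.junit.Assert.fail`):
   
   ```
   try {
       new VectorSlicer().setInputCol("vec").setOutputCol("sliceVec").setIndices(1, 1, 3);
       fail(); // never reached if the validator rejects the duplicate indices
   } catch (Exception e) {
       assertEquals("Parameter indices is given an invalid value {1,1,3}", e.getMessage());
   }
   ```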



##########
flink-ml-lib/src/test/java/org/apache/flink/ml/feature/VectorSlicerTest.java:
##########
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.ml.feature.vectorslicer.VectorSlicer;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.SparseVector;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.ml.util.TestUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
+
+import org.apache.commons.collections.IteratorUtils;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+/** Tests {@link VectorSlicer}. */
+public class VectorSlicerTest extends AbstractTestBase {
+
+    private StreamTableEnvironment tEnv;
+    private Table inputDataTable;
+
+    private static final List<Row> INPUT_DATA =
+            Arrays.asList(
+                    Row.of(
+                            0,
+                            Vectors.dense(2.1, 3.1, 2.3, 3.4, 5.3, 5.1),
+                            Vectors.sparse(5, new int[] {1, 3, 4}, new double[] {0.1, 0.2, 0.3})),
+                    Row.of(
+                            1,
+                            Vectors.dense(2.3, 4.1, 1.3, 2.4, 5.1, 4.1),
+                            Vectors.sparse(5, new int[] {1, 2, 4}, new double[] {0.1, 0.2, 0.3})));
+
+    private static final DenseVector EXPECTED_OUTPUT_DATA_1 = Vectors.dense(2.1, 3.1, 2.3);
+    private static final DenseVector EXPECTED_OUTPUT_DATA_2 = Vectors.dense(2.3, 4.1, 1.3);
+
+    private static final SparseVector EXPECTED_OUTPUT_DATA_3 =
+            Vectors.sparse(3, new int[] {1}, new double[] {0.1});
+    private static final SparseVector EXPECTED_OUTPUT_DATA_4 =
+            Vectors.sparse(3, new int[] {1, 2}, new double[] {0.1, 0.2});
+
+    @Before
+    public void before() {
+        Configuration config = new Configuration();
+        config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+        env.setParallelism(4);
+        env.enableCheckpointing(100);
+        env.setRestartStrategy(RestartStrategies.noRestart());
+        tEnv = StreamTableEnvironment.create(env);
+        DataStream<Row> dataStream = env.fromCollection(INPUT_DATA);
+        inputDataTable = tEnv.fromDataStream(dataStream).as("id", "vec", "sparseVec");
+    }
+
+    private void verifyOutputResult(Table output, String outputCol, boolean isSparse)
+            throws Exception {
+        DataStream<Row> dataStream = tEnv.toDataStream(output);
+        List<Row> results = IteratorUtils.toList(dataStream.executeAndCollect());
+        assertEquals(2, results.size());
+        for (Row result : results) {
+            if (result.getField(0) == (Object) 0) {
+                if (isSparse) {
+                    assertEquals(EXPECTED_OUTPUT_DATA_3, result.getField(outputCol));
+                } else {
+                    assertEquals(EXPECTED_OUTPUT_DATA_1, result.getField(outputCol));
+                }
+            } else if (result.getField(0) == (Object) 1) {
+                if (isSparse) {
+                    assertEquals(EXPECTED_OUTPUT_DATA_4, result.getField(outputCol));
+                } else {
+                    assertEquals(EXPECTED_OUTPUT_DATA_2, result.getField(outputCol));
+                }
+            } else {
+                throw new RuntimeException("Result id value is error, it must be 0 or 1.");
+            }
+        }
+    }
+
+    @Test
+    public void testParam() {
+        VectorSlicer vectorSlicer = new VectorSlicer();
+        assertEquals("input", vectorSlicer.getInputCol());
+        assertEquals("output", vectorSlicer.getOutputCol());
+        vectorSlicer.setInputCol("vec").setOutputCol("sliceVec").setIndices(0, 1, 2);
+        assertEquals("vec", vectorSlicer.getInputCol());
+        assertEquals("sliceVec", vectorSlicer.getOutputCol());
+        assertArrayEquals(new Integer[] {0, 1, 2}, vectorSlicer.getIndices());
+    }
+
+    @Test
+    public void testSaveLoadAndTransform() throws Exception {
+        VectorSlicer vectorSlicer =
+                new VectorSlicer().setInputCol("vec").setOutputCol("sliceVec").setIndices(0, 1, 2);
+        VectorSlicer loadedVectorSlicer =
+                TestUtils.saveAndReload(
+                        tEnv, vectorSlicer, TEMPORARY_FOLDER.newFolder().getAbsolutePath());
+        Table output = loadedVectorSlicer.transform(inputDataTable)[0];
+        verifyOutputResult(output, loadedVectorSlicer.getOutputCol(), false);
+    }
+
+    @Test
+    public void testEmptyIndices() {
+        try {
+            VectorSlicer vectorSlicer =
+                    new VectorSlicer().setInputCol("vec").setOutputCol("sliceVec").setIndices();
+            vectorSlicer.transform(inputDataTable);
+            Assert.fail("Expected IllegalArgumentException");
+        } catch (Exception e) {
+            assertEquals("Parameter indices is given an invalid value {}", e.getMessage());
+            assertEquals(IllegalArgumentException.class, ((Throwable) e).getClass());
+        }
+    }
+
+    @Test
+    public void testIndicesLargerThanVectorSize() {
+        try {
+            VectorSlicer vectorSlicer =
+                    new VectorSlicer()
+                            .setInputCol("vec")
+                            .setOutputCol("sliceVec")
+                            .setIndices(1, 2, 10);
+            Table output = vectorSlicer.transform(inputDataTable)[0];
+            DataStream<Row> dataStream = tEnv.toDataStream(output);
+            IteratorUtils.toList(dataStream.executeAndCollect());
+            Assert.fail("Expected RuntimeException");
+        } catch (Exception e) {
+            assertEquals(
+                    "Index value 10 is greater than vector size:6",
+                    ExceptionUtils.getRootCause(e).getMessage());
+            assertEquals(IllegalArgumentException.class, ExceptionUtils.getRootCause(e).getClass());
+        }
+    }
+
+    @Test
+    public void testIndicesSmallerThanZero() {
+        try {
+            new VectorSlicer().setInputCol("vec").setOutputCol("sliceVec").setIndices(1, -2);
+            Assert.fail("Expected IllegalArgumentException");
+        } catch (Exception e) {
+            assertEquals("Parameter indices is given an invalid value {1,-2}", e.getMessage());
+            assertEquals(IllegalArgumentException.class, e.getClass());
+        }
+    }
+
+    @Test
+    public void testDuplicateIndices() {
+        try {
+            new VectorSlicer().setInputCol("vec").setOutputCol("sliceVec").setIndices(1, 1, 3);
+            Assert.fail("Expected IllegalArgumentException");
+        } catch (Exception e) {
+            assertEquals("Parameter indices is given an invalid value {1,1,3}", e.getMessage());
+            assertEquals(IllegalArgumentException.class, ((Throwable) e).getClass());

Review Comment:
   Also, the check for the exception class type seems unnecessary, since we already check the error message.
   
   Same for the other checks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-ml] weibozhao commented on a diff in pull request #131: [FLINK-28563] Add Transformer for VectorSlicer

Posted by GitBox <gi...@apache.org>.
weibozhao commented on code in PR #131:
URL: https://github.com/apache/flink-ml/pull/131#discussion_r925115735


##########
flink-ml-core/src/main/java/org/apache/flink/ml/param/ParamValidators.java:
##########
@@ -38,6 +38,22 @@ public static <T> ParamValidator<T> gtEq(double lowerBound) {
         return (value) -> value != null && ((Number) value).doubleValue() >= lowerBound;
     }
 
+    // Checks if all values in the parameter intArray is greater than or equal to lowerBound.
+    public static <T> ParamValidator<T> intArrayGtEq(double lowerBound) {
+        return value -> {

Review Comment:
   I think making it more generic is better than introducing a forEach function; a forEach may make the user-facing API more complicated. Compare the two options (a sketch of the generic variant follows below):
   
   - ParamValidators.numericalArrayGtEq(0)
   - ParamValidators.forEach(ParamValidators.gtEq(0))
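   
   A minimal sketch of the generic variant, following the lambda-returning style of `gtEq` above (the exact name and semantics are of course up for discussion):
   
   ```
   // Checks that the value is a non-null numeric array whose elements are all >= lowerBound.
   public static <T> ParamValidator<T> numericalArrayGtEq(double lowerBound) {
       return value -> {
           if (value == null) {
               return false;
           }
           for (Number element : (Number[]) value) {
               if (element == null || element.doubleValue() < lowerBound) {
                   return false;
               }
           }
           return true;
       };
   }
   ```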



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-ml] weibozhao commented on a diff in pull request #131: [FLINK-28563] Add Transformer for VectorSlicer

Posted by GitBox <gi...@apache.org>.
weibozhao commented on code in PR #131:
URL: https://github.com/apache/flink-ml/pull/131#discussion_r932842847


##########
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/vectorslicer/VectorSlicerParams.java:
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature.vectorslicer;
+
+import org.apache.flink.ml.common.param.HasInputCol;
+import org.apache.flink.ml.common.param.HasOutputCol;
+import org.apache.flink.ml.param.IntArrayParam;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.param.ParamValidators;
+
+/**
+ * Params of {@link VectorSlicer}.
+ *
+ * @param <T> The class type of this instance.
+ */
+public interface VectorSlicerParams<T> extends HasInputCol<T>, HasOutputCol<T> {
+    Param<Integer[]> INDICES =
+            new IntArrayParam(
+                    "indices",
+                    "An array of indices to select features from a vector column.",
+                    null,
+                    ParamValidators.numericalArrayGtEq(0));

Review Comment:
   OK 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-ml] zhipeng93 commented on a diff in pull request #131: [FLINK-28563] Add Transformer for VectorSlicer

Posted by GitBox <gi...@apache.org>.
zhipeng93 commented on code in PR #131:
URL: https://github.com/apache/flink-ml/pull/131#discussion_r929449313


##########
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/vectorslicer/VectorSlicer.java:
##########
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature.vectorslicer;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.ml.api.Transformer;
+import org.apache.flink.ml.common.datastream.TableUtils;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.SparseVector;
+import org.apache.flink.ml.linalg.Vector;
+import org.apache.flink.ml.linalg.typeinfo.VectorTypeInfo;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.ArrayUtils;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A transformer that transforms a vector to a new one with a sub-array of the original features. It
+ * is useful for extracting features from a given vector. If the indices acquired from setIndices()
+ * are not in order, the indices of the result vector will be sorted.
+ */
+public class VectorSlicer implements Transformer<VectorSlicer>, VectorSlicerParams<VectorSlicer> {
+    private final Map<Param<?>, Object> paramMap = new HashMap<>();
+
+    public VectorSlicer() {
+        ParamUtils.initializeMapWithDefaultValues(paramMap, this);
+    }
+
+    @Override
+    public Table[] transform(Table... inputs) {
+        Preconditions.checkArgument(inputs.length == 1);
+        StreamTableEnvironment tEnv =
+                (StreamTableEnvironment) ((TableImpl) inputs[0]).getTableEnvironment();
+        RowTypeInfo inputTypeInfo = TableUtils.getRowTypeInfo(inputs[0].getResolvedSchema());
+        RowTypeInfo outputTypeInfo =
+                new RowTypeInfo(
+                        ArrayUtils.addAll(inputTypeInfo.getFieldTypes(), VectorTypeInfo.INSTANCE),
+                        ArrayUtils.addAll(inputTypeInfo.getFieldNames(), getOutputCol()));
+        DataStream<Row> output =
+                tEnv.toDataStream(inputs[0])
+                        .map(new VectorSlice(getIndices(), getInputCol()), outputTypeInfo);
+        Table outputTable = tEnv.fromDataStream(output);
+        return new Table[] {outputTable};
+    }
+
+    @Override
+    public void save(String path) throws IOException {
+        ReadWriteUtils.saveMetadata(this, path);
+    }
+
+    public static VectorSlicer load(StreamTableEnvironment env, String path) throws IOException {
+        return ReadWriteUtils.loadStageParam(path);
+    }
+
+    @Override
+    public Map<Param<?>, Object> getParamMap() {
+        return paramMap;
+    }
+
+    /** Vector slice function. */

Review Comment:
   Could you update the Java doc here so that it contains more information? `Vector slice function` carries no information on its own.



##########
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/vectorslicer/VectorSlicer.java:
##########
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature.vectorslicer;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.ml.api.Transformer;
+import org.apache.flink.ml.common.datastream.TableUtils;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.SparseVector;
+import org.apache.flink.ml.linalg.Vector;
+import org.apache.flink.ml.linalg.typeinfo.VectorTypeInfo;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.ArrayUtils;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A transformer that transforms a vector to a new one with a sub-array of the original features. It
+ * is useful for extracting features from a given vector. If the indices acquired from setIndices()
+ * are not in order, the indices of the result vector will be sorted.
+ */
+public class VectorSlicer implements Transformer<VectorSlicer>, VectorSlicerParams<VectorSlicer> {
+    private final Map<Param<?>, Object> paramMap = new HashMap<>();
+
+    public VectorSlicer() {
+        ParamUtils.initializeMapWithDefaultValues(paramMap, this);
+    }
+
+    @Override
+    public Table[] transform(Table... inputs) {
+        Preconditions.checkArgument(inputs.length == 1);
+        StreamTableEnvironment tEnv =
+                (StreamTableEnvironment) ((TableImpl) inputs[0]).getTableEnvironment();
+        RowTypeInfo inputTypeInfo = TableUtils.getRowTypeInfo(inputs[0].getResolvedSchema());
+        RowTypeInfo outputTypeInfo =
+                new RowTypeInfo(
+                        ArrayUtils.addAll(inputTypeInfo.getFieldTypes(), VectorTypeInfo.INSTANCE),
+                        ArrayUtils.addAll(inputTypeInfo.getFieldNames(), getOutputCol()));
+        DataStream<Row> output =
+                tEnv.toDataStream(inputs[0])
+                        .map(new VectorSlice(getIndices(), getInputCol()), outputTypeInfo);
+        Table outputTable = tEnv.fromDataStream(output);
+        return new Table[] {outputTable};
+    }
+
+    @Override
+    public void save(String path) throws IOException {
+        ReadWriteUtils.saveMetadata(this, path);
+    }
+
+    public static VectorSlicer load(StreamTableEnvironment env, String path) throws IOException {
+        return ReadWriteUtils.loadStageParam(path);
+    }
+
+    @Override
+    public Map<Param<?>, Object> getParamMap() {
+        return paramMap;
+    }
+
+    /** Vector slice function. */
+    private static class VectorSlice implements MapFunction<Row, Row> {
+        private final Integer[] indices;
+        private final String inputCol;
+
+        public VectorSlice(Integer[] indices, String inputCol) {
+            this.indices = indices;
+            Arrays.sort(this.indices);

Review Comment:
   nit: we could sort the indices outside this function, i.e., before the job graph is constructed around Line#68 (roughly as sketched below).
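   
   A rough sketch of the idea (same names as in the quoted `transform` method; `clone()` keeps the stored parameter value untouched):
   
   ```
   Integer[] indices = getIndices().clone();
   Arrays.sort(indices);
   DataStream<Row> output =
           tEnv.toDataStream(inputs[0])
                   .map(new VectorSlice(indices, getInputCol()), outputTypeInfo);
   ```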



##########
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/vectorslicer/VectorSlicer.java:
##########
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature.vectorslicer;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.ml.api.Transformer;
+import org.apache.flink.ml.common.datastream.TableUtils;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.SparseVector;
+import org.apache.flink.ml.linalg.Vector;
+import org.apache.flink.ml.linalg.typeinfo.VectorTypeInfo;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.ArrayUtils;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A transformer that transforms a vector to a new one with a sub-array of the original features. It

Review Comment:
   nit: A Transformer...



##########
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/vectorslicer/VectorSlicer.java:
##########
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature.vectorslicer;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.ml.api.Transformer;
+import org.apache.flink.ml.common.datastream.TableUtils;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.SparseVector;
+import org.apache.flink.ml.linalg.Vector;
+import org.apache.flink.ml.linalg.typeinfo.VectorTypeInfo;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.ArrayUtils;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A transformer that transforms a vector to a new one with a sub-array of the original features. It
+ * is useful for extracting features from a given vector. If the indices acquired from setIndices()
+ * are not in order, the indices of the result vector will be sorted.
+ */
+public class VectorSlicer implements Transformer<VectorSlicer>, VectorSlicerParams<VectorSlicer> {
+    private final Map<Param<?>, Object> paramMap = new HashMap<>();
+
+    public VectorSlicer() {
+        ParamUtils.initializeMapWithDefaultValues(paramMap, this);
+    }
+
+    @Override
+    public Table[] transform(Table... inputs) {
+        Preconditions.checkArgument(inputs.length == 1);
+        StreamTableEnvironment tEnv =
+                (StreamTableEnvironment) ((TableImpl) inputs[0]).getTableEnvironment();
+        RowTypeInfo inputTypeInfo = TableUtils.getRowTypeInfo(inputs[0].getResolvedSchema());
+        RowTypeInfo outputTypeInfo =
+                new RowTypeInfo(
+                        ArrayUtils.addAll(inputTypeInfo.getFieldTypes(), VectorTypeInfo.INSTANCE),
+                        ArrayUtils.addAll(inputTypeInfo.getFieldNames(), getOutputCol()));
+        DataStream<Row> output =
+                tEnv.toDataStream(inputs[0])
+                        .map(new VectorSlice(getIndices(), getInputCol()), outputTypeInfo);
+        Table outputTable = tEnv.fromDataStream(output);
+        return new Table[] {outputTable};
+    }
+
+    @Override
+    public void save(String path) throws IOException {
+        ReadWriteUtils.saveMetadata(this, path);
+    }
+
+    public static VectorSlicer load(StreamTableEnvironment env, String path) throws IOException {
+        return ReadWriteUtils.loadStageParam(path);
+    }
+
+    @Override
+    public Map<Param<?>, Object> getParamMap() {
+        return paramMap;
+    }
+
+    /** Vector slice function. */
+    private static class VectorSlice implements MapFunction<Row, Row> {
+        private final Integer[] indices;
+        private final String inputCol;
+
+        public VectorSlice(Integer[] indices, String inputCol) {
+            this.indices = indices;
+            Arrays.sort(this.indices);
+            this.inputCol = inputCol;
+        }
+
+        @Override
+        public Row map(Row row) throws Exception {
+            Vector inputVec = row.getFieldAs(inputCol);
+            Vector outputVec;
+            if (inputVec instanceof DenseVector) {
+                double[] values = new double[indices.length];
+                for (int i = 0; i < indices.length; ++i) {
+                    if (indices[i] >= inputVec.size()) {

Review Comment:
   The checks at Line#107 and Line#119 could be moved to Line#102, since the indices are sorted, for readability and efficiency (see the sketch below).
   
   Also, how about updating the error message to `Index value is greater than vector size:  + inputVec.size`?
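   
   A minimal sketch, assuming `indices` has already been sorted and is non-empty, so only the largest index needs to be checked once per row:
   
   ```
   int maxIndex = indices[indices.length - 1];
   if (maxIndex >= inputVec.size()) {
       throw new IllegalArgumentException(
               "Index value " + maxIndex + " is greater than vector size: " + inputVec.size());
   }
   ```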



##########
flink-ml-python/pyflink/ml/lib/feature/vectorslicer.py:
##########
@@ -0,0 +1,70 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from typing import Tuple
+from pyflink.ml.core.wrapper import JavaWithParams
+from pyflink.ml.core.param import IntArrayParam
+from pyflink.ml.lib.feature.common import JavaFeatureTransformer
+from pyflink.ml.lib.param import HasInputCol, HasOutputCol, ParamValidators, Param
+
+
+class _VectorSlicerParams(
+    JavaWithParams,
+    HasInputCol,
+    HasOutputCol
+):
+    """
+    Params for :class:`VectorSlicer`.
+    """
+
+    INDICES: Param[Tuple[int, ...]] = IntArrayParam(
+        "indices",
+        "An array of indices to select features from a vector column.",
+        None,
+        ParamValidators.non_empty_array())
+
+    def __init__(self, java_params):
+        super(_VectorSlicerParams, self).__init__(java_params)
+
+    def set_indices(self, *ind: int):
+        return self.set(self.INDICES, ind)
+
+    def get_indices(self) -> Tuple[int, ...]:
+        return self.get(self.INDICES)
+
+    @property
+    def indices(self) -> Tuple[int, ...]:
+        return self.get_indices()
+
+
+class VectorSlicer(JavaFeatureTransformer, _VectorSlicerParams):
+    """
+    A feature transformer that transforms a vector to a new one with a sub-array of the original

Review Comment:
   nit: make the Python doc consistent with the Java doc.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-ml] weibozhao commented on a diff in pull request #131: [FLINK-28563] Add Transformer for VectorSlicer

Posted by GitBox <gi...@apache.org>.
weibozhao commented on code in PR #131:
URL: https://github.com/apache/flink-ml/pull/131#discussion_r931902284


##########
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/vectorslicer/VectorSlicerParams.java:
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature.vectorslicer;
+
+import org.apache.flink.ml.common.param.HasInputCol;
+import org.apache.flink.ml.common.param.HasOutputCol;
+import org.apache.flink.ml.param.IntArrayParam;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.param.ParamValidators;
+
+/**
+ * Params of {@link VectorSlicer}.
+ *
+ * @param <T> The class type of this instance.
+ */
+public interface VectorSlicerParams<T> extends HasInputCol<T>, HasOutputCol<T> {
+    Param<Integer[]> INDICES =
+            new IntArrayParam(
+                    "indices",
+                    "An array of indices to select features from a vector column.",
+                    null,
+                    ParamValidators.numericalArrayGtEq(0));

Review Comment:
   I think one validator cannot do all the checks for this parameter; here it only does some of them. The duplicate-index check can be done in the algorithm code instead. A shared validator should do only one kind of check, and the remaining checks can live somewhere else, e.g. at the beginning of the algorithm (see the sketch below).
   
   What do you think?
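   
   A possible sketch of such a check at the beginning of `transform()` (illustrative only; `Preconditions` and `Arrays` are already imported in the VectorSlicer code quoted in this thread):
   
   ```
   Integer[] indices = getIndices();
   Preconditions.checkArgument(
           Arrays.stream(indices).distinct().count() == indices.length,
           "Duplicate indices are not allowed.");
   ```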



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-ml] yunfengzhou-hub commented on a diff in pull request #131: [FLINK-28563] Add Transformer for VectorSlicer

Posted by GitBox <gi...@apache.org>.
yunfengzhou-hub commented on code in PR #131:
URL: https://github.com/apache/flink-ml/pull/131#discussion_r932943261


##########
flink-ml-lib/src/test/java/org/apache/flink/ml/feature/VectorSlicerTest.java:
##########
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.ml.feature.vectorslicer.VectorSlicer;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.SparseVector;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.ml.util.TestUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
+
+import org.apache.commons.collections.IteratorUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+/** Tests {@link VectorSlicer}. */
+public class VectorSlicerTest extends AbstractTestBase {
+
+    private StreamTableEnvironment tEnv;
+    private Table inputDataTable;
+
+    private static final List<Row> INPUT_DATA =
+            Arrays.asList(
+                    Row.of(
+                            0,
+                            Vectors.dense(2.1, 3.1, 2.3, 3.4, 5.3, 5.1),
+                            Vectors.sparse(5, new int[] {1, 3, 4}, new double[] {0.1, 0.2, 0.3})),
+                    Row.of(
+                            1,
+                            Vectors.dense(2.3, 4.1, 1.3, 2.4, 5.1, 4.1),
+                            Vectors.sparse(5, new int[] {1, 2, 4}, new double[] {0.1, 0.2, 0.3})));
+
+    private static final DenseVector EXPECTED_OUTPUT_DATA_1 = Vectors.dense(2.1, 3.1, 2.3);
+    private static final DenseVector EXPECTED_OUTPUT_DATA_2 = Vectors.dense(2.3, 4.1, 1.3);
+
+    private static final SparseVector EXPECTED_OUTPUT_DATA_3 =
+            Vectors.sparse(3, new int[] {1}, new double[] {0.1});
+    private static final SparseVector EXPECTED_OUTPUT_DATA_4 =
+            Vectors.sparse(3, new int[] {1, 2}, new double[] {0.1, 0.2});
+
+    @Before
+    public void before() {
+        Configuration config = new Configuration();
+        config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+        env.setParallelism(4);
+        env.enableCheckpointing(100);
+        env.setRestartStrategy(RestartStrategies.noRestart());
+        tEnv = StreamTableEnvironment.create(env);
+        DataStream<Row> dataStream = env.fromCollection(INPUT_DATA);
+        inputDataTable = tEnv.fromDataStream(dataStream).as("id", "vec", "sparseVec");
+    }
+
+    private void verifyOutputResult(Table output, String outputCol, boolean isSparse)
+            throws Exception {
+        DataStream<Row> dataStream = tEnv.toDataStream(output);
+        List<Row> results = IteratorUtils.toList(dataStream.executeAndCollect());
+        assertEquals(2, results.size());
+        for (Row result : results) {
+            if (result.getField(0) == (Object) 0) {
+                if (isSparse) {
+                    assertEquals(EXPECTED_OUTPUT_DATA_3, result.getField(outputCol));
+                } else {
+                    assertEquals(EXPECTED_OUTPUT_DATA_1, result.getField(outputCol));
+                }
+            } else if (result.getField(0) == (Object) 1) {
+                if (isSparse) {
+                    assertEquals(EXPECTED_OUTPUT_DATA_4, result.getField(outputCol));
+                } else {
+                    assertEquals(EXPECTED_OUTPUT_DATA_2, result.getField(outputCol));
+                }
+            } else {
+                throw new RuntimeException("Result id value is error, it must be 0 or 1.");
+            }
+        }
+    }
+
+    @Test
+    public void testParam() {
+        VectorSlicer vectorSlicer = new VectorSlicer();
+        assertEquals("input", vectorSlicer.getInputCol());
+        assertEquals("output", vectorSlicer.getOutputCol());
+        vectorSlicer.setInputCol("vec").setOutputCol("sliceVec").setIndices(0, 1, 2);
+        assertEquals("vec", vectorSlicer.getInputCol());
+        assertEquals("sliceVec", vectorSlicer.getOutputCol());
+        assertArrayEquals(new Integer[] {0, 1, 2}, vectorSlicer.getIndices());
+    }
+
+    @Test
+    public void testSaveLoadAndTransform() throws Exception {
+        VectorSlicer vectorSlicer =
+                new VectorSlicer().setInputCol("vec").setOutputCol("sliceVec").setIndices(0, 1, 2);
+        VectorSlicer loadedVectorSlicer =
+                TestUtils.saveAndReload(
+                        tEnv, vectorSlicer, TEMPORARY_FOLDER.newFolder().getAbsolutePath());
+        Table output = loadedVectorSlicer.transform(inputDataTable)[0];
+        verifyOutputResult(output, loadedVectorSlicer.getOutputCol(), false);
+    }
+
+    @Test
+    public void testEmptyIndices() {

Review Comment:
   It seems that Spark's `VectorSlicer` treats an empty array as a valid value for indices. Can we follow that convention here as well?



##########
flink-ml-python/pyflink/ml/core/param.py:
##########
@@ -228,6 +228,22 @@ def validate(self, value: Tuple[T]) -> bool:
 
         return NonEmptyArray()
 
+    @staticmethod
+    def numerical_array_gt_eq(lower_bound: int) -> ParamValidator[Tuple[int]]:

Review Comment:
   This method can be removed now.



##########
flink-ml-lib/src/test/java/org/apache/flink/ml/feature/VectorSlicerTest.java:
##########
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.ml.feature.vectorslicer.VectorSlicer;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.SparseVector;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.ml.util.TestUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
+
+import org.apache.commons.collections.IteratorUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+/** Tests {@link VectorSlicer}. */
+public class VectorSlicerTest extends AbstractTestBase {

Review Comment:
   Let's also add a test case to verify the behavior when the input indices are unordered.
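   For reference, a sketch of such a test is given below. It reuses this test class's `inputDataTable` and `tEnv` fixtures, and the expected vectors assume the transformer emits the selected features in the same order as the given indices:

   ```java
   @Test
   public void testUnorderedIndices() throws Exception {
       // Indices are deliberately not in ascending order.
       VectorSlicer vectorSlicer =
               new VectorSlicer().setInputCol("vec").setOutputCol("sliceVec").setIndices(2, 0, 1);
       Table output = vectorSlicer.transform(inputDataTable)[0];
       List<Row> results = IteratorUtils.toList(tEnv.toDataStream(output).executeAndCollect());
       for (Row result : results) {
           if (result.getField(0).equals(0)) {
               // Features 2, 0 and 1 of the first dense input vector, in the given order.
               assertEquals(Vectors.dense(2.3, 2.1, 3.1), result.getField("sliceVec"));
           } else {
               assertEquals(Vectors.dense(1.3, 2.3, 4.1), result.getField("sliceVec"));
           }
       }
   }
   ```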



##########
flink-ml-python/pyflink/ml/lib/feature/vectorslicer.py:
##########
@@ -0,0 +1,71 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from typing import Tuple
+from pyflink.ml.core.wrapper import JavaWithParams
+from pyflink.ml.core.param import IntArrayParam, ParamValidators
+from pyflink.ml.lib.feature.common import JavaFeatureTransformer
+from pyflink.ml.lib.param import HasInputCol, HasOutputCol, Param
+
+
+class _VectorSlicerParams(
+    JavaWithParams,
+    HasInputCol,
+    HasOutputCol
+):
+    """
+    Params for :class:`VectorSlicer`.
+    """
+
+    INDICES: Param[Tuple[int, ...]] = IntArrayParam(
+        "indices",
+        "An array of indices to select features from a vector column.",
+        None,
+        ParamValidators.numerical_array_gt_eq(0))
+
+    def __init__(self, java_params):
+        super(_VectorSlicerParams, self).__init__(java_params)
+
+    def set_indices(self, *ind: int):
+        return self.set(self.INDICES, ind)
+
+    def get_indices(self) -> Tuple[int, ...]:
+        return self.get(self.INDICES)
+
+    @property
+    def indices(self) -> Tuple[int, ...]:
+        return self.get_indices()
+
+
+class VectorSlicer(JavaFeatureTransformer, _VectorSlicerParams):
+    """
+    VectorSlicer is a transformer that transforms a vector to a new one with a sub-array of the
+    original features. It is useful for extracting features from a given vector. If the max
+    indices are larger than the size of the input vector, it will throw an IllegalArgumentException.

Review Comment:
   nit: This docstring is inconsistent with the Java documentation.



##########
flink-ml-lib/src/test/java/org/apache/flink/ml/feature/VectorSlicerTest.java:
##########
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.ml.feature.vectorslicer.VectorSlicer;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.SparseVector;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.ml.util.TestUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
+
+import org.apache.commons.collections.IteratorUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+/** Tests {@link VectorSlicer}. */
+public class VectorSlicerTest extends AbstractTestBase {
+
+    private StreamTableEnvironment tEnv;
+    private Table inputDataTable;
+
+    private static final List<Row> INPUT_DATA =
+            Arrays.asList(
+                    Row.of(
+                            0,
+                            Vectors.dense(2.1, 3.1, 2.3, 3.4, 5.3, 5.1),
+                            Vectors.sparse(5, new int[] {1, 3, 4}, new double[] {0.1, 0.2, 0.3})),
+                    Row.of(
+                            1,
+                            Vectors.dense(2.3, 4.1, 1.3, 2.4, 5.1, 4.1),
+                            Vectors.sparse(5, new int[] {1, 2, 4}, new double[] {0.1, 0.2, 0.3})));
+
+    private static final DenseVector EXPECTED_OUTPUT_DATA_1 = Vectors.dense(2.1, 3.1, 2.3);
+    private static final DenseVector EXPECTED_OUTPUT_DATA_2 = Vectors.dense(2.3, 4.1, 1.3);
+
+    private static final SparseVector EXPECTED_OUTPUT_DATA_3 =
+            Vectors.sparse(3, new int[] {1}, new double[] {0.1});
+    private static final SparseVector EXPECTED_OUTPUT_DATA_4 =
+            Vectors.sparse(3, new int[] {1, 2}, new double[] {0.1, 0.2});
+
+    @Before
+    public void before() {
+        Configuration config = new Configuration();
+        config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+        env.setParallelism(4);
+        env.enableCheckpointing(100);
+        env.setRestartStrategy(RestartStrategies.noRestart());
+        tEnv = StreamTableEnvironment.create(env);
+        DataStream<Row> dataStream = env.fromCollection(INPUT_DATA);
+        inputDataTable = tEnv.fromDataStream(dataStream).as("id", "vec", "sparseVec");
+    }
+
+    private void verifyOutputResult(Table output, String outputCol, boolean isSparse)
+            throws Exception {
+        DataStream<Row> dataStream = tEnv.toDataStream(output);
+        List<Row> results = IteratorUtils.toList(dataStream.executeAndCollect());
+        assertEquals(2, results.size());
+        for (Row result : results) {
+            if (result.getField(0) == (Object) 0) {
+                if (isSparse) {
+                    assertEquals(EXPECTED_OUTPUT_DATA_3, result.getField(outputCol));
+                } else {
+                    assertEquals(EXPECTED_OUTPUT_DATA_1, result.getField(outputCol));
+                }
+            } else if (result.getField(0) == (Object) 1) {
+                if (isSparse) {
+                    assertEquals(EXPECTED_OUTPUT_DATA_4, result.getField(outputCol));
+                } else {
+                    assertEquals(EXPECTED_OUTPUT_DATA_2, result.getField(outputCol));
+                }
+            } else {
+                throw new RuntimeException("Result id value is error, it must be 0 or 1.");
+            }
+        }
+    }
+
+    @Test
+    public void testParam() {
+        VectorSlicer vectorSlicer = new VectorSlicer();
+        assertEquals("input", vectorSlicer.getInputCol());
+        assertEquals("output", vectorSlicer.getOutputCol());
+        vectorSlicer.setInputCol("vec").setOutputCol("sliceVec").setIndices(0, 1, 2);
+        assertEquals("vec", vectorSlicer.getInputCol());
+        assertEquals("sliceVec", vectorSlicer.getOutputCol());
+        assertArrayEquals(new Integer[] {0, 1, 2}, vectorSlicer.getIndices());
+    }
+
+    @Test
+    public void testSaveLoadAndTransform() throws Exception {
+        VectorSlicer vectorSlicer =
+                new VectorSlicer().setInputCol("vec").setOutputCol("sliceVec").setIndices(0, 1, 2);
+        VectorSlicer loadedVectorSlicer =
+                TestUtils.saveAndReload(
+                        tEnv, vectorSlicer, TEMPORARY_FOLDER.newFolder().getAbsolutePath());
+        Table output = loadedVectorSlicer.transform(inputDataTable)[0];
+        verifyOutputResult(output, loadedVectorSlicer.getOutputCol(), false);
+    }
+
+    @Test
+    public void testEmptyIndices() {
+        try {
+            VectorSlicer vectorSlicer =
+                    new VectorSlicer().setInputCol("vec").setOutputCol("sliceVec").setIndices();
+            vectorSlicer.transform(inputDataTable);
+            Assert.fail("Expected IllegalArgumentException");
+        } catch (Exception e) {
+            assertEquals("Parameter indices is given an invalid value {}", e.getMessage());
+            assertEquals(IllegalArgumentException.class, ((Throwable) e).getClass());
+        }
+    }
+
+    @Test
+    public void testIndicesLargerThanVectorSize() {
+        try {
+            VectorSlicer vectorSlicer =
+                    new VectorSlicer()
+                            .setInputCol("vec")
+                            .setOutputCol("sliceVec")
+                            .setIndices(1, 2, 10);
+            Table output = vectorSlicer.transform(inputDataTable)[0];
+            DataStream<Row> dataStream = tEnv.toDataStream(output);
+            IteratorUtils.toList(dataStream.executeAndCollect());
+            Assert.fail("Expected RuntimeException");
+        } catch (Exception e) {
+            assertEquals(
+                    "Index value 10 is greater than vector size:6",
+                    e.getCause().getCause().getCause().getCause().getCause().getMessage());
+            assertEquals(
+                    IllegalArgumentException.class,
+                    e.getCause().getCause().getCause().getCause().getCause().getClass());
+        }
+    }
+
+    @Test
+    public void testIndicesSallerThanZero() {
+        try {
+            VectorSlicer vectorSlicer =
+                    new VectorSlicer()
+                            .setInputCol("vec")
+                            .setOutputCol("sliceVec")
+                            .setIndices(1, -2);
+            vectorSlicer.transform(inputDataTable);
+            Assert.fail("Expected IllegalArgumentException");
+        } catch (Exception e) {
+            assertEquals("Parameter indices is given an invalid value {1,-2}", e.getMessage());
+            assertEquals(IllegalArgumentException.class, ((Throwable) e).getClass());
+        }
+    }
+
+    @Test
+    public void testDuplicateIndices() {
+        try {
+            VectorSlicer vectorSlicer =
+                    new VectorSlicer()
+                            .setInputCol("vec")
+                            .setOutputCol("sliceVec")
+                            .setIndices(1, 1, 3);
+            vectorSlicer.transform(inputDataTable);

Review Comment:
   We can remove this line because an exception would be thrown before `transform()` is invoked. The same applies to the other test cases.



##########
flink-ml-lib/src/test/java/org/apache/flink/ml/feature/VectorSlicerTest.java:
##########
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.ml.feature.vectorslicer.VectorSlicer;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.SparseVector;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.ml.util.TestUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
+
+import org.apache.commons.collections.IteratorUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+/** Tests {@link VectorSlicer}. */
+public class VectorSlicerTest extends AbstractTestBase {
+
+    private StreamTableEnvironment tEnv;
+    private Table inputDataTable;
+
+    private static final List<Row> INPUT_DATA =
+            Arrays.asList(
+                    Row.of(
+                            0,
+                            Vectors.dense(2.1, 3.1, 2.3, 3.4, 5.3, 5.1),
+                            Vectors.sparse(5, new int[] {1, 3, 4}, new double[] {0.1, 0.2, 0.3})),
+                    Row.of(
+                            1,
+                            Vectors.dense(2.3, 4.1, 1.3, 2.4, 5.1, 4.1),
+                            Vectors.sparse(5, new int[] {1, 2, 4}, new double[] {0.1, 0.2, 0.3})));
+
+    private static final DenseVector EXPECTED_OUTPUT_DATA_1 = Vectors.dense(2.1, 3.1, 2.3);
+    private static final DenseVector EXPECTED_OUTPUT_DATA_2 = Vectors.dense(2.3, 4.1, 1.3);
+
+    private static final SparseVector EXPECTED_OUTPUT_DATA_3 =
+            Vectors.sparse(3, new int[] {1}, new double[] {0.1});
+    private static final SparseVector EXPECTED_OUTPUT_DATA_4 =
+            Vectors.sparse(3, new int[] {1, 2}, new double[] {0.1, 0.2});
+
+    @Before
+    public void before() {
+        Configuration config = new Configuration();
+        config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+        env.setParallelism(4);
+        env.enableCheckpointing(100);
+        env.setRestartStrategy(RestartStrategies.noRestart());
+        tEnv = StreamTableEnvironment.create(env);
+        DataStream<Row> dataStream = env.fromCollection(INPUT_DATA);
+        inputDataTable = tEnv.fromDataStream(dataStream).as("id", "vec", "sparseVec");
+    }
+
+    private void verifyOutputResult(Table output, String outputCol, boolean isSparse)
+            throws Exception {
+        DataStream<Row> dataStream = tEnv.toDataStream(output);
+        List<Row> results = IteratorUtils.toList(dataStream.executeAndCollect());
+        assertEquals(2, results.size());
+        for (Row result : results) {
+            if (result.getField(0) == (Object) 0) {
+                if (isSparse) {
+                    assertEquals(EXPECTED_OUTPUT_DATA_3, result.getField(outputCol));
+                } else {
+                    assertEquals(EXPECTED_OUTPUT_DATA_1, result.getField(outputCol));
+                }
+            } else if (result.getField(0) == (Object) 1) {
+                if (isSparse) {
+                    assertEquals(EXPECTED_OUTPUT_DATA_4, result.getField(outputCol));
+                } else {
+                    assertEquals(EXPECTED_OUTPUT_DATA_2, result.getField(outputCol));
+                }
+            } else {
+                throw new RuntimeException("Result id value is error, it must be 0 or 1.");
+            }
+        }
+    }
+
+    @Test
+    public void testParam() {
+        VectorSlicer vectorSlicer = new VectorSlicer();
+        assertEquals("input", vectorSlicer.getInputCol());
+        assertEquals("output", vectorSlicer.getOutputCol());
+        vectorSlicer.setInputCol("vec").setOutputCol("sliceVec").setIndices(0, 1, 2);
+        assertEquals("vec", vectorSlicer.getInputCol());
+        assertEquals("sliceVec", vectorSlicer.getOutputCol());
+        assertArrayEquals(new Integer[] {0, 1, 2}, vectorSlicer.getIndices());
+    }
+
+    @Test
+    public void testSaveLoadAndTransform() throws Exception {
+        VectorSlicer vectorSlicer =
+                new VectorSlicer().setInputCol("vec").setOutputCol("sliceVec").setIndices(0, 1, 2);
+        VectorSlicer loadedVectorSlicer =
+                TestUtils.saveAndReload(
+                        tEnv, vectorSlicer, TEMPORARY_FOLDER.newFolder().getAbsolutePath());
+        Table output = loadedVectorSlicer.transform(inputDataTable)[0];
+        verifyOutputResult(output, loadedVectorSlicer.getOutputCol(), false);
+    }
+
+    @Test
+    public void testEmptyIndices() {
+        try {
+            VectorSlicer vectorSlicer =
+                    new VectorSlicer().setInputCol("vec").setOutputCol("sliceVec").setIndices();
+            vectorSlicer.transform(inputDataTable);
+            Assert.fail("Expected IllegalArgumentException");
+        } catch (Exception e) {
+            assertEquals("Parameter indices is given an invalid value {}", e.getMessage());
+            assertEquals(IllegalArgumentException.class, ((Throwable) e).getClass());
+        }
+    }
+
+    @Test
+    public void testIndicesLargerThanVectorSize() {
+        try {
+            VectorSlicer vectorSlicer =
+                    new VectorSlicer()
+                            .setInputCol("vec")
+                            .setOutputCol("sliceVec")
+                            .setIndices(1, 2, 10);
+            Table output = vectorSlicer.transform(inputDataTable)[0];
+            DataStream<Row> dataStream = tEnv.toDataStream(output);
+            IteratorUtils.toList(dataStream.executeAndCollect());
+            Assert.fail("Expected RuntimeException");
+        } catch (Exception e) {
+            assertEquals(
+                    "Index value 10 is greater than vector size:6",
+                    e.getCause().getCause().getCause().getCause().getCause().getMessage());
+            assertEquals(
+                    IllegalArgumentException.class,
+                    e.getCause().getCause().getCause().getCause().getCause().getClass());

Review Comment:
   nit: `ExceptionUtils.getRootCause(e)`
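   For example, the chained `getCause()` calls could be collapsed into something like the sketch below (using `org.apache.commons.lang3.exception.ExceptionUtils`; adjust the import if another `ExceptionUtils` is preferred):

   ```java
   import org.apache.commons.lang3.exception.ExceptionUtils;

   // Inside the catch block:
   Throwable rootCause = ExceptionUtils.getRootCause(e);
   assertEquals("Index value 10 is greater than vector size:6", rootCause.getMessage());
   assertEquals(IllegalArgumentException.class, rootCause.getClass());
   ```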



##########
flink-ml-lib/src/test/java/org/apache/flink/ml/feature/VectorSlicerTest.java:
##########
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.ml.feature.vectorslicer.VectorSlicer;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.SparseVector;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.ml.util.TestUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
+
+import org.apache.commons.collections.IteratorUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+/** Tests {@link VectorSlicer}. */
+public class VectorSlicerTest extends AbstractTestBase {
+
+    private StreamTableEnvironment tEnv;
+    private Table inputDataTable;
+
+    private static final List<Row> INPUT_DATA =
+            Arrays.asList(
+                    Row.of(
+                            0,
+                            Vectors.dense(2.1, 3.1, 2.3, 3.4, 5.3, 5.1),
+                            Vectors.sparse(5, new int[] {1, 3, 4}, new double[] {0.1, 0.2, 0.3})),
+                    Row.of(
+                            1,
+                            Vectors.dense(2.3, 4.1, 1.3, 2.4, 5.1, 4.1),
+                            Vectors.sparse(5, new int[] {1, 2, 4}, new double[] {0.1, 0.2, 0.3})));
+
+    private static final DenseVector EXPECTED_OUTPUT_DATA_1 = Vectors.dense(2.1, 3.1, 2.3);
+    private static final DenseVector EXPECTED_OUTPUT_DATA_2 = Vectors.dense(2.3, 4.1, 1.3);
+
+    private static final SparseVector EXPECTED_OUTPUT_DATA_3 =
+            Vectors.sparse(3, new int[] {1}, new double[] {0.1});
+    private static final SparseVector EXPECTED_OUTPUT_DATA_4 =
+            Vectors.sparse(3, new int[] {1, 2}, new double[] {0.1, 0.2});
+
+    @Before
+    public void before() {
+        Configuration config = new Configuration();
+        config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+        env.setParallelism(4);
+        env.enableCheckpointing(100);
+        env.setRestartStrategy(RestartStrategies.noRestart());
+        tEnv = StreamTableEnvironment.create(env);
+        DataStream<Row> dataStream = env.fromCollection(INPUT_DATA);
+        inputDataTable = tEnv.fromDataStream(dataStream).as("id", "vec", "sparseVec");
+    }
+
+    private void verifyOutputResult(Table output, String outputCol, boolean isSparse)
+            throws Exception {
+        DataStream<Row> dataStream = tEnv.toDataStream(output);
+        List<Row> results = IteratorUtils.toList(dataStream.executeAndCollect());
+        assertEquals(2, results.size());
+        for (Row result : results) {
+            if (result.getField(0) == (Object) 0) {
+                if (isSparse) {
+                    assertEquals(EXPECTED_OUTPUT_DATA_3, result.getField(outputCol));
+                } else {
+                    assertEquals(EXPECTED_OUTPUT_DATA_1, result.getField(outputCol));
+                }
+            } else if (result.getField(0) == (Object) 1) {
+                if (isSparse) {
+                    assertEquals(EXPECTED_OUTPUT_DATA_4, result.getField(outputCol));
+                } else {
+                    assertEquals(EXPECTED_OUTPUT_DATA_2, result.getField(outputCol));
+                }
+            } else {
+                throw new RuntimeException("Result id value is error, it must be 0 or 1.");
+            }
+        }
+    }
+
+    @Test
+    public void testParam() {
+        VectorSlicer vectorSlicer = new VectorSlicer();
+        assertEquals("input", vectorSlicer.getInputCol());
+        assertEquals("output", vectorSlicer.getOutputCol());
+        vectorSlicer.setInputCol("vec").setOutputCol("sliceVec").setIndices(0, 1, 2);
+        assertEquals("vec", vectorSlicer.getInputCol());
+        assertEquals("sliceVec", vectorSlicer.getOutputCol());
+        assertArrayEquals(new Integer[] {0, 1, 2}, vectorSlicer.getIndices());
+    }
+
+    @Test
+    public void testSaveLoadAndTransform() throws Exception {
+        VectorSlicer vectorSlicer =
+                new VectorSlicer().setInputCol("vec").setOutputCol("sliceVec").setIndices(0, 1, 2);
+        VectorSlicer loadedVectorSlicer =
+                TestUtils.saveAndReload(
+                        tEnv, vectorSlicer, TEMPORARY_FOLDER.newFolder().getAbsolutePath());
+        Table output = loadedVectorSlicer.transform(inputDataTable)[0];
+        verifyOutputResult(output, loadedVectorSlicer.getOutputCol(), false);
+    }
+
+    @Test
+    public void testEmptyIndices() {
+        try {
+            VectorSlicer vectorSlicer =
+                    new VectorSlicer().setInputCol("vec").setOutputCol("sliceVec").setIndices();
+            vectorSlicer.transform(inputDataTable);
+            Assert.fail("Expected IllegalArgumentException");
+        } catch (Exception e) {
+            assertEquals("Parameter indices is given an invalid value {}", e.getMessage());
+            assertEquals(IllegalArgumentException.class, ((Throwable) e).getClass());
+        }
+    }
+
+    @Test
+    public void testIndicesLargerThanVectorSize() {
+        try {
+            VectorSlicer vectorSlicer =
+                    new VectorSlicer()
+                            .setInputCol("vec")
+                            .setOutputCol("sliceVec")
+                            .setIndices(1, 2, 10);
+            Table output = vectorSlicer.transform(inputDataTable)[0];
+            DataStream<Row> dataStream = tEnv.toDataStream(output);
+            IteratorUtils.toList(dataStream.executeAndCollect());
+            Assert.fail("Expected RuntimeException");
+        } catch (Exception e) {
+            assertEquals(
+                    "Index value 10 is greater than vector size:6",
+                    e.getCause().getCause().getCause().getCause().getCause().getMessage());
+            assertEquals(
+                    IllegalArgumentException.class,
+                    e.getCause().getCause().getCause().getCause().getCause().getClass());
+        }
+    }
+
+    @Test
+    public void testIndicesSallerThanZero() {
+        try {
+            VectorSlicer vectorSlicer =
+                    new VectorSlicer()
+                            .setInputCol("vec")
+                            .setOutputCol("sliceVec")
+                            .setIndices(1, -2);
+            vectorSlicer.transform(inputDataTable);
+            Assert.fail("Expected IllegalArgumentException");
+        } catch (Exception e) {
+            assertEquals("Parameter indices is given an invalid value {1,-2}", e.getMessage());
+            assertEquals(IllegalArgumentException.class, ((Throwable) e).getClass());

Review Comment:
   nit: we may not need to cast `e` to `Throwable` before invoking `getClass()`. The same applies to the other places.
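   i.e. something like:

   ```java
   assertEquals("Parameter indices is given an invalid value {1,-2}", e.getMessage());
   assertEquals(IllegalArgumentException.class, e.getClass());
   ```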



##########
flink-ml-lib/src/test/java/org/apache/flink/ml/feature/VectorSlicerTest.java:
##########
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.ml.feature.vectorslicer.VectorSlicer;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.SparseVector;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.ml.util.TestUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
+
+import org.apache.commons.collections.IteratorUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+/** Tests {@link VectorSlicer}. */
+public class VectorSlicerTest extends AbstractTestBase {
+
+    private StreamTableEnvironment tEnv;
+    private Table inputDataTable;
+
+    private static final List<Row> INPUT_DATA =
+            Arrays.asList(
+                    Row.of(
+                            0,
+                            Vectors.dense(2.1, 3.1, 2.3, 3.4, 5.3, 5.1),
+                            Vectors.sparse(5, new int[] {1, 3, 4}, new double[] {0.1, 0.2, 0.3})),
+                    Row.of(
+                            1,
+                            Vectors.dense(2.3, 4.1, 1.3, 2.4, 5.1, 4.1),
+                            Vectors.sparse(5, new int[] {1, 2, 4}, new double[] {0.1, 0.2, 0.3})));
+
+    private static final DenseVector EXPECTED_OUTPUT_DATA_1 = Vectors.dense(2.1, 3.1, 2.3);
+    private static final DenseVector EXPECTED_OUTPUT_DATA_2 = Vectors.dense(2.3, 4.1, 1.3);
+
+    private static final SparseVector EXPECTED_OUTPUT_DATA_3 =
+            Vectors.sparse(3, new int[] {1}, new double[] {0.1});
+    private static final SparseVector EXPECTED_OUTPUT_DATA_4 =
+            Vectors.sparse(3, new int[] {1, 2}, new double[] {0.1, 0.2});
+
+    @Before
+    public void before() {
+        Configuration config = new Configuration();
+        config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+        env.setParallelism(4);
+        env.enableCheckpointing(100);
+        env.setRestartStrategy(RestartStrategies.noRestart());
+        tEnv = StreamTableEnvironment.create(env);
+        DataStream<Row> dataStream = env.fromCollection(INPUT_DATA);
+        inputDataTable = tEnv.fromDataStream(dataStream).as("id", "vec", "sparseVec");
+    }
+
+    private void verifyOutputResult(Table output, String outputCol, boolean isSparse)
+            throws Exception {
+        DataStream<Row> dataStream = tEnv.toDataStream(output);
+        List<Row> results = IteratorUtils.toList(dataStream.executeAndCollect());
+        assertEquals(2, results.size());
+        for (Row result : results) {
+            if (result.getField(0) == (Object) 0) {
+                if (isSparse) {
+                    assertEquals(EXPECTED_OUTPUT_DATA_3, result.getField(outputCol));
+                } else {
+                    assertEquals(EXPECTED_OUTPUT_DATA_1, result.getField(outputCol));
+                }
+            } else if (result.getField(0) == (Object) 1) {
+                if (isSparse) {
+                    assertEquals(EXPECTED_OUTPUT_DATA_4, result.getField(outputCol));
+                } else {
+                    assertEquals(EXPECTED_OUTPUT_DATA_2, result.getField(outputCol));
+                }
+            } else {
+                throw new RuntimeException("Result id value is error, it must be 0 or 1.");
+            }
+        }
+    }
+
+    @Test
+    public void testParam() {
+        VectorSlicer vectorSlicer = new VectorSlicer();
+        assertEquals("input", vectorSlicer.getInputCol());
+        assertEquals("output", vectorSlicer.getOutputCol());
+        vectorSlicer.setInputCol("vec").setOutputCol("sliceVec").setIndices(0, 1, 2);
+        assertEquals("vec", vectorSlicer.getInputCol());
+        assertEquals("sliceVec", vectorSlicer.getOutputCol());
+        assertArrayEquals(new Integer[] {0, 1, 2}, vectorSlicer.getIndices());
+    }
+
+    @Test
+    public void testSaveLoadAndTransform() throws Exception {
+        VectorSlicer vectorSlicer =
+                new VectorSlicer().setInputCol("vec").setOutputCol("sliceVec").setIndices(0, 1, 2);
+        VectorSlicer loadedVectorSlicer =
+                TestUtils.saveAndReload(
+                        tEnv, vectorSlicer, TEMPORARY_FOLDER.newFolder().getAbsolutePath());
+        Table output = loadedVectorSlicer.transform(inputDataTable)[0];
+        verifyOutputResult(output, loadedVectorSlicer.getOutputCol(), false);
+    }
+
+    @Test
+    public void testEmptyIndices() {
+        try {
+            VectorSlicer vectorSlicer =
+                    new VectorSlicer().setInputCol("vec").setOutputCol("sliceVec").setIndices();
+            vectorSlicer.transform(inputDataTable);
+            Assert.fail("Expected IllegalArgumentException");
+        } catch (Exception e) {
+            assertEquals("Parameter indices is given an invalid value {}", e.getMessage());
+            assertEquals(IllegalArgumentException.class, ((Throwable) e).getClass());
+        }
+    }
+
+    @Test
+    public void testIndicesLargerThanVectorSize() {
+        try {
+            VectorSlicer vectorSlicer =
+                    new VectorSlicer()
+                            .setInputCol("vec")
+                            .setOutputCol("sliceVec")
+                            .setIndices(1, 2, 10);
+            Table output = vectorSlicer.transform(inputDataTable)[0];
+            DataStream<Row> dataStream = tEnv.toDataStream(output);
+            IteratorUtils.toList(dataStream.executeAndCollect());
+            Assert.fail("Expected RuntimeException");
+        } catch (Exception e) {
+            assertEquals(
+                    "Index value 10 is greater than vector size:6",
+                    e.getCause().getCause().getCause().getCause().getCause().getMessage());
+            assertEquals(
+                    IllegalArgumentException.class,
+                    e.getCause().getCause().getCause().getCause().getCause().getClass());
+        }
+    }
+
+    @Test
+    public void testIndicesSallerThanZero() {

Review Comment:
   nit: smaller



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-ml] weibozhao commented on a diff in pull request #131: [FLINK-28563] Add Transformer for VectorSlicer

Posted by GitBox <gi...@apache.org>.
weibozhao commented on code in PR #131:
URL: https://github.com/apache/flink-ml/pull/131#discussion_r932014135


##########
flink-ml-lib/src/test/java/org/apache/flink/ml/feature/VectorSlicerTest.java:
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.ml.feature.vectorslicer.VectorSlicer;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.SparseVector;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.ml.util.TestUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
+
+import org.apache.commons.collections.IteratorUtils;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+/** Tests {@link VectorSlicer}. */
+public class VectorSlicerTest extends AbstractTestBase {

Review Comment:
   I have added these unit tests.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-ml] yunfengzhou-hub commented on a diff in pull request #131: [FLINK-28563] Add Transformer for VectorSlicer

Posted by GitBox <gi...@apache.org>.
yunfengzhou-hub commented on code in PR #131:
URL: https://github.com/apache/flink-ml/pull/131#discussion_r922941135


##########
flink-ml-lib/src/test/java/org/apache/flink/ml/feature/VectorSlicerTest.java:
##########
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.ml.feature.vectorslicer.VectorSlicer;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.SparseVector;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.ml.util.TestUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
+
+import org.apache.commons.collections.IteratorUtils;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+/** Tests VectorSlicer. */
+public class VectorSlicerTest extends AbstractTestBase {
+
+    private StreamTableEnvironment tEnv;
+    private Table inputDataTable;
+
+    private static final List<Row> INPUT_DATA =
+            Arrays.asList(
+                    Row.of(
+                            0,
+                            Vectors.dense(2.1, 3.1, 2.3, 3.4, 5.3, 5.1),
+                            Vectors.sparse(5, new int[] {1, 3, 4}, new double[] {0.1, 0.2, 0.3})),
+                    Row.of(
+                            1,
+                            Vectors.dense(2.3, 4.1, 1.3, 2.4, 5.1, 4.1),
+                            Vectors.sparse(5, new int[] {1, 2, 4}, new double[] {0.1, 0.2, 0.3})));
+
+    private static final DenseVector EXPECTED_OUTPUT_DATA_1 = Vectors.dense(2.1, 3.1, 2.3);
+    private static final DenseVector EXPECTED_OUTPUT_DATA_2 = Vectors.dense(2.3, 4.1, 1.3);
+
+    private static final SparseVector EXPECTED_OUTPUT_DATA_3 =
+            Vectors.sparse(3, new int[] {1}, new double[] {0.1});
+    private static final SparseVector EXPECTED_OUTPUT_DATA_4 =
+            Vectors.sparse(3, new int[] {1, 2}, new double[] {0.1, 0.2});
+
+    @Before
+    public void before() {
+        Configuration config = new Configuration();
+        config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+        env.setParallelism(4);
+        env.enableCheckpointing(100);
+        env.setRestartStrategy(RestartStrategies.noRestart());
+        tEnv = StreamTableEnvironment.create(env);
+        DataStream<Row> dataStream = env.fromCollection(INPUT_DATA);
+        inputDataTable = tEnv.fromDataStream(dataStream).as("id", "vec", "sparseVec");
+    }
+
+    private void verifyOutputResult(Table output, String outputCol, boolean isSparse)
+            throws Exception {
+        DataStream<Row> dataStream = tEnv.toDataStream(output);
+        List<Row> results = IteratorUtils.toList(dataStream.executeAndCollect());
+        assertEquals(2, results.size());
+        for (Row result : results) {
+            if (result.getField(0) == (Object) 0) {
+                if (isSparse) {
+                    assertEquals(EXPECTED_OUTPUT_DATA_3, result.getField(outputCol));
+                } else {
+                    assertEquals(EXPECTED_OUTPUT_DATA_1, result.getField(outputCol));
+                }
+            } else if (result.getField(0) == (Object) 1) {
+                if (isSparse) {
+                    assertEquals(EXPECTED_OUTPUT_DATA_4, result.getField(outputCol));
+                } else {
+                    assertEquals(EXPECTED_OUTPUT_DATA_2, result.getField(outputCol));
+                }
+            } else {
+                assertNull(result.getField(outputCol));
+            }
+        }
+    }
+
+    @Test
+    public void testParam() {
+        VectorSlicer vectorSlicer = new VectorSlicer();
+        assertEquals("output", vectorSlicer.getOutputCol());

Review Comment:
   Let's also check `inputCol` and `indices` here.
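   For example (just a sketch, with `assertArrayEquals` added to the static imports — whether `indices` has no default value, and therefore returns null before it is set, is an assumption):

   ```java
   VectorSlicer vectorSlicer = new VectorSlicer();
   assertEquals("input", vectorSlicer.getInputCol());
   assertEquals("output", vectorSlicer.getOutputCol());
   assertNull(vectorSlicer.getIndices());

   vectorSlicer.setInputCol("vec").setOutputCol("sliceVec").setIndices(0, 1, 2);
   assertEquals("vec", vectorSlicer.getInputCol());
   assertArrayEquals(new Integer[] {0, 1, 2}, vectorSlicer.getIndices());
   ```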



##########
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/vectorslicer/VectorSlicer.java:
##########
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature.vectorslicer;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.ml.api.Transformer;
+import org.apache.flink.ml.common.datastream.TableUtils;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.SparseVector;
+import org.apache.flink.ml.linalg.Vector;
+import org.apache.flink.ml.linalg.typeinfo.VectorTypeInfo;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.ArrayUtils;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A feature transformer that transforms a vector to a new one with a sub-array of the original
+ * features. It is useful for extracting features from a given vector.
+ */
+public class VectorSlicer implements Transformer<VectorSlicer>, VectorSlicerParams<VectorSlicer> {
+    private final Map<Param<?>, Object> paramMap = new HashMap<>();
+
+    public VectorSlicer() {
+        ParamUtils.initializeMapWithDefaultValues(paramMap, this);
+    }
+
+    @Override
+    public Table[] transform(Table... inputs) {
+        Preconditions.checkArgument(inputs.length == 1);
+        StreamTableEnvironment tEnv =
+                (StreamTableEnvironment) ((TableImpl) inputs[0]).getTableEnvironment();
+        RowTypeInfo inputTypeInfo = TableUtils.getRowTypeInfo(inputs[0].getResolvedSchema());
+        RowTypeInfo outputTypeInfo =
+                new RowTypeInfo(
+                        ArrayUtils.addAll(inputTypeInfo.getFieldTypes(), VectorTypeInfo.INSTANCE),
+                        ArrayUtils.addAll(inputTypeInfo.getFieldNames(), getOutputCol()));
+        DataStream<Row> output =
+                tEnv.toDataStream(inputs[0])
+                        .map(new VectorSlice(getIndices(), getInputCol()), outputTypeInfo);
+        Table outputTable = tEnv.fromDataStream(output);
+        return new Table[] {outputTable};
+    }
+
+    /** Vector slice function. */
+    public static class VectorSlice implements MapFunction<Row, Row> {
+        private final Integer[] indices;
+        private final String inputCol;
+
+        public VectorSlice(Integer[] indices, String inputCol) {
+            this.indices = indices;
+            this.inputCol = inputCol;
+        }
+
+        @Override
+        public Row map(Row row) throws Exception {
+            Vector inputVec = row.getFieldAs(inputCol);
+            Vector outputVec;
+            if (inputVec instanceof DenseVector) {
+                double[] values = new double[indices.length];
+                for (int i = 0; i < indices.length; ++i) {
+                    if (indices[i] >= inputVec.size()) {
+                        throw new RuntimeException("Index is larger than vector size.");
+                    }
+                    values[i] = ((DenseVector) inputVec).values[indices[i]];
+                }
+                outputVec = new DenseVector(values);
+            } else {
+                int nnz = 0;
+                int[] inputIndices = ((SparseVector) inputVec).indices;
+                double[] inputValues = ((SparseVector) inputVec).values;
+                int[] outputIndices = new int[indices.length];
+                double[] outputValues = new double[indices.length];
+                for (int i = 0; i < indices.length; i++) {
+                    if (indices[i] >= inputVec.size()) {
+                        throw new RuntimeException("Index is larger than vector size.");
+                    }
+                    int pos = Arrays.binarySearch(inputIndices, indices[i]);

Review Comment:
   The `SparseVector.get()` method already implements the binary search. We can use that method directly instead of implementing it ourselves.
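   A minimal sketch of how the map function could look with `Vector.get(int)` doing the lookup (for brevity it always emits a dense output here; the PR's logic that keeps sparse inputs sparse would stay in place):

   ```java
   import org.apache.flink.api.common.functions.MapFunction;
   import org.apache.flink.ml.linalg.DenseVector;
   import org.apache.flink.ml.linalg.Vector;
   import org.apache.flink.types.Row;

   /** Slices the vector in {@code inputCol} down to the configured indices. */
   public class VectorSliceSketch implements MapFunction<Row, Row> {
       private final Integer[] indices;
       private final String inputCol;

       public VectorSliceSketch(Integer[] indices, String inputCol) {
           this.indices = indices;
           this.inputCol = inputCol;
       }

       @Override
       public Row map(Row row) {
           Vector inputVec = row.getFieldAs(inputCol);
           double[] values = new double[indices.length];
           for (int i = 0; i < indices.length; i++) {
               if (indices[i] >= inputVec.size()) {
                   throw new IllegalArgumentException(
                           "Index value " + indices[i] + " is greater than vector size:" + inputVec.size());
               }
               // get(int) already performs the binary search for SparseVector inputs.
               values[i] = inputVec.get(indices[i]);
           }
           // Append the sliced vector as a new field, matching the outputTypeInfo used in transform().
           return Row.join(row, Row.of(new DenseVector(values)));
       }
   }
   ```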



##########
flink-ml-python/pyflink/ml/lib/param.py:
##########
@@ -155,6 +155,27 @@ def input_cols(self) -> Tuple[str, ...]:
         return self.get_input_cols()
 
 
+class HasIndices(WithParams, ABC):

Review Comment:
   Maybe we should implement it inside `vectorslicer.py`, instead of treating it as a common parameter. You can refer to the `K` parameter in `kmeans.py`.



##########
flink-ml-lib/src/test/java/org/apache/flink/ml/feature/VectorSlicerTest.java:
##########
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.ml.feature.vectorslicer.VectorSlicer;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.SparseVector;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.ml.util.TestUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
+
+import org.apache.commons.collections.IteratorUtils;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+/** Tests VectorSlicer. */
+public class VectorSlicerTest extends AbstractTestBase {
+
+    private StreamTableEnvironment tEnv;
+    private Table inputDataTable;
+
+    private static final List<Row> INPUT_DATA =
+            Arrays.asList(
+                    Row.of(
+                            0,
+                            Vectors.dense(2.1, 3.1, 2.3, 3.4, 5.3, 5.1),
+                            Vectors.sparse(5, new int[] {1, 3, 4}, new double[] {0.1, 0.2, 0.3})),
+                    Row.of(
+                            1,
+                            Vectors.dense(2.3, 4.1, 1.3, 2.4, 5.1, 4.1),
+                            Vectors.sparse(5, new int[] {1, 2, 4}, new double[] {0.1, 0.2, 0.3})));
+
+    private static final DenseVector EXPECTED_OUTPUT_DATA_1 = Vectors.dense(2.1, 3.1, 2.3);
+    private static final DenseVector EXPECTED_OUTPUT_DATA_2 = Vectors.dense(2.3, 4.1, 1.3);
+
+    private static final SparseVector EXPECTED_OUTPUT_DATA_3 =
+            Vectors.sparse(3, new int[] {1}, new double[] {0.1});
+    private static final SparseVector EXPECTED_OUTPUT_DATA_4 =
+            Vectors.sparse(3, new int[] {1, 2}, new double[] {0.1, 0.2});
+
+    @Before
+    public void before() {
+        Configuration config = new Configuration();
+        config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+        env.setParallelism(4);
+        env.enableCheckpointing(100);
+        env.setRestartStrategy(RestartStrategies.noRestart());
+        tEnv = StreamTableEnvironment.create(env);
+        DataStream<Row> dataStream = env.fromCollection(INPUT_DATA);
+        inputDataTable = tEnv.fromDataStream(dataStream).as("id", "vec", "sparseVec");
+    }
+
+    private void verifyOutputResult(Table output, String outputCol, boolean isSparse)
+            throws Exception {
+        DataStream<Row> dataStream = tEnv.toDataStream(output);
+        List<Row> results = IteratorUtils.toList(dataStream.executeAndCollect());
+        assertEquals(2, results.size());
+        for (Row result : results) {
+            if (result.getField(0) == (Object) 0) {
+                if (isSparse) {
+                    assertEquals(EXPECTED_OUTPUT_DATA_3, result.getField(outputCol));
+                } else {
+                    assertEquals(EXPECTED_OUTPUT_DATA_1, result.getField(outputCol));
+                }
+            } else if (result.getField(0) == (Object) 1) {
+                if (isSparse) {
+                    assertEquals(EXPECTED_OUTPUT_DATA_4, result.getField(outputCol));
+                } else {
+                    assertEquals(EXPECTED_OUTPUT_DATA_2, result.getField(outputCol));
+                }
+            } else {
+                assertNull(result.getField(outputCol));

Review Comment:
   It seems that this line would not be reached. It might be better to remove this line or replace it with throwing an exception.
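
   For illustration, a rough sketch of what the suggested `else` branch could look like, written as a small helper that assumes the constants and imports of the quoted test class (the helper name `verifyRow` and the exception message are only examples, not part of the PR):

   ```java
   private void verifyRow(Row result, String outputCol, boolean isSparse) {
       Object id = result.getField(0);
       if (id.equals(0)) {
           assertEquals(isSparse ? EXPECTED_OUTPUT_DATA_3 : EXPECTED_OUTPUT_DATA_1, result.getField(outputCol));
       } else if (id.equals(1)) {
           assertEquals(isSparse ? EXPECTED_OUTPUT_DATA_4 : EXPECTED_OUTPUT_DATA_2, result.getField(outputCol));
       } else {
           // Fail loudly on an id value that should never occur, instead of asserting null.
           throw new IllegalStateException("Unexpected id value: " + id);
       }
   }
   ```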



##########
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/vectorslicer/VectorSlicer.java:
##########
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature.vectorslicer;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.ml.api.Transformer;
+import org.apache.flink.ml.common.datastream.TableUtils;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.SparseVector;
+import org.apache.flink.ml.linalg.Vector;
+import org.apache.flink.ml.linalg.typeinfo.VectorTypeInfo;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.ArrayUtils;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A feature transformer that transforms a vector to a new one with a sub-array of the original
+ * features. It is useful for extracting features from a given vector.
+ */

Review Comment:
   Let's add comments that describe what will happen if the indices acquired from `setIndices()` are not in order, and add corresponding test cases.
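
   For example, the class-level Javadoc could carry a note along these lines (wording is only a suggestion; it matches the sorting behavior the implementation adopts later in this thread):

   ```java
   /**
    * A Transformer that transforms a vector to a new one with a sub-array of the original features.
    * It is useful for extracting features from a given vector.
    *
    * <p>Note: if the indices passed to setIndices() are not in ascending order, they are sorted
    * first, so the selected features appear in the output vector in ascending index order.
    */
   ```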



##########
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/vectorslicer/VectorSlicerParams.java:
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature.vectorslicer;
+
+import org.apache.flink.ml.common.param.HasInputCol;
+import org.apache.flink.ml.common.param.HasOutputCol;
+import org.apache.flink.ml.param.IntArrayParam;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.param.ParamValidators;
+
+/**
+ * Params of VectorSlicer.
+ *
+ * @param <T> The class type of this instance.
+ */
+public interface VectorSlicerParams<T> extends HasInputCol<T>, HasOutputCol<T> {
+    Param<Integer[]> INDICES =
+            new IntArrayParam(
+                    "indices",
+                    "An array of indices to select features from a vector column.",
+                    null,
+                    ParamValidators.nonEmptyArray());

Review Comment:
   It might be better to add a custom `ParamValidator` to verify that the indices are all greater than or equal to 0 and contain no duplicates, like `VectorSlicer.validIndices()` in Spark.
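
   For reference, a rough sketch of what such a validator could look like (the method name `validIndices` is only illustrative, and it assumes `ParamValidator` is the single-method functional interface already used by the lambdas in `ParamValidators`):

   ```java
   // Hypothetical validator: non-null, non-empty, all indices >= 0, and no duplicates.
   // Uses java.util.HashSet and java.util.Set.
   public static ParamValidator<Integer[]> validIndices() {
       return indices -> {
           if (indices == null || indices.length == 0) {
               return false;
           }
           Set<Integer> seen = new HashSet<>();
           for (Integer index : indices) {
               if (index == null || index < 0 || !seen.add(index)) {
                   return false;
               }
           }
           return true;
       };
   }
   ```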



##########
flink-ml-python/pyflink/ml/lib/feature/tests/test_vectorslicer.py:
##########
@@ -0,0 +1,74 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import os
+
+from pyflink.common import Types
+
+from pyflink.ml.core.linalg import Vectors, DenseVectorTypeInfo
+from pyflink.ml.lib.feature.vectorslicer import VectorSlicer
+from pyflink.ml.tests.test_utils import PyFlinkMLTestCase
+
+
+class VectorSlicerTest(PyFlinkMLTestCase):
+    def setUp(self):
+        super(VectorSlicerTest, self).setUp()
+        self.input_data_table = self.t_env.from_data_stream(
+            self.env.from_collection([
+                (1, Vectors.dense(2.1, 3.1, 1.2, 2.1)),
+                (2, Vectors.dense(2.3, 2.1, 1.3, 1.2)),
+            ],
+                type_info=Types.ROW_NAMED(
+                    ['id', 'vec'],
+                    [Types.INT(), DenseVectorTypeInfo()])))
+
+        self.expected_output_data_1 = Vectors.dense(2.1, 3.1, 1.2)
+        self.expected_output_data_2 = Vectors.dense(2.3, 2.1, 1.3)
+
+    def test_param(self):
+        vector_slicer = VectorSlicer()
+
+        self.assertEqual('input', vector_slicer.input_col)
+        self.assertEqual('output', vector_slicer.output_col)
+
+        vector_slicer.set_input_col('vec') \
+            .set_output_col('slice_vec') \
+            .set_indices(0, 1, 2)
+
+        self.assertEqual('vec', vector_slicer.input_col)
+        self.assertEqual((0, 1, 2), vector_slicer.indices)
+        self.assertEqual('slice_vec', vector_slicer.output_col)
+
+    def test_save_load_transform(self):
+        vector_slicer = VectorSlicer() \
+            .set_input_col('vec') \
+            .set_output_col('slice_vec') \
+            .set_indices(0, 1, 2)
+
+        path = os.path.join(self.temp_dir, 'test_save_load_transform_vector_slicer')
+        vector_slicer.save(path)
+        vector_slicer = VectorSlicer.load(self.t_env, path)
+
+        output_table = vector_slicer.transform(self.input_data_table)[0]
+        actual_outputs = [(result[0], result[2]) for result in
+                          self.t_env.to_data_stream(output_table).execute_and_collect()]
+
+        for actual_output in actual_outputs:

Review Comment:
   It might be better to also check the length of the actual outputs.



##########
flink-ml-python/pyflink/ml/lib/feature/tests/test_vectorslicer.py:
##########
@@ -0,0 +1,74 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import os
+
+from pyflink.common import Types
+
+from pyflink.ml.core.linalg import Vectors, DenseVectorTypeInfo
+from pyflink.ml.lib.feature.vectorslicer import VectorSlicer
+from pyflink.ml.tests.test_utils import PyFlinkMLTestCase
+
+
+class VectorSlicerTest(PyFlinkMLTestCase):
+    def setUp(self):
+        super(VectorSlicerTest, self).setUp()
+        self.input_data_table = self.t_env.from_data_stream(
+            self.env.from_collection([
+                (1, Vectors.dense(2.1, 3.1, 1.2, 2.1)),
+                (2, Vectors.dense(2.3, 2.1, 1.3, 1.2)),
+            ],
+                type_info=Types.ROW_NAMED(
+                    ['id', 'vec'],
+                    [Types.INT(), DenseVectorTypeInfo()])))
+
+        self.expected_output_data_1 = Vectors.dense(2.1, 3.1, 1.2)
+        self.expected_output_data_2 = Vectors.dense(2.3, 2.1, 1.3)
+
+    def test_param(self):
+        vector_slicer = VectorSlicer()
+
+        self.assertEqual('input', vector_slicer.input_col)
+        self.assertEqual('output', vector_slicer.output_col)

Review Comment:
   Let's use the getter methods instead of accessing the members directly, which means instead of 
   ```python
   vector_slicer.input_col
   ```
   Let's use
   ```python
   vector_slicer.get_input_col()
   ```
   
   Besides, let's add a check for the indices here as well.



##########
flink-ml-examples/src/main/java/org/apache/flink/ml/examples/feature/VectorSlicerExample.java:
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.examples.feature;
+
+import org.apache.flink.ml.feature.vectorslicer.VectorSlicer;
+import org.apache.flink.ml.linalg.Vector;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+
+/** Simple program that creates a VectorSlicer instance and uses it for feature engineering. */
+public class VectorSlicerExample {
+    public static void main(String[] args) {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+        // Generates input data.
+        DataStream<Row> inputStream =
+                env.fromElements(
+                        Row.of(Vectors.dense(2.1, 3.1, 1.2, 3.1, 4.6)),
+                        Row.of(Vectors.dense(1.2, 3.1, 4.6, 2.1, 3.1)));
+        Table inputTable = tEnv.fromDataStream(inputStream).as("vec");
+
+        // Creates a VectorSlicer object and initializes its parameters.
+        VectorSlicer vectorSlicer =
+                new VectorSlicer().setInputCol("vec").setIndices(1, 2, 3).setOutputCol("slicedVec");
+
+        // Uses the VectorSlicer object for feature transformations.
+        Table outputTable = vectorSlicer.transform(inputTable)[0];
+
+        // Extracts and displays the results.
+        for (CloseableIterator<Row> it = outputTable.execute().collect(); it.hasNext(); ) {
+            Row row = it.next();
+
+            Vector inputValue = (Vector) row.getField(vectorSlicer.getInputCol());
+
+            Vector outputValue = (Vector) row.getField(vectorSlicer.getOutputCol());
+
+            System.out.printf("Input Values: %s \tOutput Value: %s\n", inputValue, outputValue);

Review Comment:
   nit: "Values" -> "Value"



##########
flink-ml-python/pyflink/examples/ml/feature/vectorslicer_example.py:
##########
@@ -0,0 +1,64 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Simple program that creates a VectorSlicer instance and uses it for feature
+# engineering.
+#
+# Before executing this program, please make sure you have followed Flink ML's
+# quick start guideline to set up Flink ML and Flink environment. The guideline
+# can be found at
+#
+# https://nightlies.apache.org/flink/flink-ml-docs-master/docs/try-flink-ml/quick-start/
+
+from pyflink.common import Types
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.ml.core.linalg import Vectors, DenseVectorTypeInfo
+from pyflink.ml.lib.feature.vectorslicer import VectorSlicer
+from pyflink.table import StreamTableEnvironment
+
+# create a new StreamExecutionEnvironment
+env = StreamExecutionEnvironment.get_execution_environment()
+
+# create a StreamTableEnvironment
+t_env = StreamTableEnvironment.create(env)
+
+# generate input data
+input_data_table = t_env.from_data_stream(
+    env.from_collection([
+        (1, Vectors.dense(2.1, 3.1, 1.2, 2.1)),
+        (2, Vectors.dense(2.3, 2.1, 1.3, 1.2)),
+    ],
+        type_info=Types.ROW_NAMED(
+            ['id', 'vec'],
+            [Types.INT(), DenseVectorTypeInfo()])))
+
+# create a vector slicer object and initialize its parameters
+vector_slicer = VectorSlicer() \
+    .set_input_col('vec') \
+    .set_indices(1, 2, 3) \
+    .set_output_col('sub_vec')
+
+# use the vector slicer model for feature engineering

Review Comment:
   I just realized that the vector slicer is not a Model. Could you please update this comment to say something like `vector slicer instance` or `vector slicer object`, and fix similar comments in `vectorassembler_example.py`?





[GitHub] [flink-ml] weibozhao commented on a diff in pull request #131: [FLINK-28563] Add Transformer for VectorSlicer

Posted by GitBox <gi...@apache.org>.
weibozhao commented on code in PR #131:
URL: https://github.com/apache/flink-ml/pull/131#discussion_r925115735


##########
flink-ml-core/src/main/java/org/apache/flink/ml/param/ParamValidators.java:
##########
@@ -38,6 +38,22 @@ public static <T> ParamValidator<T> gtEq(double lowerBound) {
         return (value) -> value != null && ((Number) value).doubleValue() >= lowerBound;
     }
 
+    // Checks if all values in the parameter intArray is greater than or equal to lowerBound.
+    public static <T> ParamValidator<T> intArrayGtEq(double lowerBound) {
+        return value -> {

Review Comment:
   I think making it more generic is better than introducing a `forEach` function, since `forEach` may make the user API more complicated.

   - ParamValidators.numericalArrayGtEq(0)
   - ParamValidators.forEach(ParamValidators.gtEq(0))

   What do you think about it? @yunfengzhou-hub 
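
   To make the comparison concrete, here is a rough sketch of the `forEach` option (purely illustrative; it assumes `ParamValidator` exposes a single `validate(value)` method, as the lambdas in this class suggest):

   ```java
   // Hypothetical composite validator: applies an element-level validator to every entry of an array.
   public static <T> ParamValidator<T[]> forEach(ParamValidator<T> elementValidator) {
       return array -> {
           if (array == null) {
               return false;
           }
           for (T element : array) {
               if (!elementValidator.validate(element)) {
                   return false;
               }
           }
           return true;
       };
   }

   // Possible usage for the indices parameter:
   // ParamValidators.forEach(ParamValidators.gtEq(0))
   ```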





[GitHub] [flink-ml] yunfengzhou-hub commented on a diff in pull request #131: [FLINK-28563] Add Transformer for VectorSlicer

Posted by GitBox <gi...@apache.org>.
yunfengzhou-hub commented on code in PR #131:
URL: https://github.com/apache/flink-ml/pull/131#discussion_r929461602


##########
flink-ml-core/src/main/java/org/apache/flink/ml/param/ParamValidators.java:
##########
@@ -38,6 +38,22 @@ public static <T> ParamValidator<T> gtEq(double lowerBound) {
         return (value) -> value != null && ((Number) value).doubleValue() >= lowerBound;
     }
 
+    // Checks if all values in the parameter numericalArray is greater than or equal to lowerBound.

Review Comment:
   nit: `numericalArray` is not a proper noun in this context, as we do not have any class with this name. Consider expressing it in other ways, like "array-typed parameter" or "numerical array parameter".



##########
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/vectorslicer/VectorSlicer.java:
##########
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature.vectorslicer;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.ml.api.Transformer;
+import org.apache.flink.ml.common.datastream.TableUtils;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.SparseVector;
+import org.apache.flink.ml.linalg.Vector;
+import org.apache.flink.ml.linalg.typeinfo.VectorTypeInfo;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.ArrayUtils;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A transformer that transforms a vector to a new one with a sub-array of the original features. It
+ * is useful for extracting features from a given vector. If the indices acquired from setIndices()
+ * are not in order, the indices of the result vector will be sorted.
+ */
+public class VectorSlicer implements Transformer<VectorSlicer>, VectorSlicerParams<VectorSlicer> {
+    private final Map<Param<?>, Object> paramMap = new HashMap<>();
+
+    public VectorSlicer() {
+        ParamUtils.initializeMapWithDefaultValues(paramMap, this);
+    }
+
+    @Override
+    public Table[] transform(Table... inputs) {
+        Preconditions.checkArgument(inputs.length == 1);
+        StreamTableEnvironment tEnv =
+                (StreamTableEnvironment) ((TableImpl) inputs[0]).getTableEnvironment();
+        RowTypeInfo inputTypeInfo = TableUtils.getRowTypeInfo(inputs[0].getResolvedSchema());
+        RowTypeInfo outputTypeInfo =
+                new RowTypeInfo(
+                        ArrayUtils.addAll(inputTypeInfo.getFieldTypes(), VectorTypeInfo.INSTANCE),
+                        ArrayUtils.addAll(inputTypeInfo.getFieldNames(), getOutputCol()));
+        DataStream<Row> output =
+                tEnv.toDataStream(inputs[0])
+                        .map(new VectorSlice(getIndices(), getInputCol()), outputTypeInfo);
+        Table outputTable = tEnv.fromDataStream(output);
+        return new Table[] {outputTable};
+    }
+
+    @Override
+    public void save(String path) throws IOException {
+        ReadWriteUtils.saveMetadata(this, path);
+    }
+
+    public static VectorSlicer load(StreamTableEnvironment env, String path) throws IOException {
+        return ReadWriteUtils.loadStageParam(path);
+    }
+
+    @Override
+    public Map<Param<?>, Object> getParamMap() {
+        return paramMap;
+    }
+
+    /** Vector slice function. */
+    private static class VectorSlice implements MapFunction<Row, Row> {

Review Comment:
   nit: Let's rename this class to `VectorSliceFunction`, as class names usually end with nouns.



##########
flink-ml-lib/src/test/java/org/apache/flink/ml/feature/VectorSlicerTest.java:
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.ml.feature.vectorslicer.VectorSlicer;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.SparseVector;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.ml.util.TestUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
+
+import org.apache.commons.collections.IteratorUtils;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+/** Tests VectorSlicer. */

Review Comment:
   nit: `{@link VectorSlicer}`



##########
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/vectorslicer/VectorSlicerParams.java:
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature.vectorslicer;
+
+import org.apache.flink.ml.common.param.HasInputCol;
+import org.apache.flink.ml.common.param.HasOutputCol;
+import org.apache.flink.ml.param.IntArrayParam;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.param.ParamValidators;
+
+/**
+ * Params of VectorSlicer.

Review Comment:
   nit: `{@link VectorSlicer}`.



##########
flink-ml-python/pyflink/ml/lib/feature/vectorslicer.py:
##########
@@ -0,0 +1,70 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from typing import Tuple
+from pyflink.ml.core.wrapper import JavaWithParams
+from pyflink.ml.core.param import IntArrayParam
+from pyflink.ml.lib.feature.common import JavaFeatureTransformer
+from pyflink.ml.lib.param import HasInputCol, HasOutputCol, ParamValidators, Param
+
+
+class _VectorSlicerParams(
+    JavaWithParams,
+    HasInputCol,
+    HasOutputCol
+):
+    """
+    Params for :class:`VectorSlicer`.
+    """
+
+    INDICES: Param[Tuple[int, ...]] = IntArrayParam(
+        "indices",
+        "An array of indices to select features from a vector column.",
+        None,
+        ParamValidators.non_empty_array())

Review Comment:
   Let's use the same validator for the Java and Python implementations.



##########
flink-ml-lib/src/test/java/org/apache/flink/ml/feature/VectorSlicerTest.java:
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.ml.feature.vectorslicer.VectorSlicer;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.SparseVector;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.ml.util.TestUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
+
+import org.apache.commons.collections.IteratorUtils;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+/** Tests VectorSlicer. */
+public class VectorSlicerTest extends AbstractTestBase {
+
+    private StreamTableEnvironment tEnv;
+    private Table inputDataTable;
+
+    private static final List<Row> INPUT_DATA =
+            Arrays.asList(
+                    Row.of(
+                            0,
+                            Vectors.dense(2.1, 3.1, 2.3, 3.4, 5.3, 5.1),
+                            Vectors.sparse(5, new int[] {1, 3, 4}, new double[] {0.1, 0.2, 0.3})),
+                    Row.of(
+                            1,
+                            Vectors.dense(2.3, 4.1, 1.3, 2.4, 5.1, 4.1),
+                            Vectors.sparse(5, new int[] {1, 2, 4}, new double[] {0.1, 0.2, 0.3})));
+
+    private static final DenseVector EXPECTED_OUTPUT_DATA_1 = Vectors.dense(2.1, 3.1, 2.3);
+    private static final DenseVector EXPECTED_OUTPUT_DATA_2 = Vectors.dense(2.3, 4.1, 1.3);
+
+    private static final SparseVector EXPECTED_OUTPUT_DATA_3 =
+            Vectors.sparse(3, new int[] {1}, new double[] {0.1});
+    private static final SparseVector EXPECTED_OUTPUT_DATA_4 =
+            Vectors.sparse(3, new int[] {1, 2}, new double[] {0.1, 0.2});
+
+    @Before
+    public void before() {
+        Configuration config = new Configuration();
+        config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+        env.setParallelism(4);
+        env.enableCheckpointing(100);
+        env.setRestartStrategy(RestartStrategies.noRestart());
+        tEnv = StreamTableEnvironment.create(env);
+        DataStream<Row> dataStream = env.fromCollection(INPUT_DATA);
+        inputDataTable = tEnv.fromDataStream(dataStream).as("id", "vec", "sparseVec");
+    }
+
+    private void verifyOutputResult(Table output, String outputCol, boolean isSparse)
+            throws Exception {
+        DataStream<Row> dataStream = tEnv.toDataStream(output);
+        List<Row> results = IteratorUtils.toList(dataStream.executeAndCollect());
+        assertEquals(2, results.size());
+        for (Row result : results) {
+            if (result.getField(0) == (Object) 0) {
+                if (isSparse) {
+                    assertEquals(EXPECTED_OUTPUT_DATA_3, result.getField(outputCol));
+                } else {
+                    assertEquals(EXPECTED_OUTPUT_DATA_1, result.getField(outputCol));
+                }
+            } else if (result.getField(0) == (Object) 1) {
+                if (isSparse) {
+                    assertEquals(EXPECTED_OUTPUT_DATA_4, result.getField(outputCol));
+                } else {
+                    assertEquals(EXPECTED_OUTPUT_DATA_2, result.getField(outputCol));
+                }
+            }

Review Comment:
   nit: Let's add an `else` here to handle unexpected situations.



##########
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/vectorslicer/VectorSlicer.java:
##########
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature.vectorslicer;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.ml.api.Transformer;
+import org.apache.flink.ml.common.datastream.TableUtils;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.SparseVector;
+import org.apache.flink.ml.linalg.Vector;
+import org.apache.flink.ml.linalg.typeinfo.VectorTypeInfo;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.ArrayUtils;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A transformer that transforms a vector to a new one with a sub-array of the original features. It
+ * is useful for extracting features from a given vector. If the indices acquired from setIndices()
+ * are not in order, the indices of the result vector will be sorted.
+ */

Review Comment:
   What will happen if the max index is larger than the size of the input vector? It might be better to clearly define the behavior in this situation, describe it in the Javadoc, and add corresponding test cases.
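
   For example, the behavior could be pinned down roughly like this (the exception type and wording are only suggestions; the snippet assumes Flink's `Preconditions.checkArgument(condition, template, args)` overload, and `Preconditions` is already imported in the quoted class):

   ```java
   // Suggested class-level Javadoc addition:
   // <p>Note: if any index is greater than or equal to the size of the input vector, an
   // IllegalArgumentException is thrown when the transformer is applied.

   // Suggested bounds check inside the map function, before reading indices[i]:
   Preconditions.checkArgument(
           indices[i] < inputVec.size(),
           "Index value %s is greater than vector size: %s",
           indices[i],
           inputVec.size());
   ```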







[GitHub] [flink-ml] zhipeng93 commented on a diff in pull request #131: [FLINK-28563] Add Transformer for VectorSlicer

Posted by GitBox <gi...@apache.org>.
zhipeng93 commented on code in PR #131:
URL: https://github.com/apache/flink-ml/pull/131#discussion_r932840786


##########
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/vectorslicer/VectorSlicerParams.java:
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature.vectorslicer;
+
+import org.apache.flink.ml.common.param.HasInputCol;
+import org.apache.flink.ml.common.param.HasOutputCol;
+import org.apache.flink.ml.param.IntArrayParam;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.param.ParamValidators;
+
+/**
+ * Params of {@link VectorSlicer}.
+ *
+ * @param <T> The class type of this instance.
+ */
+public interface VectorSlicerParams<T> extends HasInputCol<T>, HasOutputCol<T> {
+    Param<Integer[]> INDICES =
+            new IntArrayParam(
+                    "indices",
+                    "An array of indices to select features from a vector column.",
+                    null,
+                    ParamValidators.numericalArrayGtEq(0));

Review Comment:
   +1 to adding a validator for VectorSlicer. If we agree to do this, we probably should also remove `numericalArrayGtEq`, since it would no longer be needed in that case.





[GitHub] [flink-ml] zhipeng93 commented on a diff in pull request #131: [FLINK-28563] Add Transformer for VectorSlicer

Posted by GitBox <gi...@apache.org>.
zhipeng93 commented on code in PR #131:
URL: https://github.com/apache/flink-ml/pull/131#discussion_r933042845


##########
flink-ml-lib/src/test/java/org/apache/flink/ml/feature/VectorSlicerTest.java:
##########
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.ml.feature.vectorslicer.VectorSlicer;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.SparseVector;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.ml.util.TestUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
+
+import org.apache.commons.collections.IteratorUtils;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+/** Tests {@link VectorSlicer}. */
+public class VectorSlicerTest extends AbstractTestBase {
+
+    private StreamTableEnvironment tEnv;
+    private Table inputDataTable;
+
+    private static final List<Row> INPUT_DATA =
+            Arrays.asList(
+                    Row.of(
+                            0,
+                            Vectors.dense(2.1, 3.1, 2.3, 3.4, 5.3, 5.1),
+                            Vectors.sparse(5, new int[] {1, 3, 4}, new double[] {0.1, 0.2, 0.3})),
+                    Row.of(
+                            1,
+                            Vectors.dense(2.3, 4.1, 1.3, 2.4, 5.1, 4.1),
+                            Vectors.sparse(5, new int[] {1, 2, 4}, new double[] {0.1, 0.2, 0.3})));
+
+    private static final DenseVector EXPECTED_OUTPUT_DATA_1 = Vectors.dense(2.1, 3.1, 2.3);
+    private static final DenseVector EXPECTED_OUTPUT_DATA_2 = Vectors.dense(2.3, 4.1, 1.3);
+
+    private static final SparseVector EXPECTED_OUTPUT_DATA_3 =
+            Vectors.sparse(3, new int[] {1}, new double[] {0.1});
+    private static final SparseVector EXPECTED_OUTPUT_DATA_4 =
+            Vectors.sparse(3, new int[] {1, 2}, new double[] {0.1, 0.2});
+
+    @Before
+    public void before() {
+        Configuration config = new Configuration();
+        config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+        env.setParallelism(4);
+        env.enableCheckpointing(100);
+        env.setRestartStrategy(RestartStrategies.noRestart());
+        tEnv = StreamTableEnvironment.create(env);
+        DataStream<Row> dataStream = env.fromCollection(INPUT_DATA);
+        inputDataTable = tEnv.fromDataStream(dataStream).as("id", "vec", "sparseVec");
+    }
+
+    private void verifyOutputResult(Table output, String outputCol, boolean isSparse)
+            throws Exception {
+        DataStream<Row> dataStream = tEnv.toDataStream(output);
+        List<Row> results = IteratorUtils.toList(dataStream.executeAndCollect());
+        assertEquals(2, results.size());
+        for (Row result : results) {
+            if (result.getField(0) == (Object) 0) {
+                if (isSparse) {
+                    assertEquals(EXPECTED_OUTPUT_DATA_3, result.getField(outputCol));
+                } else {
+                    assertEquals(EXPECTED_OUTPUT_DATA_1, result.getField(outputCol));
+                }
+            } else if (result.getField(0) == (Object) 1) {
+                if (isSparse) {
+                    assertEquals(EXPECTED_OUTPUT_DATA_4, result.getField(outputCol));
+                } else {
+                    assertEquals(EXPECTED_OUTPUT_DATA_2, result.getField(outputCol));
+                }
+            } else {
+                throw new RuntimeException("Result id value is error, it must be 0 or 1.");
+            }
+        }
+    }
+
+    @Test
+    public void testParam() {
+        VectorSlicer vectorSlicer = new VectorSlicer();
+        assertEquals("input", vectorSlicer.getInputCol());
+        assertEquals("output", vectorSlicer.getOutputCol());
+        vectorSlicer.setInputCol("vec").setOutputCol("sliceVec").setIndices(0, 1, 2);
+        assertEquals("vec", vectorSlicer.getInputCol());
+        assertEquals("sliceVec", vectorSlicer.getOutputCol());
+        assertArrayEquals(new Integer[] {0, 1, 2}, vectorSlicer.getIndices());
+    }
+
+    @Test
+    public void testSaveLoadAndTransform() throws Exception {
+        VectorSlicer vectorSlicer =
+                new VectorSlicer().setInputCol("vec").setOutputCol("sliceVec").setIndices(0, 1, 2);
+        VectorSlicer loadedVectorSlicer =
+                TestUtils.saveAndReload(
+                        tEnv, vectorSlicer, TEMPORARY_FOLDER.newFolder().getAbsolutePath());
+        Table output = loadedVectorSlicer.transform(inputDataTable)[0];
+        verifyOutputResult(output, loadedVectorSlicer.getOutputCol(), false);
+    }
+
+    @Test
+    public void testEmptyIndices() {
+        try {
+            VectorSlicer vectorSlicer =
+                    new VectorSlicer().setInputCol("vec").setOutputCol("sliceVec").setIndices();
+            vectorSlicer.transform(inputDataTable);
+            Assert.fail("Expected IllegalArgumentException");
+        } catch (Exception e) {
+            assertEquals("Parameter indices is given an invalid value {}", e.getMessage());
+            assertEquals(IllegalArgumentException.class, ((Throwable) e).getClass());
+        }
+    }
+
+    @Test
+    public void testIndicesLargerThanVectorSize() {
+        try {
+            VectorSlicer vectorSlicer =
+                    new VectorSlicer()
+                            .setInputCol("vec")
+                            .setOutputCol("sliceVec")
+                            .setIndices(1, 2, 10);
+            Table output = vectorSlicer.transform(inputDataTable)[0];
+            DataStream<Row> dataStream = tEnv.toDataStream(output);
+            IteratorUtils.toList(dataStream.executeAndCollect());
+            Assert.fail("Expected RuntimeException");
+        } catch (Exception e) {
+            assertEquals(
+                    "Index value 10 is greater than vector size:6",
+                    ExceptionUtils.getRootCause(e).getMessage());
+            assertEquals(IllegalArgumentException.class, ExceptionUtils.getRootCause(e).getClass());
+        }
+    }
+
+    @Test
+    public void testIndicesSmallerThanZero() {
+        try {
+            new VectorSlicer().setInputCol("vec").setOutputCol("sliceVec").setIndices(1, -2);
+            Assert.fail("Expected IllegalArgumentException");
+        } catch (Exception e) {
+            assertEquals("Parameter indices is given an invalid value {1,-2}", e.getMessage());
+            assertEquals(IllegalArgumentException.class, e.getClass());
+        }
+    }
+
+    @Test
+    public void testDuplicateIndices() {
+        try {
+            new VectorSlicer().setInputCol("vec").setOutputCol("sliceVec").setIndices(1, 1, 3);
+            Assert.fail("Expected IllegalArgumentException");
+        } catch (Exception e) {
+            assertEquals("Parameter indices is given an invalid value {1,1,3}", e.getMessage());
+            assertEquals(IllegalArgumentException.class, ((Throwable) e).getClass());

Review Comment:
   The check for the exception class type seems unnecessary, since we already check the error message.

   Same for the other checks.
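
   For instance, the duplicate-indices case could then shrink to something like this (sketch only, derived from the quoted test):

   ```java
   @Test
   public void testDuplicateIndices() {
       try {
           new VectorSlicer().setInputCol("vec").setOutputCol("sliceVec").setIndices(1, 1, 3);
           Assert.fail("Expected IllegalArgumentException");
       } catch (Exception e) {
           // The message assertion alone already pins down the expected failure.
           assertEquals("Parameter indices is given an invalid value {1,1,3}", e.getMessage());
       }
   }
   ```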





[GitHub] [flink-ml] zhipeng93 commented on pull request #131: [FLINK-28563] Add Transformer for VectorSlicer

Posted by GitBox <gi...@apache.org>.
zhipeng93 commented on PR #131:
URL: https://github.com/apache/flink-ml/pull/131#issuecomment-1199092961

   Thanks for the update. LGTM.




[GitHub] [flink-ml] zhipeng93 commented on a diff in pull request #131: [FLINK-28563] Add Transformer for VectorSlicer

Posted by GitBox <gi...@apache.org>.
zhipeng93 commented on code in PR #131:
URL: https://github.com/apache/flink-ml/pull/131#discussion_r926208138


##########
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/vectorslicer/VectorSlicer.java:
##########
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature.vectorslicer;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.ml.api.Transformer;
+import org.apache.flink.ml.common.datastream.TableUtils;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.SparseVector;
+import org.apache.flink.ml.linalg.Vector;
+import org.apache.flink.ml.linalg.typeinfo.VectorTypeInfo;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.ArrayUtils;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A feature transformer that transforms a vector to a new one with a sub-array of the original

Review Comment:
   nit: Can you update the Javadoc to read "A Transformer that...", following the existing Javadocs?



##########
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/vectorslicer/VectorSlicer.java:
##########
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature.vectorslicer;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.ml.api.Transformer;
+import org.apache.flink.ml.common.datastream.TableUtils;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.SparseVector;
+import org.apache.flink.ml.linalg.Vector;
+import org.apache.flink.ml.linalg.typeinfo.VectorTypeInfo;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.ArrayUtils;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A feature transformer that transforms a vector to a new one with a sub-array of the original
+ * features. It is useful for extracting features from a given vector. If the indices acquired from
+ * setIndices() are not in order, the indices of the result vector will be sorted.
+ */
+public class VectorSlicer implements Transformer<VectorSlicer>, VectorSlicerParams<VectorSlicer> {
+    private final Map<Param<?>, Object> paramMap = new HashMap<>();
+
+    public VectorSlicer() {
+        ParamUtils.initializeMapWithDefaultValues(paramMap, this);
+    }
+
+    @Override
+    public Table[] transform(Table... inputs) {
+        Preconditions.checkArgument(inputs.length == 1);
+        StreamTableEnvironment tEnv =
+                (StreamTableEnvironment) ((TableImpl) inputs[0]).getTableEnvironment();
+        RowTypeInfo inputTypeInfo = TableUtils.getRowTypeInfo(inputs[0].getResolvedSchema());
+        RowTypeInfo outputTypeInfo =
+                new RowTypeInfo(
+                        ArrayUtils.addAll(inputTypeInfo.getFieldTypes(), VectorTypeInfo.INSTANCE),
+                        ArrayUtils.addAll(inputTypeInfo.getFieldNames(), getOutputCol()));
+        DataStream<Row> output =
+                tEnv.toDataStream(inputs[0])
+                        .map(new VectorSlice(getIndices(), getInputCol()), outputTypeInfo);
+        Table outputTable = tEnv.fromDataStream(output);
+        return new Table[] {outputTable};
+    }
+
+    /** Vector slice function. */
+    public static class VectorSlice implements MapFunction<Row, Row> {
+        private final Integer[] indices;
+        private final String inputCol;
+
+        public VectorSlice(Integer[] indices, String inputCol) {
+            this.indices = indices;
+            this.inputCol = inputCol;
+        }
+
+        @Override
+        public Row map(Row row) throws Exception {
+            Vector inputVec = row.getFieldAs(inputCol);
+            Vector outputVec;
+            Arrays.sort(indices);
+            if (inputVec instanceof DenseVector) {
+                double[] values = new double[indices.length];
+                for (int i = 0; i < indices.length; ++i) {
+                    if (indices[i] >= inputVec.size()) {
+                        throw new RuntimeException("Index is larger than vector size.");
+                    }
+                    values[i] = ((DenseVector) inputVec).values[indices[i]];
+                }
+                outputVec = new DenseVector(values);
+            } else {
+                int nnz = 0;
+                SparseVector vec = (SparseVector) inputVec;
+                int[] outputIndices = new int[indices.length];
+                double[] outputValues = new double[indices.length];
+                for (int i = 0; i < indices.length; i++) {
+                    if (indices[i] >= inputVec.size()) {
+                        throw new RuntimeException("Index is larger than vector size.");

Review Comment:
   Same as above.



##########
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/vectorslicer/VectorSlicer.java:
##########
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature.vectorslicer;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.ml.api.Transformer;
+import org.apache.flink.ml.common.datastream.TableUtils;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.SparseVector;
+import org.apache.flink.ml.linalg.Vector;
+import org.apache.flink.ml.linalg.typeinfo.VectorTypeInfo;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.ArrayUtils;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A feature transformer that transforms a vector to a new one with a sub-array of the original
+ * features. It is useful for extracting features from a given vector. If the indices acquired from
+ * setIndices() are not in order, the indices of the result vector will be sorted.
+ */
+public class VectorSlicer implements Transformer<VectorSlicer>, VectorSlicerParams<VectorSlicer> {
+    private final Map<Param<?>, Object> paramMap = new HashMap<>();
+
+    public VectorSlicer() {
+        ParamUtils.initializeMapWithDefaultValues(paramMap, this);
+    }
+
+    @Override
+    public Table[] transform(Table... inputs) {
+        Preconditions.checkArgument(inputs.length == 1);
+        StreamTableEnvironment tEnv =
+                (StreamTableEnvironment) ((TableImpl) inputs[0]).getTableEnvironment();
+        RowTypeInfo inputTypeInfo = TableUtils.getRowTypeInfo(inputs[0].getResolvedSchema());
+        RowTypeInfo outputTypeInfo =
+                new RowTypeInfo(
+                        ArrayUtils.addAll(inputTypeInfo.getFieldTypes(), VectorTypeInfo.INSTANCE),
+                        ArrayUtils.addAll(inputTypeInfo.getFieldNames(), getOutputCol()));
+        DataStream<Row> output =
+                tEnv.toDataStream(inputs[0])
+                        .map(new VectorSlice(getIndices(), getInputCol()), outputTypeInfo);
+        Table outputTable = tEnv.fromDataStream(output);
+        return new Table[] {outputTable};
+    }
+
+    /** Vector slice function. */
+    public static class VectorSlice implements MapFunction<Row, Row> {
+        private final Integer[] indices;
+        private final String inputCol;
+
+        public VectorSlice(Integer[] indices, String inputCol) {
+            this.indices = indices;
+            this.inputCol = inputCol;
+        }
+
+        @Override
+        public Row map(Row row) throws Exception {
+            Vector inputVec = row.getFieldAs(inputCol);
+            Vector outputVec;
+            Arrays.sort(indices);
+            if (inputVec instanceof DenseVector) {
+                double[] values = new double[indices.length];
+                for (int i = 0; i < indices.length; ++i) {
+                    if (indices[i] >= inputVec.size()) {
+                        throw new RuntimeException("Index is larger than vector size.");

Review Comment:
   How about only checking that the size of the input vector is greater than the last element in `indices`? Since `indices` has just been sorted, that single check covers every index.

   Also, how about throwing an `IllegalArgumentException` instead of a `RuntimeException` here? The failure is caused by illegal `indices`.
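   
   A minimal sketch of that idea, assuming `indices` has already been sorted and is non-empty (variable names follow the surrounding diff; the message text is only illustrative):
   
   ```java
   // indices is sorted ascending, so its last element is the largest requested
   // index; a single bounds check therefore covers all of them.
   int maxIndex = indices[indices.length - 1];
   if (maxIndex >= inputVec.size()) {
       throw new IllegalArgumentException(
               "Index " + maxIndex + " is out of range for a vector of size " + inputVec.size() + ".");
   }
   ```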



##########
flink-ml-lib/src/test/java/org/apache/flink/ml/feature/VectorSlicerTest.java:
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.ml.feature.vectorslicer.VectorSlicer;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.SparseVector;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.ml.util.TestUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
+
+import org.apache.commons.collections.IteratorUtils;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+/** Tests VectorSlicer. */
+public class VectorSlicerTest extends AbstractTestBase {
+
+    private StreamTableEnvironment tEnv;
+    private Table inputDataTable;
+
+    private static final List<Row> INPUT_DATA =
+            Arrays.asList(
+                    Row.of(
+                            0,
+                            Vectors.dense(2.1, 3.1, 2.3, 3.4, 5.3, 5.1),
+                            Vectors.sparse(5, new int[] {1, 3, 4}, new double[] {0.1, 0.2, 0.3})),
+                    Row.of(
+                            1,
+                            Vectors.dense(2.3, 4.1, 1.3, 2.4, 5.1, 4.1),
+                            Vectors.sparse(5, new int[] {1, 2, 4}, new double[] {0.1, 0.2, 0.3})));
+
+    private static final DenseVector EXPECTED_OUTPUT_DATA_1 = Vectors.dense(2.1, 3.1, 2.3);
+    private static final DenseVector EXPECTED_OUTPUT_DATA_2 = Vectors.dense(2.3, 4.1, 1.3);
+
+    private static final SparseVector EXPECTED_OUTPUT_DATA_3 =
+            Vectors.sparse(3, new int[] {1}, new double[] {0.1});
+    private static final SparseVector EXPECTED_OUTPUT_DATA_4 =
+            Vectors.sparse(3, new int[] {1, 2}, new double[] {0.1, 0.2});
+
+    @Before
+    public void before() {
+        Configuration config = new Configuration();
+        config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+        env.setParallelism(4);
+        env.enableCheckpointing(100);
+        env.setRestartStrategy(RestartStrategies.noRestart());
+        tEnv = StreamTableEnvironment.create(env);
+        DataStream<Row> dataStream = env.fromCollection(INPUT_DATA);
+        inputDataTable = tEnv.fromDataStream(dataStream).as("id", "vec", "sparseVec");
+    }
+
+    private void verifyOutputResult(Table output, String outputCol, boolean isSparse)
+            throws Exception {
+        DataStream<Row> dataStream = tEnv.toDataStream(output);
+        List<Row> results = IteratorUtils.toList(dataStream.executeAndCollect());
+        assertEquals(2, results.size());
+        for (Row result : results) {
+            if (result.getField(0) == (Object) 0) {
+                if (isSparse) {
+                    assertEquals(EXPECTED_OUTPUT_DATA_3, result.getField(outputCol));
+                } else {
+                    assertEquals(EXPECTED_OUTPUT_DATA_1, result.getField(outputCol));
+                }
+            } else if (result.getField(0) == (Object) 1) {
+                if (isSparse) {
+                    assertEquals(EXPECTED_OUTPUT_DATA_4, result.getField(outputCol));
+                } else {
+                    assertEquals(EXPECTED_OUTPUT_DATA_2, result.getField(outputCol));
+                }
+            }
+        }
+    }
+
+    @Test
+    public void testParam() {
+        VectorSlicer vectorSlicer = new VectorSlicer();
+        assertEquals("output", vectorSlicer.getOutputCol());

Review Comment:
   Let's check the parameters in the same order, i.e., `inputCol`, `outputCol`, `indices`, for consistency.



##########
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/vectorslicer/VectorSlicer.java:
##########
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature.vectorslicer;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.ml.api.Transformer;
+import org.apache.flink.ml.common.datastream.TableUtils;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.SparseVector;
+import org.apache.flink.ml.linalg.Vector;
+import org.apache.flink.ml.linalg.typeinfo.VectorTypeInfo;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.ArrayUtils;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A feature transformer that transforms a vector to a new one with a sub-array of the original
+ * features. It is useful for extracting features from a given vector. If the indices acquired from
+ * setIndices() are not in order, the indices of the result vector will be sorted.
+ */
+public class VectorSlicer implements Transformer<VectorSlicer>, VectorSlicerParams<VectorSlicer> {
+    private final Map<Param<?>, Object> paramMap = new HashMap<>();
+
+    public VectorSlicer() {
+        ParamUtils.initializeMapWithDefaultValues(paramMap, this);
+    }
+
+    @Override
+    public Table[] transform(Table... inputs) {
+        Preconditions.checkArgument(inputs.length == 1);
+        StreamTableEnvironment tEnv =
+                (StreamTableEnvironment) ((TableImpl) inputs[0]).getTableEnvironment();
+        RowTypeInfo inputTypeInfo = TableUtils.getRowTypeInfo(inputs[0].getResolvedSchema());
+        RowTypeInfo outputTypeInfo =
+                new RowTypeInfo(
+                        ArrayUtils.addAll(inputTypeInfo.getFieldTypes(), VectorTypeInfo.INSTANCE),
+                        ArrayUtils.addAll(inputTypeInfo.getFieldNames(), getOutputCol()));
+        DataStream<Row> output =
+                tEnv.toDataStream(inputs[0])
+                        .map(new VectorSlice(getIndices(), getInputCol()), outputTypeInfo);
+        Table outputTable = tEnv.fromDataStream(output);
+        return new Table[] {outputTable};
+    }
+
+    /** Vector slice function. */
+    public static class VectorSlice implements MapFunction<Row, Row> {
+        private final Integer[] indices;
+        private final String inputCol;
+
+        public VectorSlice(Integer[] indices, String inputCol) {
+            this.indices = indices;
+            this.inputCol = inputCol;
+        }
+
+        @Override
+        public Row map(Row row) throws Exception {
+            Vector inputVec = row.getFieldAs(inputCol);
+            Vector outputVec;
+            Arrays.sort(indices);

Review Comment:
   How about moving the `sort` to the constructor for efficiency? Sorting in `map()` repeats the work for every input row.
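   
   For example, a sketch of the suggested constructor (names follow the diff above; copying the array before sorting is just one option, to avoid mutating the caller's array):
   
   ```java
   public VectorSlice(Integer[] indices, String inputCol) {
       // Copy and sort once at construction time instead of on every map() call.
       this.indices = indices.clone();
       Arrays.sort(this.indices);
       this.inputCol = inputCol;
   }
   ```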



##########
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/vectorslicer/VectorSlicer.java:
##########
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature.vectorslicer;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.ml.api.Transformer;
+import org.apache.flink.ml.common.datastream.TableUtils;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.SparseVector;
+import org.apache.flink.ml.linalg.Vector;
+import org.apache.flink.ml.linalg.typeinfo.VectorTypeInfo;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.ArrayUtils;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A feature transformer that transforms a vector to a new one with a sub-array of the original
+ * features. It is useful for extracting features from a given vector. If the indices acquired from
+ * setIndices() are not in order, the indices of the result vector will be sorted.
+ */
+public class VectorSlicer implements Transformer<VectorSlicer>, VectorSlicerParams<VectorSlicer> {
+    private final Map<Param<?>, Object> paramMap = new HashMap<>();
+
+    public VectorSlicer() {
+        ParamUtils.initializeMapWithDefaultValues(paramMap, this);
+    }
+
+    @Override
+    public Table[] transform(Table... inputs) {
+        Preconditions.checkArgument(inputs.length == 1);
+        StreamTableEnvironment tEnv =
+                (StreamTableEnvironment) ((TableImpl) inputs[0]).getTableEnvironment();
+        RowTypeInfo inputTypeInfo = TableUtils.getRowTypeInfo(inputs[0].getResolvedSchema());
+        RowTypeInfo outputTypeInfo =
+                new RowTypeInfo(
+                        ArrayUtils.addAll(inputTypeInfo.getFieldTypes(), VectorTypeInfo.INSTANCE),
+                        ArrayUtils.addAll(inputTypeInfo.getFieldNames(), getOutputCol()));
+        DataStream<Row> output =
+                tEnv.toDataStream(inputs[0])
+                        .map(new VectorSlice(getIndices(), getInputCol()), outputTypeInfo);
+        Table outputTable = tEnv.fromDataStream(output);
+        return new Table[] {outputTable};
+    }
+
+    /** Vector slice function. */
+    public static class VectorSlice implements MapFunction<Row, Row> {

Review Comment:
   nit: This class could be private and moved to the end of this Java file.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-ml] weibozhao commented on a diff in pull request #131: [FLINK-28563] Add Transformer for VectorSlicer

Posted by GitBox <gi...@apache.org>.
weibozhao commented on code in PR #131:
URL: https://github.com/apache/flink-ml/pull/131#discussion_r932966469


##########
flink-ml-lib/src/test/java/org/apache/flink/ml/feature/VectorSlicerTest.java:
##########
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.ml.feature.vectorslicer.VectorSlicer;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.SparseVector;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.ml.util.TestUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
+
+import org.apache.commons.collections.IteratorUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+/** Tests {@link VectorSlicer}. */
+public class VectorSlicerTest extends AbstractTestBase {
+
+    private StreamTableEnvironment tEnv;
+    private Table inputDataTable;
+
+    private static final List<Row> INPUT_DATA =
+            Arrays.asList(
+                    Row.of(
+                            0,
+                            Vectors.dense(2.1, 3.1, 2.3, 3.4, 5.3, 5.1),
+                            Vectors.sparse(5, new int[] {1, 3, 4}, new double[] {0.1, 0.2, 0.3})),
+                    Row.of(
+                            1,
+                            Vectors.dense(2.3, 4.1, 1.3, 2.4, 5.1, 4.1),
+                            Vectors.sparse(5, new int[] {1, 2, 4}, new double[] {0.1, 0.2, 0.3})));
+
+    private static final DenseVector EXPECTED_OUTPUT_DATA_1 = Vectors.dense(2.1, 3.1, 2.3);
+    private static final DenseVector EXPECTED_OUTPUT_DATA_2 = Vectors.dense(2.3, 4.1, 1.3);
+
+    private static final SparseVector EXPECTED_OUTPUT_DATA_3 =
+            Vectors.sparse(3, new int[] {1}, new double[] {0.1});
+    private static final SparseVector EXPECTED_OUTPUT_DATA_4 =
+            Vectors.sparse(3, new int[] {1, 2}, new double[] {0.1, 0.2});
+
+    @Before
+    public void before() {
+        Configuration config = new Configuration();
+        config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+        env.setParallelism(4);
+        env.enableCheckpointing(100);
+        env.setRestartStrategy(RestartStrategies.noRestart());
+        tEnv = StreamTableEnvironment.create(env);
+        DataStream<Row> dataStream = env.fromCollection(INPUT_DATA);
+        inputDataTable = tEnv.fromDataStream(dataStream).as("id", "vec", "sparseVec");
+    }
+
+    private void verifyOutputResult(Table output, String outputCol, boolean isSparse)
+            throws Exception {
+        DataStream<Row> dataStream = tEnv.toDataStream(output);
+        List<Row> results = IteratorUtils.toList(dataStream.executeAndCollect());
+        assertEquals(2, results.size());
+        for (Row result : results) {
+            if (result.getField(0) == (Object) 0) {
+                if (isSparse) {
+                    assertEquals(EXPECTED_OUTPUT_DATA_3, result.getField(outputCol));
+                } else {
+                    assertEquals(EXPECTED_OUTPUT_DATA_1, result.getField(outputCol));
+                }
+            } else if (result.getField(0) == (Object) 1) {
+                if (isSparse) {
+                    assertEquals(EXPECTED_OUTPUT_DATA_4, result.getField(outputCol));
+                } else {
+                    assertEquals(EXPECTED_OUTPUT_DATA_2, result.getField(outputCol));
+                }
+            } else {
+                throw new RuntimeException("Result id value is invalid; it must be 0 or 1.");
+            }
+        }
+    }
+
+    @Test
+    public void testParam() {
+        VectorSlicer vectorSlicer = new VectorSlicer();
+        assertEquals("input", vectorSlicer.getInputCol());
+        assertEquals("output", vectorSlicer.getOutputCol());
+        vectorSlicer.setInputCol("vec").setOutputCol("sliceVec").setIndices(0, 1, 2);
+        assertEquals("vec", vectorSlicer.getInputCol());
+        assertEquals("sliceVec", vectorSlicer.getOutputCol());
+        assertArrayEquals(new Integer[] {0, 1, 2}, vectorSlicer.getIndices());
+    }
+
+    @Test
+    public void testSaveLoadAndTransform() throws Exception {
+        VectorSlicer vectorSlicer =
+                new VectorSlicer().setInputCol("vec").setOutputCol("sliceVec").setIndices(0, 1, 2);
+        VectorSlicer loadedVectorSlicer =
+                TestUtils.saveAndReload(
+                        tEnv, vectorSlicer, TEMPORARY_FOLDER.newFolder().getAbsolutePath());
+        Table output = loadedVectorSlicer.transform(inputDataTable)[0];
+        verifyOutputResult(output, loadedVectorSlicer.getOutputCol(), false);
+    }
+
+    @Test
+    public void testEmptyIndices() {

Review Comment:
   Empty indices mean extracting nothing and returning a vector with no values, which is meaningless.
   I don't think everything Spark does makes sense.
   
   @zhipeng93 What's your opinion?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-ml] weibozhao commented on a diff in pull request #131: [FLINK-28563] Add Transformer for VectorSlicer

Posted by GitBox <gi...@apache.org>.
weibozhao commented on code in PR #131:
URL: https://github.com/apache/flink-ml/pull/131#discussion_r932966469


##########
flink-ml-lib/src/test/java/org/apache/flink/ml/feature/VectorSlicerTest.java:
##########
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.ml.feature.vectorslicer.VectorSlicer;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.SparseVector;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.ml.util.TestUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
+
+import org.apache.commons.collections.IteratorUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+/** Tests {@link VectorSlicer}. */
+public class VectorSlicerTest extends AbstractTestBase {
+
+    private StreamTableEnvironment tEnv;
+    private Table inputDataTable;
+
+    private static final List<Row> INPUT_DATA =
+            Arrays.asList(
+                    Row.of(
+                            0,
+                            Vectors.dense(2.1, 3.1, 2.3, 3.4, 5.3, 5.1),
+                            Vectors.sparse(5, new int[] {1, 3, 4}, new double[] {0.1, 0.2, 0.3})),
+                    Row.of(
+                            1,
+                            Vectors.dense(2.3, 4.1, 1.3, 2.4, 5.1, 4.1),
+                            Vectors.sparse(5, new int[] {1, 2, 4}, new double[] {0.1, 0.2, 0.3})));
+
+    private static final DenseVector EXPECTED_OUTPUT_DATA_1 = Vectors.dense(2.1, 3.1, 2.3);
+    private static final DenseVector EXPECTED_OUTPUT_DATA_2 = Vectors.dense(2.3, 4.1, 1.3);
+
+    private static final SparseVector EXPECTED_OUTPUT_DATA_3 =
+            Vectors.sparse(3, new int[] {1}, new double[] {0.1});
+    private static final SparseVector EXPECTED_OUTPUT_DATA_4 =
+            Vectors.sparse(3, new int[] {1, 2}, new double[] {0.1, 0.2});
+
+    @Before
+    public void before() {
+        Configuration config = new Configuration();
+        config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+        env.setParallelism(4);
+        env.enableCheckpointing(100);
+        env.setRestartStrategy(RestartStrategies.noRestart());
+        tEnv = StreamTableEnvironment.create(env);
+        DataStream<Row> dataStream = env.fromCollection(INPUT_DATA);
+        inputDataTable = tEnv.fromDataStream(dataStream).as("id", "vec", "sparseVec");
+    }
+
+    private void verifyOutputResult(Table output, String outputCol, boolean isSparse)
+            throws Exception {
+        DataStream<Row> dataStream = tEnv.toDataStream(output);
+        List<Row> results = IteratorUtils.toList(dataStream.executeAndCollect());
+        assertEquals(2, results.size());
+        for (Row result : results) {
+            if (result.getField(0) == (Object) 0) {
+                if (isSparse) {
+                    assertEquals(EXPECTED_OUTPUT_DATA_3, result.getField(outputCol));
+                } else {
+                    assertEquals(EXPECTED_OUTPUT_DATA_1, result.getField(outputCol));
+                }
+            } else if (result.getField(0) == (Object) 1) {
+                if (isSparse) {
+                    assertEquals(EXPECTED_OUTPUT_DATA_4, result.getField(outputCol));
+                } else {
+                    assertEquals(EXPECTED_OUTPUT_DATA_2, result.getField(outputCol));
+                }
+            } else {
+                throw new RuntimeException("Result id value is invalid; it must be 0 or 1.");
+            }
+        }
+    }
+
+    @Test
+    public void testParam() {
+        VectorSlicer vectorSlicer = new VectorSlicer();
+        assertEquals("input", vectorSlicer.getInputCol());
+        assertEquals("output", vectorSlicer.getOutputCol());
+        vectorSlicer.setInputCol("vec").setOutputCol("sliceVec").setIndices(0, 1, 2);
+        assertEquals("vec", vectorSlicer.getInputCol());
+        assertEquals("sliceVec", vectorSlicer.getOutputCol());
+        assertArrayEquals(new Integer[] {0, 1, 2}, vectorSlicer.getIndices());
+    }
+
+    @Test
+    public void testSaveLoadAndTransform() throws Exception {
+        VectorSlicer vectorSlicer =
+                new VectorSlicer().setInputCol("vec").setOutputCol("sliceVec").setIndices(0, 1, 2);
+        VectorSlicer loadedVectorSlicer =
+                TestUtils.saveAndReload(
+                        tEnv, vectorSlicer, TEMPORARY_FOLDER.newFolder().getAbsolutePath());
+        Table output = loadedVectorSlicer.transform(inputDataTable)[0];
+        verifyOutputResult(output, loadedVectorSlicer.getOutputCol(), false);
+    }
+
+    @Test
+    public void testEmptyIndices() {

Review Comment:
   Empty indices mean extracting nothing and returning a vector with no values, which is meaningless.
   I don't think everything Spark does makes sense.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-ml] yunfengzhou-hub commented on a diff in pull request #131: [FLINK-28563] Add Transformer for VectorSlicer

Posted by GitBox <gi...@apache.org>.
yunfengzhou-hub commented on code in PR #131:
URL: https://github.com/apache/flink-ml/pull/131#discussion_r932026230


##########
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/vectorslicer/VectorSlicerParams.java:
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature.vectorslicer;
+
+import org.apache.flink.ml.common.param.HasInputCol;
+import org.apache.flink.ml.common.param.HasOutputCol;
+import org.apache.flink.ml.param.IntArrayParam;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.param.ParamValidators;
+
+/**
+ * Params of {@link VectorSlicer}.
+ *
+ * @param <T> The class type of this instance.
+ */
+public interface VectorSlicerParams<T> extends HasInputCol<T>, HasOutputCol<T> {
+    Param<Integer[]> INDICES =
+            new IntArrayParam(
+                    "indices",
+                    "An array of indices to select features from a vector column.",
+                    null,
+                    ParamValidators.numericalArrayGtEq(0));

Review Comment:
   I prefer adding a dedicated validator for VectorSlicer, as that is what Spark's `VectorSlicer.validIndices` does.

   If we add `checkIndices()`, exceptions would be thrown in `transform()` rather than in `setIndices()`. I think failing in `setIndices()` reads better.
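   
   For illustration, a rough sketch of such a dedicated validator, assuming `ParamValidator<T>` is a serializable functional interface whose single method returns a boolean, and that `java.util.HashSet`/`java.util.Set` are imported (the variable name is hypothetical):
   
   ```java
   // Rejects null, empty arrays, negative entries and duplicates, so an invalid
   // value fails at setIndices() time rather than inside transform().
   ParamValidator<Integer[]> indicesValidator =
           indices -> {
               if (indices == null || indices.length == 0) {
                   return false;
               }
               Set<Integer> seen = new HashSet<>();
               for (Integer index : indices) {
                   if (index == null || index < 0 || !seen.add(index)) {
                       return false;
                   }
               }
               return true;
           };
   ```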



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-ml] weibozhao commented on a diff in pull request #131: [FLINK-28563] Add Transformer for VectorSlicer

Posted by GitBox <gi...@apache.org>.
weibozhao commented on code in PR #131:
URL: https://github.com/apache/flink-ml/pull/131#discussion_r931902284


##########
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/vectorslicer/VectorSlicerParams.java:
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature.vectorslicer;
+
+import org.apache.flink.ml.common.param.HasInputCol;
+import org.apache.flink.ml.common.param.HasOutputCol;
+import org.apache.flink.ml.param.IntArrayParam;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.param.ParamValidators;
+
+/**
+ * Params of {@link VectorSlicer}.
+ *
+ * @param <T> The class type of this instance.
+ */
+public interface VectorSlicerParams<T> extends HasInputCol<T>, HasOutputCol<T> {
+    Param<Integer[]> INDICES =
+            new IntArrayParam(
+                    "indices",
+                    "An array of indices to select features from a vector column.",
+                    null,
+                    ParamValidators.numericalArrayGtEq(0));

Review Comment:
   I think a single validator cannot cover all the checks this parameter needs; a common validator should only perform one kind of check. The remaining checks, such as rejecting duplicates, can be done elsewhere, e.g. at the beginning of the algorithm.

   I have added a `checkIndices()` method to check for duplicate indices and an empty array.

   If we put all the checks into one param validator, that validator could probably only be used by VectorSlicer.

   What do you think?
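   
   As a reference, a minimal sketch of what such a `checkIndices()` helper might look like (the method name and messages are illustrative, and it assumes `org.apache.flink.util.Preconditions` plus `java.util.Arrays`/`HashSet`/`Set` are imported):
   
   ```java
   // Validates the user-provided indices before slicing: the array must be
   // non-empty and must not contain duplicate entries.
   private static void checkIndices(Integer[] indices) {
       Preconditions.checkArgument(
               indices != null && indices.length > 0, "Indices must not be empty.");
       Set<Integer> distinct = new HashSet<>(Arrays.asList(indices));
       Preconditions.checkArgument(
               distinct.size() == indices.length, "Indices must not contain duplicates.");
   }
   ```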



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org