You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/07/18 02:57:04 UTC

[GitHub] [flink-ml] yunfengzhou-hub commented on a diff in pull request #131: [FLINK-28563] Add Transformer for VectorSlicer

yunfengzhou-hub commented on code in PR #131:
URL: https://github.com/apache/flink-ml/pull/131#discussion_r922941135


##########
flink-ml-lib/src/test/java/org/apache/flink/ml/feature/VectorSlicerTest.java:
##########
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.ml.feature.vectorslicer.VectorSlicer;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.SparseVector;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.ml.util.TestUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
+
+import org.apache.commons.collections.IteratorUtils;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+/** Tests VectorSlicer. */
+public class VectorSlicerTest extends AbstractTestBase {
+
+    private StreamTableEnvironment tEnv;
+    private Table inputDataTable;
+
+    private static final List<Row> INPUT_DATA =
+            Arrays.asList(
+                    Row.of(
+                            0,
+                            Vectors.dense(2.1, 3.1, 2.3, 3.4, 5.3, 5.1),
+                            Vectors.sparse(5, new int[] {1, 3, 4}, new double[] {0.1, 0.2, 0.3})),
+                    Row.of(
+                            1,
+                            Vectors.dense(2.3, 4.1, 1.3, 2.4, 5.1, 4.1),
+                            Vectors.sparse(5, new int[] {1, 2, 4}, new double[] {0.1, 0.2, 0.3})));
+
+    private static final DenseVector EXPECTED_OUTPUT_DATA_1 = Vectors.dense(2.1, 3.1, 2.3);
+    private static final DenseVector EXPECTED_OUTPUT_DATA_2 = Vectors.dense(2.3, 4.1, 1.3);
+
+    private static final SparseVector EXPECTED_OUTPUT_DATA_3 =
+            Vectors.sparse(3, new int[] {1}, new double[] {0.1});
+    private static final SparseVector EXPECTED_OUTPUT_DATA_4 =
+            Vectors.sparse(3, new int[] {1, 2}, new double[] {0.1, 0.2});
+
+    @Before
+    public void before() {
+        Configuration config = new Configuration();
+        config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+        env.setParallelism(4);
+        env.enableCheckpointing(100);
+        env.setRestartStrategy(RestartStrategies.noRestart());
+        tEnv = StreamTableEnvironment.create(env);
+        DataStream<Row> dataStream = env.fromCollection(INPUT_DATA);
+        inputDataTable = tEnv.fromDataStream(dataStream).as("id", "vec", "sparseVec");
+    }
+
+    private void verifyOutputResult(Table output, String outputCol, boolean isSparse)
+            throws Exception {
+        DataStream<Row> dataStream = tEnv.toDataStream(output);
+        List<Row> results = IteratorUtils.toList(dataStream.executeAndCollect());
+        assertEquals(2, results.size());
+        for (Row result : results) {
+            if (result.getField(0) == (Object) 0) {
+                if (isSparse) {
+                    assertEquals(EXPECTED_OUTPUT_DATA_3, result.getField(outputCol));
+                } else {
+                    assertEquals(EXPECTED_OUTPUT_DATA_1, result.getField(outputCol));
+                }
+            } else if (result.getField(0) == (Object) 1) {
+                if (isSparse) {
+                    assertEquals(EXPECTED_OUTPUT_DATA_4, result.getField(outputCol));
+                } else {
+                    assertEquals(EXPECTED_OUTPUT_DATA_2, result.getField(outputCol));
+                }
+            } else {
+                assertNull(result.getField(outputCol));
+            }
+        }
+    }
+
+    @Test
+    public void testParam() {
+        VectorSlicer vectorSlicer = new VectorSlicer();
+        assertEquals("output", vectorSlicer.getOutputCol());

Review Comment:
   Let's also check the inputCol and indices here.



##########
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/vectorslicer/VectorSlicer.java:
##########
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature.vectorslicer;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.ml.api.Transformer;
+import org.apache.flink.ml.common.datastream.TableUtils;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.SparseVector;
+import org.apache.flink.ml.linalg.Vector;
+import org.apache.flink.ml.linalg.typeinfo.VectorTypeInfo;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.ArrayUtils;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A feature transformer that transforms a vector to a new one with a sub-array of the original
+ * features. It is useful for extracting features from a given vector.
+ */
+public class VectorSlicer implements Transformer<VectorSlicer>, VectorSlicerParams<VectorSlicer> {
+    private final Map<Param<?>, Object> paramMap = new HashMap<>();
+
+    public VectorSlicer() {
+        ParamUtils.initializeMapWithDefaultValues(paramMap, this);
+    }
+
+    @Override
+    public Table[] transform(Table... inputs) {
+        Preconditions.checkArgument(inputs.length == 1);
+        StreamTableEnvironment tEnv =
+                (StreamTableEnvironment) ((TableImpl) inputs[0]).getTableEnvironment();
+        RowTypeInfo inputTypeInfo = TableUtils.getRowTypeInfo(inputs[0].getResolvedSchema());
+        RowTypeInfo outputTypeInfo =
+                new RowTypeInfo(
+                        ArrayUtils.addAll(inputTypeInfo.getFieldTypes(), VectorTypeInfo.INSTANCE),
+                        ArrayUtils.addAll(inputTypeInfo.getFieldNames(), getOutputCol()));
+        DataStream<Row> output =
+                tEnv.toDataStream(inputs[0])
+                        .map(new VectorSlice(getIndices(), getInputCol()), outputTypeInfo);
+        Table outputTable = tEnv.fromDataStream(output);
+        return new Table[] {outputTable};
+    }
+
+    /** Vector slice function. */
+    public static class VectorSlice implements MapFunction<Row, Row> {
+        private final Integer[] indices;
+        private final String inputCol;
+
+        public VectorSlice(Integer[] indices, String inputCol) {
+            this.indices = indices;
+            this.inputCol = inputCol;
+        }
+
+        @Override
+        public Row map(Row row) throws Exception {
+            Vector inputVec = row.getFieldAs(inputCol);
+            Vector outputVec;
+            if (inputVec instanceof DenseVector) {
+                double[] values = new double[indices.length];
+                for (int i = 0; i < indices.length; ++i) {
+                    if (indices[i] >= inputVec.size()) {
+                        throw new RuntimeException("Index is larger than vector size.");
+                    }
+                    values[i] = ((DenseVector) inputVec).values[indices[i]];
+                }
+                outputVec = new DenseVector(values);
+            } else {
+                int nnz = 0;
+                int[] inputIndices = ((SparseVector) inputVec).indices;
+                double[] inputValues = ((SparseVector) inputVec).values;
+                int[] outputIndices = new int[indices.length];
+                double[] outputValues = new double[indices.length];
+                for (int i = 0; i < indices.length; i++) {
+                    if (indices[i] >= inputVec.size()) {
+                        throw new RuntimeException("Index is larger than vector size.");
+                    }
+                    int pos = Arrays.binarySearch(inputIndices, indices[i]);

Review Comment:
   The `SparseVector.get()` method already implements the binary search. We can call that method directly instead of reimplementing the lookup ourselves.



##########
flink-ml-python/pyflink/ml/lib/param.py:
##########
@@ -155,6 +155,27 @@ def input_cols(self) -> Tuple[str, ...]:
         return self.get_input_cols()
 
 
+class HasIndices(WithParams, ABC):

Review Comment:
   Maybe we should implement it inside `vectorslicer.py`, instead of treating it as a common parameter. You can refer to the `K` parameter in `kmeans.py`.



##########
flink-ml-lib/src/test/java/org/apache/flink/ml/feature/VectorSlicerTest.java:
##########
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.ml.feature.vectorslicer.VectorSlicer;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.SparseVector;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.ml.util.TestUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
+
+import org.apache.commons.collections.IteratorUtils;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+/** Tests VectorSlicer. */
+public class VectorSlicerTest extends AbstractTestBase {
+
+    private StreamTableEnvironment tEnv;
+    private Table inputDataTable;
+
+    private static final List<Row> INPUT_DATA =
+            Arrays.asList(
+                    Row.of(
+                            0,
+                            Vectors.dense(2.1, 3.1, 2.3, 3.4, 5.3, 5.1),
+                            Vectors.sparse(5, new int[] {1, 3, 4}, new double[] {0.1, 0.2, 0.3})),
+                    Row.of(
+                            1,
+                            Vectors.dense(2.3, 4.1, 1.3, 2.4, 5.1, 4.1),
+                            Vectors.sparse(5, new int[] {1, 2, 4}, new double[] {0.1, 0.2, 0.3})));
+
+    private static final DenseVector EXPECTED_OUTPUT_DATA_1 = Vectors.dense(2.1, 3.1, 2.3);
+    private static final DenseVector EXPECTED_OUTPUT_DATA_2 = Vectors.dense(2.3, 4.1, 1.3);
+
+    private static final SparseVector EXPECTED_OUTPUT_DATA_3 =
+            Vectors.sparse(3, new int[] {1}, new double[] {0.1});
+    private static final SparseVector EXPECTED_OUTPUT_DATA_4 =
+            Vectors.sparse(3, new int[] {1, 2}, new double[] {0.1, 0.2});
+
+    @Before
+    public void before() {
+        Configuration config = new Configuration();
+        config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+        env.setParallelism(4);
+        env.enableCheckpointing(100);
+        env.setRestartStrategy(RestartStrategies.noRestart());
+        tEnv = StreamTableEnvironment.create(env);
+        DataStream<Row> dataStream = env.fromCollection(INPUT_DATA);
+        inputDataTable = tEnv.fromDataStream(dataStream).as("id", "vec", "sparseVec");
+    }
+
+    private void verifyOutputResult(Table output, String outputCol, boolean isSparse)
+            throws Exception {
+        DataStream<Row> dataStream = tEnv.toDataStream(output);
+        List<Row> results = IteratorUtils.toList(dataStream.executeAndCollect());
+        assertEquals(2, results.size());
+        for (Row result : results) {
+            if (result.getField(0) == (Object) 0) {
+                if (isSparse) {
+                    assertEquals(EXPECTED_OUTPUT_DATA_3, result.getField(outputCol));
+                } else {
+                    assertEquals(EXPECTED_OUTPUT_DATA_1, result.getField(outputCol));
+                }
+            } else if (result.getField(0) == (Object) 1) {
+                if (isSparse) {
+                    assertEquals(EXPECTED_OUTPUT_DATA_4, result.getField(outputCol));
+                } else {
+                    assertEquals(EXPECTED_OUTPUT_DATA_2, result.getField(outputCol));
+                }
+            } else {
+                assertNull(result.getField(outputCol));

Review Comment:
   It seems that this line would not be reached. It might be better to remove this line or replace it with throwing an exception.



##########
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/vectorslicer/VectorSlicer.java:
##########
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature.vectorslicer;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.ml.api.Transformer;
+import org.apache.flink.ml.common.datastream.TableUtils;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.SparseVector;
+import org.apache.flink.ml.linalg.Vector;
+import org.apache.flink.ml.linalg.typeinfo.VectorTypeInfo;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.ArrayUtils;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A feature transformer that transforms a vector to a new one with a sub-array of the original
+ * features. It is useful for extracting features from a given vector.
+ */

Review Comment:
   Let's add comments that describe what will happen if the indices acquired from `setIndices()` are not in order, and add corresponding test cases.



##########
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/vectorslicer/VectorSlicerParams.java:
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature.vectorslicer;
+
+import org.apache.flink.ml.common.param.HasInputCol;
+import org.apache.flink.ml.common.param.HasOutputCol;
+import org.apache.flink.ml.param.IntArrayParam;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.param.ParamValidators;
+
+/**
+ * Params of VectorSlicer.
+ *
+ * @param <T> The class type of this instance.
+ */
+public interface VectorSlicerParams<T> extends HasInputCol<T>, HasOutputCol<T> {
+    Param<Integer[]> INDICES =
+            new IntArrayParam(
+                    "indices",
+                    "An array of indices to select features from a vector column.",
+                    null,
+                    ParamValidators.nonEmptyArray());

Review Comment:
   It might be better to add a custom `ParamValidator` to verify that the indices are all greater than or equal to 0 and contain no duplicates, like `VectorSlicer.validIndices()` in Spark.



##########
flink-ml-python/pyflink/ml/lib/feature/tests/test_vectorslicer.py:
##########
@@ -0,0 +1,74 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import os
+
+from pyflink.common import Types
+
+from pyflink.ml.core.linalg import Vectors, DenseVectorTypeInfo
+from pyflink.ml.lib.feature.vectorslicer import VectorSlicer
+from pyflink.ml.tests.test_utils import PyFlinkMLTestCase
+
+
+class VectorSlicerTest(PyFlinkMLTestCase):
+    def setUp(self):
+        super(VectorSlicerTest, self).setUp()
+        self.input_data_table = self.t_env.from_data_stream(
+            self.env.from_collection([
+                (1, Vectors.dense(2.1, 3.1, 1.2, 2.1)),
+                (2, Vectors.dense(2.3, 2.1, 1.3, 1.2)),
+            ],
+                type_info=Types.ROW_NAMED(
+                    ['id', 'vec'],
+                    [Types.INT(), DenseVectorTypeInfo()])))
+
+        self.expected_output_data_1 = Vectors.dense(2.1, 3.1, 1.2)
+        self.expected_output_data_2 = Vectors.dense(2.3, 2.1, 1.3)
+
+    def test_param(self):
+        vector_slicer = VectorSlicer()
+
+        self.assertEqual('input', vector_slicer.input_col)
+        self.assertEqual('output', vector_slicer.output_col)
+
+        vector_slicer.set_input_col('vec') \
+            .set_output_col('slice_vec') \
+            .set_indices(0, 1, 2)
+
+        self.assertEqual('vec', vector_slicer.input_col)
+        self.assertEqual((0, 1, 2), vector_slicer.indices)
+        self.assertEqual('slice_vec', vector_slicer.output_col)
+
+    def test_save_load_transform(self):
+        vector_slicer = VectorSlicer() \
+            .set_input_col('vec') \
+            .set_output_col('slice_vec') \
+            .set_indices(0, 1, 2)
+
+        path = os.path.join(self.temp_dir, 'test_save_load_transform_vector_slicer')
+        vector_slicer.save(path)
+        vector_slicer = VectorSlicer.load(self.t_env, path)
+
+        output_table = vector_slicer.transform(self.input_data_table)[0]
+        actual_outputs = [(result[0], result[2]) for result in
+                          self.t_env.to_data_stream(output_table).execute_and_collect()]
+
+        for actual_output in actual_outputs:

Review Comment:
   It might be better to also check the length of the actual outputs.



##########
flink-ml-python/pyflink/ml/lib/feature/tests/test_vectorslicer.py:
##########
@@ -0,0 +1,74 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import os
+
+from pyflink.common import Types
+
+from pyflink.ml.core.linalg import Vectors, DenseVectorTypeInfo
+from pyflink.ml.lib.feature.vectorslicer import VectorSlicer
+from pyflink.ml.tests.test_utils import PyFlinkMLTestCase
+
+
+class VectorSlicerTest(PyFlinkMLTestCase):
+    def setUp(self):
+        super(VectorSlicerTest, self).setUp()
+        self.input_data_table = self.t_env.from_data_stream(
+            self.env.from_collection([
+                (1, Vectors.dense(2.1, 3.1, 1.2, 2.1)),
+                (2, Vectors.dense(2.3, 2.1, 1.3, 1.2)),
+            ],
+                type_info=Types.ROW_NAMED(
+                    ['id', 'vec'],
+                    [Types.INT(), DenseVectorTypeInfo()])))
+
+        self.expected_output_data_1 = Vectors.dense(2.1, 3.1, 1.2)
+        self.expected_output_data_2 = Vectors.dense(2.3, 2.1, 1.3)
+
+    def test_param(self):
+        vector_slicer = VectorSlicer()
+
+        self.assertEqual('input', vector_slicer.input_col)
+        self.assertEqual('output', vector_slicer.output_col)

Review Comment:
   Let's use the methods instead of accessing members, which means instead of 
   ```python
   vector_slicer.input_col
   ```
   Let's use
   ```python
   vector_slicer.get_input_col()
   ```
   
   Besides, let's add a check for indices here as well.



##########
flink-ml-examples/src/main/java/org/apache/flink/ml/examples/feature/VectorSlicerExample.java:
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.examples.feature;
+
+import org.apache.flink.ml.feature.vectorslicer.VectorSlicer;
+import org.apache.flink.ml.linalg.Vector;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+
+/** Simple program that creates a VectorSlicer instance and uses it for feature engineering. */
+public class VectorSlicerExample {
+    public static void main(String[] args) {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+        // Generates input data.
+        DataStream<Row> inputStream =
+                env.fromElements(
+                        Row.of(Vectors.dense(2.1, 3.1, 1.2, 3.1, 4.6)),
+                        Row.of(Vectors.dense(1.2, 3.1, 4.6, 2.1, 3.1)));
+        Table inputTable = tEnv.fromDataStream(inputStream).as("vec");
+
+        // Creates a VectorSlicer object and initializes its parameters.
+        VectorSlicer vectorSlicer =
+                new VectorSlicer().setInputCol("vec").setIndices(1, 2, 3).setOutputCol("slicedVec");
+
+        // Uses the VectorSlicer object for feature transformations.
+        Table outputTable = vectorSlicer.transform(inputTable)[0];
+
+        // Extracts and displays the results.
+        for (CloseableIterator<Row> it = outputTable.execute().collect(); it.hasNext(); ) {
+            Row row = it.next();
+
+            Vector inputValue = (Vector) row.getField(vectorSlicer.getInputCol());
+
+            Vector outputValue = (Vector) row.getField(vectorSlicer.getOutputCol());
+
+            System.out.printf("Input Values: %s \tOutput Value: %s\n", inputValue, outputValue);

Review Comment:
   nit: "Values" -> "Value"



##########
flink-ml-python/pyflink/examples/ml/feature/vectorslicer_example.py:
##########
@@ -0,0 +1,64 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Simple program that creates a VectorSlicer instance and uses it for feature
+# engineering.
+#
+# Before executing this program, please make sure you have followed Flink ML's
+# quick start guideline to set up Flink ML and Flink environment. The guideline
+# can be found at
+#
+# https://nightlies.apache.org/flink/flink-ml-docs-master/docs/try-flink-ml/quick-start/
+
+from pyflink.common import Types
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.ml.core.linalg import Vectors, DenseVectorTypeInfo
+from pyflink.ml.lib.feature.vectorslicer import VectorSlicer
+from pyflink.table import StreamTableEnvironment
+
+# create a new StreamExecutionEnvironment
+env = StreamExecutionEnvironment.get_execution_environment()
+
+# create a StreamTableEnvironment
+t_env = StreamTableEnvironment.create(env)
+
+# generate input data
+input_data_table = t_env.from_data_stream(
+    env.from_collection([
+        (1, Vectors.dense(2.1, 3.1, 1.2, 2.1)),
+        (2, Vectors.dense(2.3, 2.1, 1.3, 1.2)),
+    ],
+        type_info=Types.ROW_NAMED(
+            ['id', 'vec'],
+            [Types.INT(), DenseVectorTypeInfo()])))
+
+# create a vector slicer object and initialize its parameters
+vector_slicer = VectorSlicer() \
+    .set_input_col('vec') \
+    .set_indices(1, 2, 3) \
+    .set_output_col('sub_vec')
+
+# use the vector slicer model for feature engineering

Review Comment:
   I just realized that vector slicer is not a Model. Could you please help fix this comment with something like `vector slicer instance` or `vector slicer object`, and fix other places like that in `vectorassembler_example.py`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org