You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/07/21 06:21:18 UTC

[GitHub] [flink-ml] weibozhao opened a new pull request, #135: Support ElementwiseProduct in FlinkML

weibozhao opened a new pull request, #135:
URL: https://github.com/apache/flink-ml/pull/135

   #What is the purpose of the change
   Add Transformer for ElementwiseProduct in Flink ML.
   
   #Brief change log
   Add Transformer for ElementwiseProduct in Flink ML (Java & Python).
   Add unit test for ElementwiseProduct (Java & Python)
   Add example code for ElementwiseProduct (Java&Python)
   
   #Does this pull request potentially affect one of the following parts:
   Dependencies (does it add or upgrade a dependency): (no)
   The public API, i.e., is any changed class annotated with @public(Evolving): (no)
   Does this pull request introduce a new feature? (yes)
   If yes, how is the feature documented? (Java doc)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-ml] weibozhao commented on a diff in pull request #135: [FLINK-28611] Add Transformer for ElementwiseProduct

Posted by GitBox <gi...@apache.org>.
weibozhao commented on code in PR #135:
URL: https://github.com/apache/flink-ml/pull/135#discussion_r935302096


##########
flink-ml-lib/src/test/java/org/apache/flink/ml/feature/ElementwiseProductTest.java:
##########
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.ml.feature.elementwiseproduct.ElementwiseProduct;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.SparseVector;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.ml.util.TestUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
+
+import org.apache.commons.collections.IteratorUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+/** Tests {@link ElementwiseProduct}. */
+public class ElementwiseProductTest extends AbstractTestBase {
+
+    private StreamTableEnvironment tEnv;
+    private Table inputDataTable;
+
+    private static final List<Row> INPUT_DATA =
+            Arrays.asList(
+                    Row.of(
+                            0,
+                            Vectors.dense(2.1, 3.1),
+                            Vectors.sparse(5, new int[] {3}, new double[] {1.0})),
+                    Row.of(
+                            1,
+                            Vectors.dense(1.1, 3.3),
+                            Vectors.sparse(
+                                    5, new int[] {4, 2, 3, 1}, new double[] {4.0, 2.0, 3.0, 1.0})),
+                    Row.of(2, null, null));
+
+    private static final double[] EXPECTED_OUTPUT_DENSE_VEC_ARRAY_1 = new double[] {2.31, 3.41};
+    private static final double[] EXPECTED_OUTPUT_DENSE_VEC_ARRAY_2 = new double[] {1.21, 3.63};
+
+    private static final int EXPECTED_OUTPUT_SPARSE_VEC_SIZE_1 = 5;
+    private static final int[] EXPECTED_OUTPUT_SPARSE_VEC_INDICES_1 = new int[] {3};
+    private static final double[] EXPECTED_OUTPUT_SPARSE_VEC_VALUES_1 = new double[] {0.0};
+
+    private static final int EXPECTED_OUTPUT_SPARSE_VEC_SIZE_2 = 5;
+    private static final int[] EXPECTED_OUTPUT_SPARSE_VEC_INDICES_2 = new int[] {1, 2, 3, 4};
+    private static final double[] EXPECTED_OUTPUT_SPARSE_VEC_VALUES_2 =
+            new double[] {1.1, 0.0, 0.0, 0.0};

Review Comment:
    I think there is no need to do this. For doing this is too expensive, we need to do the below things:
   1. Loop all the elements in the sparse vector and judge whether they equal zero(double) or not. If an element is 1.0e-12, It will give us some confusion.
   2. New two arrays to store new indices and new values. 
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-ml] zhipeng93 commented on pull request #135: [FLINK-28611] Add Transformer for ElementwiseProduct

Posted by GitBox <gi...@apache.org>.
zhipeng93 commented on PR #135:
URL: https://github.com/apache/flink-ml/pull/135#issuecomment-1203459485

   Thanks for the update. LGTM.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-ml] weibozhao commented on a diff in pull request #135: [FLINK-28611] Support ElementwiseProduct in FlinkML

Posted by GitBox <gi...@apache.org>.
weibozhao commented on code in PR #135:
URL: https://github.com/apache/flink-ml/pull/135#discussion_r932814503


##########
flink-ml-core/src/main/java/org/apache/flink/ml/param/VectorParam.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.param;
+
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.SparseVector;
+import org.apache.flink.ml.linalg.Vector;
+
+import java.util.List;
+import java.util.Map;
+
+/** Class for the Vector parameter. */
+public class VectorParam extends Param<Vector> {
+
+    public VectorParam(
+            String name,
+            String description,
+            Vector defaultValue,
+            ParamValidator<Vector> validator) {
+        super(name, Vector.class, description, defaultValue, validator);
+    }
+
+    public VectorParam(String name, String description, Vector defaultValue) {
+        this(name, description, defaultValue, ParamValidators.alwaysTrue());
+    }
+
+    @Override
+    public Vector jsonDecode(Object object) {
+        Map<String, Object> vecValues = (Map) object;
+        if (vecValues.size() == 1) {
+            List<Double> list = (List<Double>) vecValues.get("values");
+            double[] values = new double[list.size()];
+            for (int i = 0; i < values.length; ++i) {
+                values[i] = list.get(i);
+            }
+            return new DenseVector(values);
+        } else if (vecValues.size() == 3) {
+            List<Double> valuesList = (List<Double>) vecValues.get("values");
+            List<Integer> indicesList = (List<Integer>) vecValues.get("indices");
+            int n = (int) vecValues.get("n");
+            double[] values = new double[valuesList.size()];

Review Comment:
   List<Double> toArray is Double[], but we need double[] in the constructor of SparseVector. Then I use for loop instead of List.toArray



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-ml] yunfengzhou-hub commented on a diff in pull request #135: [FLINK-28611] Add Transformer for ElementwiseProduct

Posted by GitBox <gi...@apache.org>.
yunfengzhou-hub commented on code in PR #135:
URL: https://github.com/apache/flink-ml/pull/135#discussion_r935044159


##########
flink-ml-lib/src/test/java/org/apache/flink/ml/feature/ElementwiseProductTest.java:
##########
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.ml.feature.elementwiseproduct.ElementwiseProduct;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.SparseVector;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.ml.util.TestUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
+
+import org.apache.commons.collections.IteratorUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+/** Tests {@link ElementwiseProduct}. */
+public class ElementwiseProductTest extends AbstractTestBase {
+
+    private StreamTableEnvironment tEnv;
+    private Table inputDataTable;
+
+    private static final List<Row> INPUT_DATA =
+            Arrays.asList(
+                    Row.of(
+                            0,
+                            Vectors.dense(2.1, 3.1),
+                            Vectors.sparse(5, new int[] {3}, new double[] {1.0})),
+                    Row.of(
+                            1,
+                            Vectors.dense(1.1, 3.3),
+                            Vectors.sparse(
+                                    5, new int[] {4, 2, 3, 1}, new double[] {4.0, 2.0, 3.0, 1.0})),
+                    Row.of(2, null, null));
+
+    private static final DenseVector EXPECTED_OUTPUT_DATA_1 = Vectors.dense(2.31, 3.41);
+    private static final DenseVector EXPECTED_OUTPUT_DATA_2 = Vectors.dense(1.21, 3.63);
+    private static final SparseVector EXPECTED_OUTPUT_DATA_3 =
+            Vectors.sparse(5, new int[] {3}, new double[] {0.0});
+    private static final SparseVector EXPECTED_OUTPUT_DATA_4 =
+            Vectors.sparse(5, new int[] {1, 2, 3, 4}, new double[] {1.1, 0.0, 0.0, 0.0});
+
+    @Before
+    public void before() {
+        Configuration config = new Configuration();
+        config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+        env.setParallelism(4);
+        env.enableCheckpointing(100);
+        env.setRestartStrategy(RestartStrategies.noRestart());
+        tEnv = StreamTableEnvironment.create(env);
+        DataStream<Row> dataStream = env.fromCollection(INPUT_DATA);
+        inputDataTable = tEnv.fromDataStream(dataStream).as("id", "vec", "sparseVec");
+    }
+
+    private void verifyOutputResult(Table output, String outputCol, boolean isSparse)

Review Comment:
   Could you please verify that the input data is not affected after this transformation? I found that while output is `[1.21, 3.63]`, input is also changed into `[1.21, 3.63]`. There might be a bug here.



##########
flink-ml-lib/src/test/java/org/apache/flink/ml/feature/ElementwiseProductTest.java:
##########
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.ml.feature.elementwiseproduct.ElementwiseProduct;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.SparseVector;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.ml.util.TestUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
+
+import org.apache.commons.collections.IteratorUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+/** Tests {@link ElementwiseProduct}. */
+public class ElementwiseProductTest extends AbstractTestBase {
+
+    private StreamTableEnvironment tEnv;
+    private Table inputDataTable;
+
+    private static final List<Row> INPUT_DATA =
+            Arrays.asList(
+                    Row.of(
+                            0,
+                            Vectors.dense(2.1, 3.1),
+                            Vectors.sparse(5, new int[] {3}, new double[] {1.0})),
+                    Row.of(
+                            1,
+                            Vectors.dense(1.1, 3.3),
+                            Vectors.sparse(
+                                    5, new int[] {4, 2, 3, 1}, new double[] {4.0, 2.0, 3.0, 1.0})),
+                    Row.of(2, null, null));
+
+    private static final DenseVector EXPECTED_OUTPUT_DATA_1 = Vectors.dense(2.31, 3.41);
+    private static final DenseVector EXPECTED_OUTPUT_DATA_2 = Vectors.dense(1.21, 3.63);
+    private static final SparseVector EXPECTED_OUTPUT_DATA_3 =
+            Vectors.sparse(5, new int[] {3}, new double[] {0.0});
+    private static final SparseVector EXPECTED_OUTPUT_DATA_4 =
+            Vectors.sparse(5, new int[] {1, 2, 3, 4}, new double[] {1.1, 0.0, 0.0, 0.0});
+
+    @Before
+    public void before() {
+        Configuration config = new Configuration();
+        config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+        env.setParallelism(4);
+        env.enableCheckpointing(100);
+        env.setRestartStrategy(RestartStrategies.noRestart());
+        tEnv = StreamTableEnvironment.create(env);
+        DataStream<Row> dataStream = env.fromCollection(INPUT_DATA);
+        inputDataTable = tEnv.fromDataStream(dataStream).as("id", "vec", "sparseVec");
+    }
+
+    private void verifyOutputResult(Table output, String outputCol, boolean isSparse)
+            throws Exception {
+        DataStream<Row> dataStream = tEnv.toDataStream(output);
+        List<Row> results = IteratorUtils.toList(dataStream.executeAndCollect());
+        assertEquals(3, results.size());
+        for (Row result : results) {
+            if (result.getField(0) == (Object) 0) {
+                if (isSparse) {
+                    assertEquals(EXPECTED_OUTPUT_DATA_3, result.getField(outputCol));
+                } else {
+                    assertArrayEquals(
+                            EXPECTED_OUTPUT_DATA_1.values,

Review Comment:
   Can we use the same comparison command for different situations, like for SparseVector and DenseVector, and even for null values as well? It might help to improve the readability of the test cases.
   
   Besides, let's also try to refractor names like `EXPECTED_OUTPUT_DATA_1` and `EXPECTED_OUTPUT_DATA_3`. These names seem to be of low readability.



##########
flink-ml-python/pyflink/ml/core/tests/test_stage.py:
##########
@@ -21,14 +21,16 @@
 from pyflink.table import StreamTableEnvironment
 
 from pyflink.ml.core.api import Stage
+from pyflink.ml.core.linalg import Vectors
 from pyflink.ml.core.param import ParamValidators, Param, BooleanParam, IntParam, \
-    FloatParam, StringParam, IntArrayParam, FloatArrayParam, StringArrayParam
+    FloatParam, StringParam, VectorParam, IntArrayParam, FloatArrayParam, StringArrayParam
 from pyflink.ml.tests.test_utils import PyFlinkMLTestCase
 
 BOOLEAN_PARAM = BooleanParam("boolean_param", "Description", False)
 INT_PARAM = IntParam("int_param", "Description", 1, ParamValidators.lt(100))
 FLOAT_PARAM = FloatParam("float_param", "Description", 3.0, ParamValidators.lt(100))
 STRING_PARAM = StringParam('string_param', "Description", "5")
+VECTOR_PARAM = VectorParam('vector_param', "Description", Vectors.dense(1, 2, 3))

Review Comment:
   It might be unable to verify the correctness of `VectorParam` by just creating the `VECTOR_PARAM`. Could you please refer to tests for other parameters, like `IntParam`, and add corresponding test cases about `VectorParam`?



##########
flink-ml-lib/src/test/java/org/apache/flink/ml/feature/ElementwiseProductTest.java:
##########
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.ml.feature.elementwiseproduct.ElementwiseProduct;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.SparseVector;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.ml.util.TestUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
+
+import org.apache.commons.collections.IteratorUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+/** Tests {@link ElementwiseProduct}. */
+public class ElementwiseProductTest extends AbstractTestBase {

Review Comment:
   Let's add a `testOutputSchema` test case like in other algorithms as well.



##########
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/elementwiseproduct/ElementwiseProduct.java:
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature.elementwiseproduct;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.ml.api.Transformer;
+import org.apache.flink.ml.common.datastream.TableUtils;
+import org.apache.flink.ml.linalg.BLAS;
+import org.apache.flink.ml.linalg.Vector;
+import org.apache.flink.ml.linalg.typeinfo.VectorTypeInfo;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.ArrayUtils;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * ElementwiseProduct is a transformer that multiplies each input vector with a given scaling vector
+ * using Hadamard product.
+ *
+ * <p>If input vector is null, then the transformer will return null. If input vector size not

Review Comment:
   It seems that there is still a grammar error in this paragraph. Could you please check and fix it?
   
   I found that Grammarly is a helpful tool for finding out grammar errors. You may also try and use it to refractor the documents.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-ml] yunfengzhou-hub commented on a diff in pull request #135: [FLINK-28611] Support ElementwiseProduct in FlinkML

Posted by GitBox <gi...@apache.org>.
yunfengzhou-hub commented on code in PR #135:
URL: https://github.com/apache/flink-ml/pull/135#discussion_r931748916


##########
flink-ml-core/src/main/java/org/apache/flink/ml/param/VectorParam.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.param;
+
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.SparseVector;
+import org.apache.flink.ml.linalg.Vector;
+
+import java.util.List;
+import java.util.Map;
+
+/** Class for the Vector parameter. */
+public class VectorParam extends Param<Vector> {

Review Comment:
   Could you please add corresponding tests in `StageTest` to verify this class?



##########
flink-ml-core/src/main/java/org/apache/flink/ml/param/VectorParam.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.param;
+
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.SparseVector;
+import org.apache.flink.ml.linalg.Vector;
+
+import java.util.List;
+import java.util.Map;
+
+/** Class for the Vector parameter. */
+public class VectorParam extends Param<Vector> {
+
+    public VectorParam(
+            String name,
+            String description,
+            Vector defaultValue,
+            ParamValidator<Vector> validator) {
+        super(name, Vector.class, description, defaultValue, validator);
+    }
+
+    public VectorParam(String name, String description, Vector defaultValue) {
+        this(name, description, defaultValue, ParamValidators.alwaysTrue());
+    }
+
+    @Override
+    public Vector jsonDecode(Object object) {
+        Map<String, Object> vecValues = (Map) object;
+        if (vecValues.size() == 1) {
+            List<Double> list = (List<Double>) vecValues.get("values");
+            double[] values = new double[list.size()];
+            for (int i = 0; i < values.length; ++i) {
+                values[i] = list.get(i);
+            }
+            return new DenseVector(values);
+        } else if (vecValues.size() == 3) {
+            List<Double> valuesList = (List<Double>) vecValues.get("values");
+            List<Integer> indicesList = (List<Integer>) vecValues.get("indices");
+            int n = (int) vecValues.get("n");
+            double[] values = new double[valuesList.size()];
+            int[] indices = new int[indicesList.size()];
+            for (int i = 0; i < values.length; ++i) {
+                values[i] = valuesList.get(i);
+                indices[i] = indicesList.get(i);
+            }
+            return new SparseVector(n, indices, values);
+        } else {
+            throw new RuntimeException("Parameter scalingVec is invalid.");

Review Comment:
   Let's throw `IllegalxxxException` or `UnsupportedxxxException` here. 



##########
flink-ml-core/src/main/java/org/apache/flink/ml/param/VectorParam.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.param;
+
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.SparseVector;
+import org.apache.flink.ml.linalg.Vector;
+
+import java.util.List;
+import java.util.Map;
+
+/** Class for the Vector parameter. */
+public class VectorParam extends Param<Vector> {
+
+    public VectorParam(
+            String name,
+            String description,
+            Vector defaultValue,
+            ParamValidator<Vector> validator) {
+        super(name, Vector.class, description, defaultValue, validator);
+    }
+
+    public VectorParam(String name, String description, Vector defaultValue) {
+        this(name, description, defaultValue, ParamValidators.alwaysTrue());
+    }
+
+    @Override
+    public Vector jsonDecode(Object object) {
+        Map<String, Object> vecValues = (Map) object;
+        if (vecValues.size() == 1) {
+            List<Double> list = (List<Double>) vecValues.get("values");
+            double[] values = new double[list.size()];
+            for (int i = 0; i < values.length; ++i) {
+                values[i] = list.get(i);
+            }
+            return new DenseVector(values);
+        } else if (vecValues.size() == 3) {
+            List<Double> valuesList = (List<Double>) vecValues.get("values");
+            List<Integer> indicesList = (List<Integer>) vecValues.get("indices");
+            int n = (int) vecValues.get("n");
+            double[] values = new double[valuesList.size()];

Review Comment:
   nit: `List.toArray()` can simplify the logic here.



##########
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/elementwiseproduct/ElementwiseProductParams.java:
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature.elementwiseproduct;
+
+import org.apache.flink.ml.common.param.HasInputCol;
+import org.apache.flink.ml.common.param.HasOutputCol;
+import org.apache.flink.ml.linalg.Vector;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.param.ParamValidators;
+import org.apache.flink.ml.param.VectorParam;
+
+/**
+ * Params of ElementwiseProduct.

Review Comment:
   nit: `{@link }`.



##########
flink-ml-examples/src/main/java/org/apache/flink/ml/examples/feature/ElementwiseProductExample.java:
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.examples.feature;
+
+import org.apache.flink.ml.feature.elementwiseproduct.ElementwiseProduct;
+import org.apache.flink.ml.linalg.Vector;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+
+/**
+ * Simple program that creates a ElementwiseProduct instance and uses it for feature engineering.

Review Comment:
   nit: "an ElementwiseProduct instance". Same for comments below.



##########
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/elementwiseproduct/ElementwiseProduct.java:
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature.elementwiseproduct;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.ml.api.Transformer;
+import org.apache.flink.ml.common.datastream.TableUtils;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.SparseVector;
+import org.apache.flink.ml.linalg.Vector;
+import org.apache.flink.ml.linalg.typeinfo.VectorTypeInfo;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.ArrayUtils;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * ElementwiseProduct is a transformer that multiplies each input vector by a given scaling vector,
+ * using element-wise multiplication.
+ */
+public class ElementwiseProduct
+        implements Transformer<ElementwiseProduct>, ElementwiseProductParams<ElementwiseProduct> {
+    private final Map<Param<?>, Object> paramMap = new HashMap<>();
+
+    public ElementwiseProduct() {
+        ParamUtils.initializeMapWithDefaultValues(paramMap, this);
+    }
+
+    @Override
+    public Table[] transform(Table... inputs) {
+        Preconditions.checkArgument(inputs.length == 1);
+        StreamTableEnvironment tEnv =
+                (StreamTableEnvironment) ((TableImpl) inputs[0]).getTableEnvironment();
+        RowTypeInfo inputTypeInfo = TableUtils.getRowTypeInfo(inputs[0].getResolvedSchema());
+        RowTypeInfo outputTypeInfo =
+                new RowTypeInfo(
+                        ArrayUtils.addAll(inputTypeInfo.getFieldTypes(), VectorTypeInfo.INSTANCE),
+                        ArrayUtils.addAll(inputTypeInfo.getFieldNames(), getOutputCol()));
+        DataStream<Row> output =
+                tEnv.toDataStream(inputs[0])
+                        .map(new Elementwise(getInputCol(), getScalingVec()), outputTypeInfo);
+        Table outputTable = tEnv.fromDataStream(output);
+        return new Table[] {outputTable};
+    }
+
+    private static class Elementwise implements MapFunction<Row, Row> {
+        private final String inputCol;
+        private final DenseVector scalingVec;
+
+        public Elementwise(String inputCol, Vector scalingVec) {
+            this.inputCol = inputCol;
+            this.scalingVec = scalingVec.toDense();
+        }
+
+        @Override
+        public Row map(Row value) {
+            Vector inputVec = value.getFieldAs(inputCol);
+            if (null == inputVec) {
+                return null;
+            }
+            if (inputVec instanceof DenseVector) {

Review Comment:
   We can reuse `BLAS` operations here.



##########
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/elementwiseproduct/ElementwiseProduct.java:
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature.elementwiseproduct;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.ml.api.Transformer;
+import org.apache.flink.ml.common.datastream.TableUtils;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.SparseVector;
+import org.apache.flink.ml.linalg.Vector;
+import org.apache.flink.ml.linalg.typeinfo.VectorTypeInfo;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.ArrayUtils;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * ElementwiseProduct is a transformer that multiplies each input vector by a given scaling vector,
+ * using element-wise multiplication.

Review Comment:
   `Hadamard product` seems to be a more precise description.



##########
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/elementwiseproduct/ElementwiseProduct.java:
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature.elementwiseproduct;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.ml.api.Transformer;
+import org.apache.flink.ml.common.datastream.TableUtils;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.SparseVector;
+import org.apache.flink.ml.linalg.Vector;
+import org.apache.flink.ml.linalg.typeinfo.VectorTypeInfo;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.ArrayUtils;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * ElementwiseProduct is a transformer that multiplies each input vector by a given scaling vector,
+ * using element-wise multiplication.
+ */
+public class ElementwiseProduct
+        implements Transformer<ElementwiseProduct>, ElementwiseProductParams<ElementwiseProduct> {
+    private final Map<Param<?>, Object> paramMap = new HashMap<>();
+
+    public ElementwiseProduct() {
+        ParamUtils.initializeMapWithDefaultValues(paramMap, this);
+    }
+
+    @Override
+    public Table[] transform(Table... inputs) {
+        Preconditions.checkArgument(inputs.length == 1);
+        StreamTableEnvironment tEnv =
+                (StreamTableEnvironment) ((TableImpl) inputs[0]).getTableEnvironment();
+        RowTypeInfo inputTypeInfo = TableUtils.getRowTypeInfo(inputs[0].getResolvedSchema());
+        RowTypeInfo outputTypeInfo =
+                new RowTypeInfo(
+                        ArrayUtils.addAll(inputTypeInfo.getFieldTypes(), VectorTypeInfo.INSTANCE),
+                        ArrayUtils.addAll(inputTypeInfo.getFieldNames(), getOutputCol()));
+        DataStream<Row> output =
+                tEnv.toDataStream(inputs[0])
+                        .map(new Elementwise(getInputCol(), getScalingVec()), outputTypeInfo);
+        Table outputTable = tEnv.fromDataStream(output);
+        return new Table[] {outputTable};
+    }
+
+    private static class Elementwise implements MapFunction<Row, Row> {

Review Comment:
   nit :`ElementwiseProductFunction`



##########
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/elementwiseproduct/ElementwiseProduct.java:
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature.elementwiseproduct;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.ml.api.Transformer;
+import org.apache.flink.ml.common.datastream.TableUtils;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.SparseVector;
+import org.apache.flink.ml.linalg.Vector;
+import org.apache.flink.ml.linalg.typeinfo.VectorTypeInfo;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.ArrayUtils;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * ElementwiseProduct is a transformer that multiplies each input vector by a given scaling vector,
+ * using element-wise multiplication.
+ */
+public class ElementwiseProduct
+        implements Transformer<ElementwiseProduct>, ElementwiseProductParams<ElementwiseProduct> {
+    private final Map<Param<?>, Object> paramMap = new HashMap<>();
+
+    public ElementwiseProduct() {
+        ParamUtils.initializeMapWithDefaultValues(paramMap, this);
+    }
+
+    @Override
+    public Table[] transform(Table... inputs) {
+        Preconditions.checkArgument(inputs.length == 1);
+        StreamTableEnvironment tEnv =
+                (StreamTableEnvironment) ((TableImpl) inputs[0]).getTableEnvironment();
+        RowTypeInfo inputTypeInfo = TableUtils.getRowTypeInfo(inputs[0].getResolvedSchema());
+        RowTypeInfo outputTypeInfo =
+                new RowTypeInfo(
+                        ArrayUtils.addAll(inputTypeInfo.getFieldTypes(), VectorTypeInfo.INSTANCE),
+                        ArrayUtils.addAll(inputTypeInfo.getFieldNames(), getOutputCol()));
+        DataStream<Row> output =
+                tEnv.toDataStream(inputs[0])
+                        .map(new Elementwise(getInputCol(), getScalingVec()), outputTypeInfo);
+        Table outputTable = tEnv.fromDataStream(output);
+        return new Table[] {outputTable};
+    }
+
+    private static class Elementwise implements MapFunction<Row, Row> {
+        private final String inputCol;
+        private final DenseVector scalingVec;
+
+        public Elementwise(String inputCol, Vector scalingVec) {
+            this.inputCol = inputCol;
+            this.scalingVec = scalingVec.toDense();

Review Comment:
   This conversion seems unnecessary and might cause performance issues.



##########
flink-ml-lib/src/test/java/org/apache/flink/ml/feature/ElementwiseProductTest.java:
##########
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.ml.feature.elementwiseproduct.ElementwiseProduct;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.SparseVector;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.ml.util.TestUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
+
+import org.apache.commons.collections.IteratorUtils;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/** Tests ElementwiseProduct. */
+public class ElementwiseProductTest extends AbstractTestBase {
+
+    private StreamTableEnvironment tEnv;
+    private Table inputDataTable;
+
+    private static final List<Row> INPUT_DATA =
+            Arrays.asList(
+                    Row.of(
+                            0,
+                            Vectors.dense(2.1, 3.1),
+                            Vectors.sparse(5, new int[] {3}, new double[] {1.0})),
+                    Row.of(
+                            1,
+                            Vectors.dense(1.1, 3.3),
+                            Vectors.sparse(
+                                    5, new int[] {4, 2, 3, 1}, new double[] {4.0, 2.0, 3.0, 1.0})));
+
+    private static final DenseVector EXPECTED_OUTPUT_DATA_1 =
+            Vectors.dense(2.3100000000000005, 3.4100000000000006);

Review Comment:
   It might be better to use `2.31` here and set a tolerance threshold to ignore the `0.0000000000000005` difference.



##########
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/elementwiseproduct/ElementwiseProductParams.java:
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature.elementwiseproduct;
+
+import org.apache.flink.ml.common.param.HasInputCol;
+import org.apache.flink.ml.common.param.HasOutputCol;
+import org.apache.flink.ml.linalg.Vector;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.param.ParamValidators;
+import org.apache.flink.ml.param.VectorParam;
+
+/**
+ * Params of ElementwiseProduct.
+ *
+ * @param <T> The class type of this instance.
+ */
+public interface ElementwiseProductParams<T> extends HasInputCol<T>, HasOutputCol<T> {
+
+    Param<Vector> SCALING_VEC =
+            new VectorParam(
+                    "scalingVec",
+                    "the vector to multiply with input vectors.",

Review Comment:
   `Hadamard product` seems to be a more precise description.



##########
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/elementwiseproduct/ElementwiseProduct.java:
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature.elementwiseproduct;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.ml.api.Transformer;
+import org.apache.flink.ml.common.datastream.TableUtils;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.SparseVector;
+import org.apache.flink.ml.linalg.Vector;
+import org.apache.flink.ml.linalg.typeinfo.VectorTypeInfo;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.ArrayUtils;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * ElementwiseProduct is a transformer that multiplies each input vector by a given scaling vector,
+ * using element-wise multiplication.

Review Comment:
   Let's add more descriptions and test cases for the following situations:
   - the input vector is null
   - the size of the input vector does not match the scaling vector



##########
flink-ml-lib/src/test/java/org/apache/flink/ml/feature/ElementwiseProductTest.java:
##########
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.ml.feature.elementwiseproduct.ElementwiseProduct;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.SparseVector;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.ml.util.TestUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
+
+import org.apache.commons.collections.IteratorUtils;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/** Tests ElementwiseProduct. */
+public class ElementwiseProductTest extends AbstractTestBase {
+
+    private StreamTableEnvironment tEnv;
+    private Table inputDataTable;
+
+    private static final List<Row> INPUT_DATA =
+            Arrays.asList(
+                    Row.of(
+                            0,
+                            Vectors.dense(2.1, 3.1),
+                            Vectors.sparse(5, new int[] {3}, new double[] {1.0})),
+                    Row.of(
+                            1,
+                            Vectors.dense(1.1, 3.3),
+                            Vectors.sparse(
+                                    5, new int[] {4, 2, 3, 1}, new double[] {4.0, 2.0, 3.0, 1.0})));
+
+    private static final DenseVector EXPECTED_OUTPUT_DATA_1 =
+            Vectors.dense(2.3100000000000005, 3.4100000000000006);
+    private static final DenseVector EXPECTED_OUTPUT_DATA_2 =
+            Vectors.dense(1.2100000000000002, 3.63);
+    private static final SparseVector EXPECTED_OUTPUT_DATA_3 =
+            Vectors.sparse(5, new int[] {3}, new double[] {0.0});
+    private static final SparseVector EXPECTED_OUTPUT_DATA_4 =
+            Vectors.sparse(5, new int[] {1, 2, 3, 4}, new double[] {1.1, 0.0, 0.0, 0.0});
+
+    @Before
+    public void before() {
+        Configuration config = new Configuration();
+        config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+        env.setParallelism(4);
+        env.enableCheckpointing(100);
+        env.setRestartStrategy(RestartStrategies.noRestart());
+        tEnv = StreamTableEnvironment.create(env);
+        DataStream<Row> dataStream = env.fromCollection(INPUT_DATA);
+        inputDataTable = tEnv.fromDataStream(dataStream).as("id", "vec", "sparseVec");
+    }
+
+    private void verifyOutputResult(Table output, String outputCol, boolean isSparse)
+            throws Exception {
+        DataStream<Row> dataStream = tEnv.toDataStream(output);
+        List<Row> results = IteratorUtils.toList(dataStream.executeAndCollect());
+        assertEquals(2, results.size());
+        for (Row result : results) {
+            if (result.getField(0) == (Object) 0) {
+                assertEquals(
+                        isSparse ? EXPECTED_OUTPUT_DATA_3 : EXPECTED_OUTPUT_DATA_1,
+                        result.getField(outputCol));
+            } else if (result.getField(0) == (Object) 1) {
+                assertEquals(
+                        isSparse ? EXPECTED_OUTPUT_DATA_4 : EXPECTED_OUTPUT_DATA_2,
+                        result.getField(outputCol));
+            }

Review Comment:
   nit: Let's add an `else` here to handle unexpected output results.



##########
flink-ml-core/src/main/java/org/apache/flink/ml/param/VectorParam.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.param;
+
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.SparseVector;
+import org.apache.flink.ml.linalg.Vector;
+
+import java.util.List;
+import java.util.Map;
+
+/** Class for the Vector parameter. */
+public class VectorParam extends Param<Vector> {
+
+    public VectorParam(
+            String name,
+            String description,
+            Vector defaultValue,
+            ParamValidator<Vector> validator) {
+        super(name, Vector.class, description, defaultValue, validator);
+    }
+
+    public VectorParam(String name, String description, Vector defaultValue) {
+        this(name, description, defaultValue, ParamValidators.alwaysTrue());
+    }
+
+    @Override
+    public Vector jsonDecode(Object object) {
+        Map<String, Object> vecValues = (Map) object;
+        if (vecValues.size() == 1) {
+            List<Double> list = (List<Double>) vecValues.get("values");
+            double[] values = new double[list.size()];
+            for (int i = 0; i < values.length; ++i) {
+                values[i] = list.get(i);
+            }
+            return new DenseVector(values);
+        } else if (vecValues.size() == 3) {
+            List<Double> valuesList = (List<Double>) vecValues.get("values");
+            List<Integer> indicesList = (List<Integer>) vecValues.get("indices");
+            int n = (int) vecValues.get("n");
+            double[] values = new double[valuesList.size()];
+            int[] indices = new int[indicesList.size()];
+            for (int i = 0; i < values.length; ++i) {
+                values[i] = valuesList.get(i);
+                indices[i] = indicesList.get(i);
+            }
+            return new SparseVector(n, indices, values);
+        } else {
+            throw new RuntimeException("Parameter scalingVec is invalid.");

Review Comment:
   the exception message here seems improper.



##########
flink-ml-lib/src/test/java/org/apache/flink/ml/feature/ElementwiseProductTest.java:
##########
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.ml.feature.elementwiseproduct.ElementwiseProduct;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.SparseVector;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.ml.util.TestUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
+
+import org.apache.commons.collections.IteratorUtils;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/** Tests ElementwiseProduct. */

Review Comment:
   nit: `{@link }`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-ml] weibozhao commented on a diff in pull request #135: [FLINK-28611] Add Transformer for ElementwiseProduct

Posted by GitBox <gi...@apache.org>.
weibozhao commented on code in PR #135:
URL: https://github.com/apache/flink-ml/pull/135#discussion_r935265005


##########
flink-ml-python/pyflink/ml/core/param.py:
##########
@@ -86,6 +87,8 @@ def get_param_map(self) -> Dict['Param[Any]', Any]:
     @staticmethod
     def _is_compatible_type(param: 'Param[V]', value: V) -> bool:
         if value is not None and param.type != type(value):
+            if type(value).__name__ == 'DenseVector' or type(value).__name__ == 'SparseVector':
+                return issubclass(type(value), param.type)

Review Comment:
   In the Java code, using `param.clazz.isAssignableFrom(value.getClass())` to check the compatible. 
   But in the python code, type and class are different.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-ml] weibozhao commented on a diff in pull request #135: [FLINK-28611] Add Transformer for ElementwiseProduct

Posted by GitBox <gi...@apache.org>.
weibozhao commented on code in PR #135:
URL: https://github.com/apache/flink-ml/pull/135#discussion_r935274102


##########
flink-ml-lib/src/test/java/org/apache/flink/ml/feature/ElementwiseProductTest.java:
##########
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.ml.feature.elementwiseproduct.ElementwiseProduct;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.SparseVector;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.ml.util.TestUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
+
+import org.apache.commons.collections.IteratorUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+/** Tests {@link ElementwiseProduct}. */
+public class ElementwiseProductTest extends AbstractTestBase {
+
+    private StreamTableEnvironment tEnv;
+    private Table inputDataTable;
+
+    private static final List<Row> INPUT_DATA =
+            Arrays.asList(
+                    Row.of(
+                            0,
+                            Vectors.dense(2.1, 3.1),
+                            Vectors.sparse(5, new int[] {3}, new double[] {1.0})),
+                    Row.of(
+                            1,
+                            Vectors.dense(1.1, 3.3),
+                            Vectors.sparse(
+                                    5, new int[] {4, 2, 3, 1}, new double[] {4.0, 2.0, 3.0, 1.0})),
+                    Row.of(2, null, null));
+
+    private static final double[] EXPECTED_OUTPUT_DENSE_VEC_ARRAY_1 = new double[] {2.31, 3.41};
+    private static final double[] EXPECTED_OUTPUT_DENSE_VEC_ARRAY_2 = new double[] {1.21, 3.63};
+
+    private static final int EXPECTED_OUTPUT_SPARSE_VEC_SIZE_1 = 5;
+    private static final int[] EXPECTED_OUTPUT_SPARSE_VEC_INDICES_1 = new int[] {3};
+    private static final double[] EXPECTED_OUTPUT_SPARSE_VEC_VALUES_1 = new double[] {0.0};
+
+    private static final int EXPECTED_OUTPUT_SPARSE_VEC_SIZE_2 = 5;
+    private static final int[] EXPECTED_OUTPUT_SPARSE_VEC_INDICES_2 = new int[] {1, 2, 3, 4};
+    private static final double[] EXPECTED_OUTPUT_SPARSE_VEC_VALUES_2 =
+            new double[] {1.1, 0.0, 0.0, 0.0};
+
+    @Before
+    public void before() {
+        Configuration config = new Configuration();
+        config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+        env.setParallelism(4);
+        env.enableCheckpointing(100);
+        env.setRestartStrategy(RestartStrategies.noRestart());
+        tEnv = StreamTableEnvironment.create(env);
+        DataStream<Row> dataStream = env.fromCollection(INPUT_DATA);
+        inputDataTable = tEnv.fromDataStream(dataStream).as("id", "vec", "sparseVec");
+    }
+
+    private void verifyOutputResult(Table output, String outputCol, boolean isSparse)
+            throws Exception {
+        DataStream<Row> dataStream = tEnv.toDataStream(output);
+        List<Row> results = IteratorUtils.toList(dataStream.executeAndCollect());
+        assertEquals(3, results.size());
+        for (Row result : results) {
+            if (result.getField(0) == (Object) 0) {
+                if (isSparse) {
+                    SparseVector sparseVector = (SparseVector) result.getField(outputCol);
+                    assertEquals(EXPECTED_OUTPUT_SPARSE_VEC_SIZE_1, sparseVector.size());
+                    assertArrayEquals(EXPECTED_OUTPUT_SPARSE_VEC_INDICES_1, sparseVector.indices);
+                    assertArrayEquals(
+                            EXPECTED_OUTPUT_SPARSE_VEC_VALUES_1, sparseVector.values, 1.0e-5);
+                } else {
+                    assertArrayEquals(
+                            EXPECTED_OUTPUT_DENSE_VEC_ARRAY_1,
+                            ((DenseVector) result.getField(outputCol)).values,
+                            1.0e-5);
+                }
+            } else if (result.getField(0) == (Object) 1) {

Review Comment:
   If we use the above code, the epsilon cannot be added. When there exists a small error, just like 1.0e-6, the assert will fail. Assertion of double value must use epsilon.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-ml] yunfengzhou-hub commented on a diff in pull request #135: [FLINK-28611] Add Transformer for ElementwiseProduct

Posted by GitBox <gi...@apache.org>.
yunfengzhou-hub commented on code in PR #135:
URL: https://github.com/apache/flink-ml/pull/135#discussion_r934072380


##########
flink-ml-python/pyflink/ml/lib/feature/elementwiseproduct.py:
##########
@@ -0,0 +1,73 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from pyflink.ml.core.param import ParamValidators, Param, VectorParam
+from pyflink.ml.core.wrapper import JavaWithParams
+from pyflink.ml.lib.feature.common import JavaFeatureTransformer
+from pyflink.ml.lib.param import HasInputCol, HasOutputCol
+from pyflink.ml.core.linalg import Vector
+
+
+class _ElementwiseProductParams(
+    JavaWithParams,
+    HasInputCol,
+    HasOutputCol
+):
+    """
+    Params for :class:`ElementwiseProduct`.
+    """
+
+    SCALING_VEC: Param[Vector] = VectorParam(
+        "scaling_vec",
+        "The scaling vector of the product op.",
+        None,
+        ParamValidators.not_null())
+
+    def __init__(self, java_params):
+        super(_ElementwiseProductParams, self).__init__(java_params)
+
+    def set_scaling_vec(self, value: Vector):
+        return self.set(self.SCALING_VEC, value)
+
+    def get_scaling_vec(self) -> Vector:
+        return self.get(self.SCALING_VEC)
+
+    @property
+    def scaling_vec(self) -> Vector:
+        return self.get_scaling_vec()
+
+
+class ElementwiseProduct(JavaFeatureTransformer, _ElementwiseProductParams):
+    """
+    A transformer that combines a given list of input columns into a vector column. Types of
+    input columns must be either vector or numerical value.
+
+    The `keep` option of :class:HasHandleInvalid means that we output bad rows with output column
+    set to null.

Review Comment:
   Let's keep the description the same as that in Java.



##########
flink-ml-python/pyflink/examples/ml/feature/elementwiseproduct_example.py:
##########
@@ -0,0 +1,64 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Simple program that creates a ElementwiseProduct instance and uses it for feature
+# engineering.
+#
+# Before executing this program, please make sure you have followed Flink ML's
+# quick start guideline to set up Flink ML and Flink environment. The guideline
+# can be found at
+#
+# https://nightlies.apache.org/flink/flink-ml-docs-master/docs/try-flink-ml/quick-start/
+
+from pyflink.common import Types
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.ml.core.linalg import Vectors, DenseVectorTypeInfo
+from pyflink.ml.lib.feature.elementwiseproduct import ElementwiseProduct
+from pyflink.table import StreamTableEnvironment
+
+# create a new StreamExecutionEnvironment
+env = StreamExecutionEnvironment.get_execution_environment()
+
+# create a StreamTableEnvironment
+t_env = StreamTableEnvironment.create(env)
+
+# generate input data
+input_data_table = t_env.from_data_stream(
+    env.from_collection([
+        (1, Vectors.dense(2.1, 3.1)),
+        (2, Vectors.dense(1.1, 3.3))
+    ],
+        type_info=Types.ROW_NAMED(
+            ['id', 'vec'],
+            [Types.INT(), DenseVectorTypeInfo()])))
+
+# create a elementwise product object and initialize its parameters

Review Comment:
   nit: `an`.



##########
flink-ml-python/pyflink/ml/lib/feature/elementwiseproduct.py:
##########
@@ -0,0 +1,73 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from pyflink.ml.core.param import ParamValidators, Param, VectorParam
+from pyflink.ml.core.wrapper import JavaWithParams
+from pyflink.ml.lib.feature.common import JavaFeatureTransformer
+from pyflink.ml.lib.param import HasInputCol, HasOutputCol
+from pyflink.ml.core.linalg import Vector
+
+
+class _ElementwiseProductParams(
+    JavaWithParams,
+    HasInputCol,
+    HasOutputCol
+):
+    """
+    Params for :class:`ElementwiseProduct`.
+    """
+
+    SCALING_VEC: Param[Vector] = VectorParam(
+        "scaling_vec",
+        "The scaling vector of the product op.",

Review Comment:
   Let's keep the description the same as that in Java.



##########
flink-ml-python/pyflink/ml/core/param.py:
##########
@@ -333,6 +334,17 @@ def __init__(self, name: str, description: str, default_value: Optional[str],
         super(StringParam, self).__init__(name, str, "str", description, default_value, validator)
 
 
+class VectorParam(Param[Vector]):

Review Comment:
   Let's add test cases for this parameter in python as well. Relevant test cases seem to be in `test_stage.py`.



##########
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/elementwiseproduct/ElementwiseProduct.java:
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature.elementwiseproduct;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.ml.api.Transformer;
+import org.apache.flink.ml.common.datastream.TableUtils;
+import org.apache.flink.ml.linalg.BLAS;
+import org.apache.flink.ml.linalg.Vector;
+import org.apache.flink.ml.linalg.typeinfo.VectorTypeInfo;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.ArrayUtils;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * ElementwiseProduct is transformer that multiplies each input vector with a given scaling vector
+ * element by element. ElementwiseProduct has another name called Hadamard product.
+ *
+ * <p>If input vector is null, then return null value instead of product result.
+ *
+ * <p>If input vector size not equals scaling vector size then throw Exceptions.

Review Comment:
   Could you please refractor these two paragraphs? Currently, there are some grammar errors and ambiguity in them. For example, who will return what kind of exception?



##########
flink-ml-python/pyflink/ml/lib/feature/tests/test_elementwiseproduct.py:
##########
@@ -0,0 +1,76 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import os
+
+from pyflink.common import Types
+
+from pyflink.ml.core.linalg import Vectors, DenseVectorTypeInfo
+from pyflink.ml.lib.feature.elementwiseproduct import ElementwiseProduct
+from pyflink.ml.tests.test_utils import PyFlinkMLTestCase
+
+
+class ElementwiseProductTest(PyFlinkMLTestCase):
+    def setUp(self):
+        super(ElementwiseProductTest, self).setUp()
+        self.input_data_table = self.t_env.from_data_stream(
+            self.env.from_collection([
+                (0,
+                 Vectors.dense(2.1, 3.1)),
+                (1,
+                 Vectors.dense(1.1, 3.3)),
+            ],
+                type_info=Types.ROW_NAMED(
+                    ['id', 'vec'],
+                    [Types.INT(), DenseVectorTypeInfo()])))
+
+        self.expected_output_data_1 = Vectors.dense(2.3100000000000005, 3.4100000000000006)
+        self.expected_output_data_2 = Vectors.dense(1.2100000000000002, 3.63)
+
+    def test_param(self):
+        elementwise_product = ElementwiseProduct()
+
+        self.assertEqual('input', elementwise_product.get_input_col())
+        self.assertEqual('output', elementwise_product.get_output_col())
+
+        elementwise_product.set_input_col('vec') \
+            .set_output_col('output_vec') \
+            .set_scaling_vec(Vectors.dense(1.1, 1.1))
+
+        self.assertEqual('vec', elementwise_product.get_input_col())
+        self.assertEqual(Vectors.dense(1.1, 1.1), elementwise_product.get_scaling_vec())
+        self.assertEqual('output_vec', elementwise_product.get_output_col())
+
+    def test_save_load_transform(self):
+        elementwise_product = ElementwiseProduct() \
+            .set_input_col('vec') \
+            .set_output_col('output_vec') \
+            .set_scaling_vec(Vectors.dense(1.1, 1.1))
+
+        path = os.path.join(self.temp_dir, 'test_save_load_transform_elementwise_product')
+        elementwise_product.save(path)
+        elementwise_product = ElementwiseProduct.load(self.t_env, path)
+
+        output_table = elementwise_product.transform(self.input_data_table)[0]
+        actual_outputs = [(result[0], result[2]) for result in
+                          self.t_env.to_data_stream(output_table).execute_and_collect()]
+
+        for actual_output in actual_outputs:

Review Comment:
   Let's check the size of `actual_outputs` here.



##########
flink-ml-python/pyflink/ml/lib/feature/tests/test_elementwiseproduct.py:
##########
@@ -0,0 +1,76 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import os
+
+from pyflink.common import Types
+
+from pyflink.ml.core.linalg import Vectors, DenseVectorTypeInfo
+from pyflink.ml.lib.feature.elementwiseproduct import ElementwiseProduct
+from pyflink.ml.tests.test_utils import PyFlinkMLTestCase
+
+
+class ElementwiseProductTest(PyFlinkMLTestCase):
+    def setUp(self):
+        super(ElementwiseProductTest, self).setUp()
+        self.input_data_table = self.t_env.from_data_stream(
+            self.env.from_collection([
+                (0,
+                 Vectors.dense(2.1, 3.1)),
+                (1,
+                 Vectors.dense(1.1, 3.3)),
+            ],
+                type_info=Types.ROW_NAMED(
+                    ['id', 'vec'],
+                    [Types.INT(), DenseVectorTypeInfo()])))
+
+        self.expected_output_data_1 = Vectors.dense(2.3100000000000005, 3.4100000000000006)
+        self.expected_output_data_2 = Vectors.dense(1.2100000000000002, 3.63)

Review Comment:
   Let's apply the changes in Java to Python test cases as well.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-ml] weibozhao commented on a diff in pull request #135: [FLINK-28611] Support ElementwiseProduct in FlinkML

Posted by GitBox <gi...@apache.org>.
weibozhao commented on code in PR #135:
URL: https://github.com/apache/flink-ml/pull/135#discussion_r932814503


##########
flink-ml-core/src/main/java/org/apache/flink/ml/param/VectorParam.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.param;
+
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.SparseVector;
+import org.apache.flink.ml.linalg.Vector;
+
+import java.util.List;
+import java.util.Map;
+
+/** Class for the Vector parameter. */
+public class VectorParam extends Param<Vector> {
+
+    public VectorParam(
+            String name,
+            String description,
+            Vector defaultValue,
+            ParamValidator<Vector> validator) {
+        super(name, Vector.class, description, defaultValue, validator);
+    }
+
+    public VectorParam(String name, String description, Vector defaultValue) {
+        this(name, description, defaultValue, ParamValidators.alwaysTrue());
+    }
+
+    @Override
+    public Vector jsonDecode(Object object) {
+        Map<String, Object> vecValues = (Map) object;
+        if (vecValues.size() == 1) {
+            List<Double> list = (List<Double>) vecValues.get("values");
+            double[] values = new double[list.size()];
+            for (int i = 0; i < values.length; ++i) {
+                values[i] = list.get(i);
+            }
+            return new DenseVector(values);
+        } else if (vecValues.size() == 3) {
+            List<Double> valuesList = (List<Double>) vecValues.get("values");
+            List<Integer> indicesList = (List<Integer>) vecValues.get("indices");
+            int n = (int) vecValues.get("n");
+            double[] values = new double[valuesList.size()];

Review Comment:
   `List<Double>` toArray is `Double[]`, but we need double[] in the constructor of SparseVector. Then I use `for loop` instead of List.toArray



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-ml] yunfengzhou-hub commented on a diff in pull request #135: [FLINK-28611] Add Transformer for ElementwiseProduct

Posted by GitBox <gi...@apache.org>.
yunfengzhou-hub commented on code in PR #135:
URL: https://github.com/apache/flink-ml/pull/135#discussion_r935210096


##########
flink-ml-lib/src/test/java/org/apache/flink/ml/feature/ElementwiseProductTest.java:
##########
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.ml.feature.elementwiseproduct.ElementwiseProduct;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.SparseVector;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.ml.util.TestUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
+
+import org.apache.commons.collections.IteratorUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+/** Tests {@link ElementwiseProduct}. */
+public class ElementwiseProductTest extends AbstractTestBase {
+
+    private StreamTableEnvironment tEnv;
+    private Table inputDataTable;
+
+    private static final List<Row> INPUT_DATA =
+            Arrays.asList(
+                    Row.of(
+                            0,
+                            Vectors.dense(2.1, 3.1),
+                            Vectors.sparse(5, new int[] {3}, new double[] {1.0})),
+                    Row.of(
+                            1,
+                            Vectors.dense(1.1, 3.3),
+                            Vectors.sparse(
+                                    5, new int[] {4, 2, 3, 1}, new double[] {4.0, 2.0, 3.0, 1.0})),
+                    Row.of(2, null, null));
+
+    private static final double[] EXPECTED_OUTPUT_DENSE_VEC_ARRAY_1 = new double[] {2.31, 3.41};
+    private static final double[] EXPECTED_OUTPUT_DENSE_VEC_ARRAY_2 = new double[] {1.21, 3.63};
+
+    private static final int EXPECTED_OUTPUT_SPARSE_VEC_SIZE_1 = 5;
+    private static final int[] EXPECTED_OUTPUT_SPARSE_VEC_INDICES_1 = new int[] {3};
+    private static final double[] EXPECTED_OUTPUT_SPARSE_VEC_VALUES_1 = new double[] {0.0};
+
+    private static final int EXPECTED_OUTPUT_SPARSE_VEC_SIZE_2 = 5;
+    private static final int[] EXPECTED_OUTPUT_SPARSE_VEC_INDICES_2 = new int[] {1, 2, 3, 4};
+    private static final double[] EXPECTED_OUTPUT_SPARSE_VEC_VALUES_2 =
+            new double[] {1.1, 0.0, 0.0, 0.0};
+
+    @Before
+    public void before() {
+        Configuration config = new Configuration();
+        config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+        env.setParallelism(4);
+        env.enableCheckpointing(100);
+        env.setRestartStrategy(RestartStrategies.noRestart());
+        tEnv = StreamTableEnvironment.create(env);
+        DataStream<Row> dataStream = env.fromCollection(INPUT_DATA);
+        inputDataTable = tEnv.fromDataStream(dataStream).as("id", "vec", "sparseVec");
+    }
+
+    private void verifyOutputResult(Table output, String outputCol, boolean isSparse)
+            throws Exception {
+        DataStream<Row> dataStream = tEnv.toDataStream(output);
+        List<Row> results = IteratorUtils.toList(dataStream.executeAndCollect());
+        assertEquals(3, results.size());
+        for (Row result : results) {
+            if (result.getField(0) == (Object) 0) {
+                if (isSparse) {
+                    SparseVector sparseVector = (SparseVector) result.getField(outputCol);
+                    assertEquals(EXPECTED_OUTPUT_SPARSE_VEC_SIZE_1, sparseVector.size());
+                    assertArrayEquals(EXPECTED_OUTPUT_SPARSE_VEC_INDICES_1, sparseVector.indices);
+                    assertArrayEquals(
+                            EXPECTED_OUTPUT_SPARSE_VEC_VALUES_1, sparseVector.values, 1.0e-5);
+                } else {
+                    assertArrayEquals(
+                            EXPECTED_OUTPUT_DENSE_VEC_ARRAY_1,
+                            ((DenseVector) result.getField(outputCol)).values,
+                            1.0e-5);
+                }
+            } else if (result.getField(0) == (Object) 1) {

Review Comment:
   Do you think the following code could be simpler? Take the sparse vector case as an example:
   ```java
       private static final List<Vector> EXPECTED_OUTPUT_SPARSE_VEC =
               Arrays.asList(
                       Vectors.sparse(5, new int[] {3}, new double[] {0.0}),
                       Vectors.sparse(5, new int[] {1, 2, 3, 4}, new double[] {1.1, 0.0, 0.0, 0.0}),
                       null
               );
   
   ...
   
       private void verifyOutputResult(Table output, String outputCol, boolean isSparse)
               throws Exception {
           List<Row> results = IteratorUtils.toList(output.execute().collect());
           results.sort(Comparator.comparingInt(x -> (Integer) x.getField(0)));
           List<Vector> vectors = results.stream().map(x -> (Vector) x.getField(outputCol)).collect(Collectors.toList());
           assertEquals(vectors, EXPECTED_OUTPUT_SPARSE_VEC);
       }
   ```



##########
flink-ml-lib/src/test/java/org/apache/flink/ml/feature/ElementwiseProductTest.java:
##########
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.ml.feature.elementwiseproduct.ElementwiseProduct;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.SparseVector;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.ml.util.TestUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
+
+import org.apache.commons.collections.IteratorUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+/** Tests {@link ElementwiseProduct}. */
+public class ElementwiseProductTest extends AbstractTestBase {
+
+    private StreamTableEnvironment tEnv;
+    private Table inputDataTable;
+
+    private static final List<Row> INPUT_DATA =
+            Arrays.asList(
+                    Row.of(
+                            0,
+                            Vectors.dense(2.1, 3.1),
+                            Vectors.sparse(5, new int[] {3}, new double[] {1.0})),
+                    Row.of(
+                            1,
+                            Vectors.dense(1.1, 3.3),
+                            Vectors.sparse(
+                                    5, new int[] {4, 2, 3, 1}, new double[] {4.0, 2.0, 3.0, 1.0})),
+                    Row.of(2, null, null));
+
+    private static final double[] EXPECTED_OUTPUT_DENSE_VEC_ARRAY_1 = new double[] {2.31, 3.41};
+    private static final double[] EXPECTED_OUTPUT_DENSE_VEC_ARRAY_2 = new double[] {1.21, 3.63};
+
+    private static final int EXPECTED_OUTPUT_SPARSE_VEC_SIZE_1 = 5;
+    private static final int[] EXPECTED_OUTPUT_SPARSE_VEC_INDICES_1 = new int[] {3};
+    private static final double[] EXPECTED_OUTPUT_SPARSE_VEC_VALUES_1 = new double[] {0.0};
+
+    private static final int EXPECTED_OUTPUT_SPARSE_VEC_SIZE_2 = 5;
+    private static final int[] EXPECTED_OUTPUT_SPARSE_VEC_INDICES_2 = new int[] {1, 2, 3, 4};
+    private static final double[] EXPECTED_OUTPUT_SPARSE_VEC_VALUES_2 =
+            new double[] {1.1, 0.0, 0.0, 0.0};

Review Comment:
   It might be uncommon for sparse vectors to store zero values. Can we improve it to 
   ```java
       private static final int[] EXPECTED_OUTPUT_SPARSE_VEC_INDICES_2 = new int[] {1};
       private static final double[] EXPECTED_OUTPUT_SPARSE_VEC_VALUES_2 =
               new double[] {1.1};
   ```
   by modifying `ElementwiseProduct` or `BLAS.hDot`?



##########
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/elementwiseproduct/ElementwiseProduct.java:
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature.elementwiseproduct;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.ml.api.Transformer;
+import org.apache.flink.ml.common.datastream.TableUtils;
+import org.apache.flink.ml.linalg.BLAS;
+import org.apache.flink.ml.linalg.Vector;
+import org.apache.flink.ml.linalg.typeinfo.VectorTypeInfo;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.ArrayUtils;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * An transformer that multiplies each input vector with a given scaling vector using Hadamard
+ * product.
+ *
+ * <p>If the input vector is null, then the transformer will return null. If the input vector size
+ * not equals scaling vector size then the transformer will throw IllegalArgumentException.

Review Comment:
   nit: "If the size of the input vector does not equal the size of the scaling vector, the transformer will throw {@link IllegalArgumentException}`."



##########
flink-ml-python/pyflink/ml/lib/feature/tests/test_elementwiseproduct.py:
##########
@@ -0,0 +1,83 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import os
+
+from pyflink.common import Types
+
+from pyflink.ml.core.linalg import Vectors, DenseVectorTypeInfo
+from pyflink.ml.lib.feature.elementwiseproduct import ElementwiseProduct
+from pyflink.ml.tests.test_utils import PyFlinkMLTestCase
+
+
+class ElementwiseProductTest(PyFlinkMLTestCase):
+    def setUp(self):
+        super(ElementwiseProductTest, self).setUp()
+        self.input_data_table = self.t_env.from_data_stream(
+            self.env.from_collection([
+                (0,
+                 Vectors.dense(2.1, 3.1)),
+                (1,
+                 Vectors.dense(1.1, 3.3)),
+            ],
+                type_info=Types.ROW_NAMED(
+                    ['id', 'vec'],
+                    [Types.INT(), DenseVectorTypeInfo()])))
+
+        self.expected_output_data_1 = Vectors.dense(2.31, 3.41)
+        self.expected_output_data_2 = Vectors.dense(1.21, 3.63)
+
+    def test_param(self):
+        elementwise_product = ElementwiseProduct()
+
+        self.assertEqual('input', elementwise_product.get_input_col())
+        self.assertEqual('output', elementwise_product.get_output_col())
+
+        elementwise_product.set_input_col('vec') \
+            .set_output_col('output_vec') \
+            .set_scaling_vec(Vectors.dense(1.1, 1.1))
+
+        self.assertEqual('vec', elementwise_product.get_input_col())
+        self.assertEqual(Vectors.dense(1.1, 1.1), elementwise_product.get_scaling_vec())
+        self.assertEqual('output_vec', elementwise_product.get_output_col())
+
+    def test_save_load_transform(self):
+        elementwise_product = ElementwiseProduct() \
+            .set_input_col('vec') \
+            .set_output_col('output_vec') \
+            .set_scaling_vec(Vectors.dense(1.1, 1.1))
+
+        path = os.path.join(self.temp_dir, 'test_save_load_transform_elementwise_product')
+        elementwise_product.save(path)
+        elementwise_product = ElementwiseProduct.load(self.t_env, path)
+
+        output_table = elementwise_product.transform(self.input_data_table)[0]
+        actual_outputs = [(result[0], result[2]) for result in
+                          self.t_env.to_data_stream(output_table).execute_and_collect()]
+
+        self.assertEqual(2, len(actual_outputs))
+        for actual_output in actual_outputs:
+            if actual_output[0] == 0:
+                self.assertAlmostEqual(self.expected_output_data_1.get(0),
+                                       actual_output[1].get(0), delta=1e-7)
+                self.assertAlmostEqual(self.expected_output_data_1.get(1),

Review Comment:
   Can we compare the two vectors or two arrays here, instead of comparing values at each index? If it is difficult, let's at least use a for loop here, and also check for the size of the vectors.



##########
flink-ml-python/pyflink/ml/core/param.py:
##########
@@ -86,6 +87,8 @@ def get_param_map(self) -> Dict['Param[Any]', Any]:
     @staticmethod
     def _is_compatible_type(param: 'Param[V]', value: V) -> bool:
         if value is not None and param.type != type(value):
+            if type(value).__name__ == 'DenseVector' or type(value).__name__ == 'SparseVector':
+                return issubclass(type(value), param.type)

Review Comment:
   Could you please illustrate why we need to add this code here, and why we don't need to add corresponding codes in the Java implementation?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-ml] zhipeng93 commented on a diff in pull request #135: [FLINK-28611] Add Transformer for ElementwiseProduct

Posted by GitBox <gi...@apache.org>.
zhipeng93 commented on code in PR #135:
URL: https://github.com/apache/flink-ml/pull/135#discussion_r935361497


##########
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/elementwiseproduct/ElementwiseProduct.java:
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature.elementwiseproduct;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.ml.api.Transformer;
+import org.apache.flink.ml.common.datastream.TableUtils;
+import org.apache.flink.ml.linalg.BLAS;
+import org.apache.flink.ml.linalg.Vector;
+import org.apache.flink.ml.linalg.typeinfo.VectorTypeInfo;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.ArrayUtils;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * An transformer that multiplies each input vector with a given scaling vector using Hadamard
+ * product.
+ *
+ * <p>If the size of the input vector does not equal the size of the scaling vector, the transformer
+ * will throw {@link IllegalArgumentException}.
+ */
+public class ElementwiseProduct
+        implements Transformer<ElementwiseProduct>, ElementwiseProductParams<ElementwiseProduct> {
+    private final Map<Param<?>, Object> paramMap = new HashMap<>();
+
+    public ElementwiseProduct() {
+        ParamUtils.initializeMapWithDefaultValues(paramMap, this);
+    }
+
+    @Override
+    public Table[] transform(Table... inputs) {
+        Preconditions.checkArgument(inputs.length == 1);
+        StreamTableEnvironment tEnv =
+                (StreamTableEnvironment) ((TableImpl) inputs[0]).getTableEnvironment();
+        RowTypeInfo inputTypeInfo = TableUtils.getRowTypeInfo(inputs[0].getResolvedSchema());
+        RowTypeInfo outputTypeInfo =
+                new RowTypeInfo(
+                        ArrayUtils.addAll(inputTypeInfo.getFieldTypes(), VectorTypeInfo.INSTANCE),
+                        ArrayUtils.addAll(inputTypeInfo.getFieldNames(), getOutputCol()));
+        DataStream<Row> output =
+                tEnv.toDataStream(inputs[0])
+                        .map(
+                                new ElementwiseProductFunction(getInputCol(), getScalingVec()),
+                                outputTypeInfo);
+        Table outputTable = tEnv.fromDataStream(output);
+        return new Table[] {outputTable};
+    }
+
+    private static class ElementwiseProductFunction implements MapFunction<Row, Row> {
+        private final String inputCol;
+        private final Vector scalingVec;
+
+        public ElementwiseProductFunction(String inputCol, Vector scalingVec) {
+            this.inputCol = inputCol;
+            this.scalingVec = scalingVec;
+        }
+
+        @Override
+        public Row map(Row value) {
+            Vector inputVec = value.getFieldAs(inputCol);
+            Vector retVec = (null != inputVec) ? inputVec.clone() : null;

Review Comment:
   How about we check the size of the inputVec and the scaling vec before conducting `hdot`? Then we can throw an illegalArgumentException here and the exception seems more clear to users.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-ml] zhipeng93 commented on pull request #135: Support ElementwiseProduct in FlinkML

Posted by GitBox <gi...@apache.org>.
zhipeng93 commented on PR #135:
URL: https://github.com/apache/flink-ml/pull/135#issuecomment-1194937026

   Thanks for the PR. Could you please add a JIRA ticket for this? Also for the title of the PR, could you follow the existing convention, i.e., [FLINK-XXX] Add Transformer for ... 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-ml] weibozhao commented on a diff in pull request #135: [FLINK-28611] Add Transformer for ElementwiseProduct

Posted by GitBox <gi...@apache.org>.
weibozhao commented on code in PR #135:
URL: https://github.com/apache/flink-ml/pull/135#discussion_r935302096


##########
flink-ml-lib/src/test/java/org/apache/flink/ml/feature/ElementwiseProductTest.java:
##########
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.ml.feature.elementwiseproduct.ElementwiseProduct;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.SparseVector;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.ml.util.TestUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
+
+import org.apache.commons.collections.IteratorUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+/** Tests {@link ElementwiseProduct}. */
+public class ElementwiseProductTest extends AbstractTestBase {
+
+    private StreamTableEnvironment tEnv;
+    private Table inputDataTable;
+
+    private static final List<Row> INPUT_DATA =
+            Arrays.asList(
+                    Row.of(
+                            0,
+                            Vectors.dense(2.1, 3.1),
+                            Vectors.sparse(5, new int[] {3}, new double[] {1.0})),
+                    Row.of(
+                            1,
+                            Vectors.dense(1.1, 3.3),
+                            Vectors.sparse(
+                                    5, new int[] {4, 2, 3, 1}, new double[] {4.0, 2.0, 3.0, 1.0})),
+                    Row.of(2, null, null));
+
+    private static final double[] EXPECTED_OUTPUT_DENSE_VEC_ARRAY_1 = new double[] {2.31, 3.41};
+    private static final double[] EXPECTED_OUTPUT_DENSE_VEC_ARRAY_2 = new double[] {1.21, 3.63};
+
+    private static final int EXPECTED_OUTPUT_SPARSE_VEC_SIZE_1 = 5;
+    private static final int[] EXPECTED_OUTPUT_SPARSE_VEC_INDICES_1 = new int[] {3};
+    private static final double[] EXPECTED_OUTPUT_SPARSE_VEC_VALUES_1 = new double[] {0.0};
+
+    private static final int EXPECTED_OUTPUT_SPARSE_VEC_SIZE_2 = 5;
+    private static final int[] EXPECTED_OUTPUT_SPARSE_VEC_INDICES_2 = new int[] {1, 2, 3, 4};
+    private static final double[] EXPECTED_OUTPUT_SPARSE_VEC_VALUES_2 =
+            new double[] {1.1, 0.0, 0.0, 0.0};

Review Comment:
    I think there is no need to do this. For doing this is too expensive and may bring us some confusion, we need to do the below things:
   1. Loop all the elements in the sparse vector and judge whether they equal zero(double) or not. If an element is 1.0e-12, It will bring us some confusion.
   2. New two arrays to store new indices and new values. 
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-ml] zhipeng93 commented on a diff in pull request #135: [FLINK-28611] Add Transformer for ElementwiseProduct

Posted by GitBox <gi...@apache.org>.
zhipeng93 commented on code in PR #135:
URL: https://github.com/apache/flink-ml/pull/135#discussion_r935349000


##########
flink-ml-core/src/test/java/org/apache/flink/ml/api/StageTest.java:
##########
@@ -82,6 +85,9 @@ public class StageTest {
         Param<Integer[]> INT_ARRAY_PARAM =
                 new IntArrayParam("intArrayParam", "Description", new Integer[] {6, 7});
 
+        Param<Vector> VECTOR_PARAM =

Review Comment:
   nit: How about moving `Vector_PARAM` to the end of function, i.e., Line#121, such that we test **all** the ArrayParams and then VectorParams?
   
   Same for other similar cases in Python and Java.



##########
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/elementwiseproduct/ElementwiseProduct.java:
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature.elementwiseproduct;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.ml.api.Transformer;
+import org.apache.flink.ml.common.datastream.TableUtils;
+import org.apache.flink.ml.linalg.BLAS;
+import org.apache.flink.ml.linalg.Vector;
+import org.apache.flink.ml.linalg.typeinfo.VectorTypeInfo;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.ArrayUtils;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * An transformer that multiplies each input vector with a given scaling vector using Hadamard

Review Comment:
   nit: To be consistent with the existing java docs, how about make it `A Transformer that multiplies...`?



##########
flink-ml-lib/src/test/java/org/apache/flink/ml/feature/ElementwiseProductTest.java:
##########
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.ml.feature.elementwiseproduct.ElementwiseProduct;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.SparseVector;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.ml.util.TestUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
+
+import org.apache.commons.collections.IteratorUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+/** Tests {@link ElementwiseProduct}. */
+public class ElementwiseProductTest extends AbstractTestBase {
+
+    private StreamTableEnvironment tEnv;
+    private Table inputDataTable;
+
+    private static final List<Row> INPUT_DATA =
+            Arrays.asList(
+                    Row.of(
+                            0,
+                            Vectors.dense(2.1, 3.1),
+                            Vectors.sparse(5, new int[] {3}, new double[] {1.0})),
+                    Row.of(
+                            1,
+                            Vectors.dense(1.1, 3.3),
+                            Vectors.sparse(
+                                    5, new int[] {4, 2, 3, 1}, new double[] {4.0, 2.0, 3.0, 1.0})),
+                    Row.of(2, null, null));
+
+    private static final double[] EXPECTED_OUTPUT_DENSE_VEC_ARRAY_1 = new double[] {2.31, 3.41};
+    private static final double[] EXPECTED_OUTPUT_DENSE_VEC_ARRAY_2 = new double[] {1.21, 3.63};
+
+    private static final int EXPECTED_OUTPUT_SPARSE_VEC_SIZE_1 = 5;
+    private static final int[] EXPECTED_OUTPUT_SPARSE_VEC_INDICES_1 = new int[] {3};
+    private static final double[] EXPECTED_OUTPUT_SPARSE_VEC_VALUES_1 = new double[] {0.0};
+
+    private static final int EXPECTED_OUTPUT_SPARSE_VEC_SIZE_2 = 5;
+    private static final int[] EXPECTED_OUTPUT_SPARSE_VEC_INDICES_2 = new int[] {1, 2, 3, 4};
+    private static final double[] EXPECTED_OUTPUT_SPARSE_VEC_VALUES_2 =
+            new double[] {1.1, 0.0, 0.0, 0.0};
+
+    @Before
+    public void before() {
+        Configuration config = new Configuration();
+        config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+        env.setParallelism(4);
+        env.enableCheckpointing(100);
+        env.setRestartStrategy(RestartStrategies.noRestart());
+        tEnv = StreamTableEnvironment.create(env);
+        DataStream<Row> dataStream = env.fromCollection(INPUT_DATA);
+        inputDataTable = tEnv.fromDataStream(dataStream).as("id", "vec", "sparseVec");
+    }
+
+    private void verifyOutputResult(Table output, String outputCol, boolean isSparse)
+            throws Exception {
+        DataStream<Row> dataStream = tEnv.toDataStream(output);
+        List<Row> results = IteratorUtils.toList(dataStream.executeAndCollect());
+        assertEquals(3, results.size());
+        for (Row result : results) {
+            if (result.getField(0) == (Object) 0) {
+                if (isSparse) {
+                    SparseVector sparseVector = (SparseVector) result.getField(outputCol);
+                    assertEquals(EXPECTED_OUTPUT_SPARSE_VEC_SIZE_1, sparseVector.size());
+                    assertArrayEquals(EXPECTED_OUTPUT_SPARSE_VEC_INDICES_1, sparseVector.indices);
+                    assertArrayEquals(
+                            EXPECTED_OUTPUT_SPARSE_VEC_VALUES_1, sparseVector.values, 1.0e-5);
+                } else {
+                    assertArrayEquals(
+                            EXPECTED_OUTPUT_DENSE_VEC_ARRAY_1,
+                            ((DenseVector) result.getField(outputCol)).values,
+                            1.0e-5);
+                }
+            } else if (result.getField(0) == (Object) 1) {
+                if (isSparse) {
+                    SparseVector sparseVector = (SparseVector) result.getField(outputCol);
+                    assertEquals(EXPECTED_OUTPUT_SPARSE_VEC_SIZE_2, sparseVector.size());
+                    assertArrayEquals(EXPECTED_OUTPUT_SPARSE_VEC_INDICES_2, sparseVector.indices);
+                    assertArrayEquals(
+                            EXPECTED_OUTPUT_SPARSE_VEC_VALUES_2, sparseVector.values, 1.0e-5);
+                } else {
+                    assertArrayEquals(
+                            EXPECTED_OUTPUT_DENSE_VEC_ARRAY_2,
+                            ((DenseVector) result.getField(outputCol)).values,
+                            1.0e-5);
+                }
+            } else if (result.getField(0) == (Object) 2) {
+                assertNull(result.getField(outputCol));
+            } else {
+                throw new UnsupportedOperationException("Input data id not exists.");
+            }
+        }
+    }
+
+    @Test
+    public void testParam() {
+        ElementwiseProduct elementwiseProduct = new ElementwiseProduct();
+        assertEquals("output", elementwiseProduct.getOutputCol());
+        assertEquals("input", elementwiseProduct.getInputCol());
+
+        elementwiseProduct
+                .setInputCol("vec")
+                .setOutputCol("outputVec")
+                .setScalingVec(Vectors.dense(1.0, 2.0, 3.0));
+        assertEquals("vec", elementwiseProduct.getInputCol());
+        assertEquals(Vectors.dense(1.0, 2.0, 3.0), elementwiseProduct.getScalingVec());
+        assertEquals("outputVec", elementwiseProduct.getOutputCol());
+    }
+
+    @Test
+    public void testOutputSchema() {
+        ElementwiseProduct elementwiseProduct =
+                new ElementwiseProduct()
+                        .setInputCol("vec")
+                        .setOutputCol("outputVec")
+                        .setScalingVec(Vectors.dense(1.0, 2.0, 3.0));
+        Table output = elementwiseProduct.transform(inputDataTable)[0];
+        assertEquals(
+                Arrays.asList("id", "vec", "sparseVec", "outputVec"),
+                output.getResolvedSchema().getColumnNames());
+    }
+
+    @Test
+    public void testSaveLoadAndTransformDense() throws Exception {
+        ElementwiseProduct elementwiseProduct =
+                new ElementwiseProduct()
+                        .setInputCol("vec")
+                        .setOutputCol("outputVec")
+                        .setScalingVec(Vectors.dense(1.1, 1.1));
+        ElementwiseProduct loadedElementwiseProduct =
+                TestUtils.saveAndReload(
+                        tEnv, elementwiseProduct, TEMPORARY_FOLDER.newFolder().getAbsolutePath());
+        Table output = loadedElementwiseProduct.transform(inputDataTable)[0];
+        verifyOutputResult(output, loadedElementwiseProduct.getOutputCol(), false);
+    }
+
+    @Test
+    public void testVectorSizeNotEquals() {
+        try {
+            ElementwiseProduct elementwiseProduct =
+                    new ElementwiseProduct()
+                            .setInputCol("vec")
+                            .setOutputCol("outputVec")
+                            .setScalingVec(Vectors.dense(1.1, 1.1, 2.0));
+            Table output = elementwiseProduct.transform(inputDataTable)[0];
+            DataStream<Row> dataStream = tEnv.toDataStream(output);
+            IteratorUtils.toList(dataStream.executeAndCollect());
+            Assert.fail("Expected IllegalArgumentException");

Review Comment:
   It is not an illegalArgumentException here. It is an IllegalState and the code never runs here.
   
   How about replace this line with `fail()`?



##########
flink-ml-lib/src/test/java/org/apache/flink/ml/feature/ElementwiseProductTest.java:
##########
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.ml.feature.elementwiseproduct.ElementwiseProduct;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.SparseVector;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.ml.util.TestUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
+
+import org.apache.commons.collections.IteratorUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+/** Tests {@link ElementwiseProduct}. */
+public class ElementwiseProductTest extends AbstractTestBase {
+
+    private StreamTableEnvironment tEnv;
+    private Table inputDataTable;
+
+    private static final List<Row> INPUT_DATA =
+            Arrays.asList(
+                    Row.of(
+                            0,
+                            Vectors.dense(2.1, 3.1),
+                            Vectors.sparse(5, new int[] {3}, new double[] {1.0})),
+                    Row.of(
+                            1,
+                            Vectors.dense(1.1, 3.3),
+                            Vectors.sparse(
+                                    5, new int[] {4, 2, 3, 1}, new double[] {4.0, 2.0, 3.0, 1.0})),
+                    Row.of(2, null, null));
+
+    private static final double[] EXPECTED_OUTPUT_DENSE_VEC_ARRAY_1 = new double[] {2.31, 3.41};
+    private static final double[] EXPECTED_OUTPUT_DENSE_VEC_ARRAY_2 = new double[] {1.21, 3.63};
+
+    private static final int EXPECTED_OUTPUT_SPARSE_VEC_SIZE_1 = 5;
+    private static final int[] EXPECTED_OUTPUT_SPARSE_VEC_INDICES_1 = new int[] {3};
+    private static final double[] EXPECTED_OUTPUT_SPARSE_VEC_VALUES_1 = new double[] {0.0};
+
+    private static final int EXPECTED_OUTPUT_SPARSE_VEC_SIZE_2 = 5;
+    private static final int[] EXPECTED_OUTPUT_SPARSE_VEC_INDICES_2 = new int[] {1, 2, 3, 4};
+    private static final double[] EXPECTED_OUTPUT_SPARSE_VEC_VALUES_2 =
+            new double[] {1.1, 0.0, 0.0, 0.0};
+
+    @Before
+    public void before() {
+        Configuration config = new Configuration();
+        config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+        env.setParallelism(4);
+        env.enableCheckpointing(100);
+        env.setRestartStrategy(RestartStrategies.noRestart());
+        tEnv = StreamTableEnvironment.create(env);
+        DataStream<Row> dataStream = env.fromCollection(INPUT_DATA);
+        inputDataTable = tEnv.fromDataStream(dataStream).as("id", "vec", "sparseVec");
+    }
+
+    private void verifyOutputResult(Table output, String outputCol, boolean isSparse)
+            throws Exception {
+        DataStream<Row> dataStream = tEnv.toDataStream(output);
+        List<Row> results = IteratorUtils.toList(dataStream.executeAndCollect());
+        assertEquals(3, results.size());
+        for (Row result : results) {
+            if (result.getField(0) == (Object) 0) {
+                if (isSparse) {
+                    SparseVector sparseVector = (SparseVector) result.getField(outputCol);
+                    assertEquals(EXPECTED_OUTPUT_SPARSE_VEC_SIZE_1, sparseVector.size());
+                    assertArrayEquals(EXPECTED_OUTPUT_SPARSE_VEC_INDICES_1, sparseVector.indices);
+                    assertArrayEquals(
+                            EXPECTED_OUTPUT_SPARSE_VEC_VALUES_1, sparseVector.values, 1.0e-5);
+                } else {
+                    assertArrayEquals(
+                            EXPECTED_OUTPUT_DENSE_VEC_ARRAY_1,
+                            ((DenseVector) result.getField(outputCol)).values,
+                            1.0e-5);
+                }
+            } else if (result.getField(0) == (Object) 1) {
+                if (isSparse) {
+                    SparseVector sparseVector = (SparseVector) result.getField(outputCol);
+                    assertEquals(EXPECTED_OUTPUT_SPARSE_VEC_SIZE_2, sparseVector.size());
+                    assertArrayEquals(EXPECTED_OUTPUT_SPARSE_VEC_INDICES_2, sparseVector.indices);
+                    assertArrayEquals(
+                            EXPECTED_OUTPUT_SPARSE_VEC_VALUES_2, sparseVector.values, 1.0e-5);
+                } else {
+                    assertArrayEquals(
+                            EXPECTED_OUTPUT_DENSE_VEC_ARRAY_2,
+                            ((DenseVector) result.getField(outputCol)).values,
+                            1.0e-5);
+                }
+            } else if (result.getField(0) == (Object) 2) {
+                assertNull(result.getField(outputCol));
+            } else {
+                throw new UnsupportedOperationException("Input data id not exists.");
+            }
+        }
+    }
+
+    @Test
+    public void testParam() {
+        ElementwiseProduct elementwiseProduct = new ElementwiseProduct();
+        assertEquals("output", elementwiseProduct.getOutputCol());
+        assertEquals("input", elementwiseProduct.getInputCol());
+
+        elementwiseProduct
+                .setInputCol("vec")
+                .setOutputCol("outputVec")
+                .setScalingVec(Vectors.dense(1.0, 2.0, 3.0));
+        assertEquals("vec", elementwiseProduct.getInputCol());
+        assertEquals(Vectors.dense(1.0, 2.0, 3.0), elementwiseProduct.getScalingVec());
+        assertEquals("outputVec", elementwiseProduct.getOutputCol());
+    }
+
+    @Test
+    public void testOutputSchema() {
+        ElementwiseProduct elementwiseProduct =
+                new ElementwiseProduct()
+                        .setInputCol("vec")
+                        .setOutputCol("outputVec")
+                        .setScalingVec(Vectors.dense(1.0, 2.0, 3.0));
+        Table output = elementwiseProduct.transform(inputDataTable)[0];
+        assertEquals(
+                Arrays.asList("id", "vec", "sparseVec", "outputVec"),
+                output.getResolvedSchema().getColumnNames());
+    }
+
+    @Test
+    public void testSaveLoadAndTransformDense() throws Exception {
+        ElementwiseProduct elementwiseProduct =
+                new ElementwiseProduct()
+                        .setInputCol("vec")
+                        .setOutputCol("outputVec")
+                        .setScalingVec(Vectors.dense(1.1, 1.1));
+        ElementwiseProduct loadedElementwiseProduct =
+                TestUtils.saveAndReload(
+                        tEnv, elementwiseProduct, TEMPORARY_FOLDER.newFolder().getAbsolutePath());
+        Table output = loadedElementwiseProduct.transform(inputDataTable)[0];
+        verifyOutputResult(output, loadedElementwiseProduct.getOutputCol(), false);
+    }
+
+    @Test
+    public void testVectorSizeNotEquals() {
+        try {
+            ElementwiseProduct elementwiseProduct =
+                    new ElementwiseProduct()
+                            .setInputCol("vec")
+                            .setOutputCol("outputVec")
+                            .setScalingVec(Vectors.dense(1.1, 1.1, 2.0));
+            Table output = elementwiseProduct.transform(inputDataTable)[0];
+            DataStream<Row> dataStream = tEnv.toDataStream(output);
+            IteratorUtils.toList(dataStream.executeAndCollect());
+            Assert.fail("Expected IllegalArgumentException");
+        } catch (Exception e) {
+            assertEquals(
+                    "Vector size mismatched.",
+                    e.getCause().getCause().getCause().getCause().getCause().getMessage());

Review Comment:
   How about using `ExceptionUtils.getRootCause(e).getMessage())`?



##########
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/elementwiseproduct/ElementwiseProduct.java:
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature.elementwiseproduct;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.ml.api.Transformer;
+import org.apache.flink.ml.common.datastream.TableUtils;
+import org.apache.flink.ml.linalg.BLAS;
+import org.apache.flink.ml.linalg.Vector;
+import org.apache.flink.ml.linalg.typeinfo.VectorTypeInfo;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.ArrayUtils;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * An transformer that multiplies each input vector with a given scaling vector using Hadamard
+ * product.
+ *
+ * <p>If the size of the input vector does not equal the size of the scaling vector, the transformer
+ * will throw {@link IllegalArgumentException}.
+ */
+public class ElementwiseProduct
+        implements Transformer<ElementwiseProduct>, ElementwiseProductParams<ElementwiseProduct> {
+    private final Map<Param<?>, Object> paramMap = new HashMap<>();
+
+    public ElementwiseProduct() {
+        ParamUtils.initializeMapWithDefaultValues(paramMap, this);
+    }
+
+    @Override
+    public Table[] transform(Table... inputs) {
+        Preconditions.checkArgument(inputs.length == 1);
+        StreamTableEnvironment tEnv =
+                (StreamTableEnvironment) ((TableImpl) inputs[0]).getTableEnvironment();
+        RowTypeInfo inputTypeInfo = TableUtils.getRowTypeInfo(inputs[0].getResolvedSchema());
+        RowTypeInfo outputTypeInfo =
+                new RowTypeInfo(
+                        ArrayUtils.addAll(inputTypeInfo.getFieldTypes(), VectorTypeInfo.INSTANCE),
+                        ArrayUtils.addAll(inputTypeInfo.getFieldNames(), getOutputCol()));
+        DataStream<Row> output =
+                tEnv.toDataStream(inputs[0])
+                        .map(
+                                new ElementwiseProductFunction(getInputCol(), getScalingVec()),
+                                outputTypeInfo);
+        Table outputTable = tEnv.fromDataStream(output);
+        return new Table[] {outputTable};
+    }
+
+    private static class ElementwiseProductFunction implements MapFunction<Row, Row> {
+        private final String inputCol;
+        private final Vector scalingVec;
+
+        public ElementwiseProductFunction(String inputCol, Vector scalingVec) {
+            this.inputCol = inputCol;
+            this.scalingVec = scalingVec;
+        }
+
+        @Override
+        public Row map(Row value) {
+            Vector inputVec = value.getFieldAs(inputCol);
+            Vector retVec = (null != inputVec) ? inputVec.clone() : null;

Review Comment:
   How about we check the size of the inputVec and the scaling vec before conducting `hdot`? Then we can throw an illegalArgumentException here and the exception seems more clear to me.



##########
flink-ml-python/pyflink/ml/lib/feature/elementwiseproduct.py:
##########
@@ -0,0 +1,73 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+from pyflink.ml.core.param import ParamValidators, Param, VectorParam
+from pyflink.ml.core.wrapper import JavaWithParams
+from pyflink.ml.lib.feature.common import JavaFeatureTransformer
+from pyflink.ml.lib.param import HasInputCol, HasOutputCol
+from pyflink.ml.core.linalg import Vector
+
+
+class _ElementwiseProductParams(
+    JavaWithParams,
+    HasInputCol,
+    HasOutputCol
+):
+    """
+    Params for :class:`ElementwiseProduct`.
+    """
+
+    SCALING_VEC: Param[Vector] = VectorParam(
+        "scaling_vec",
+        "the scaling vector to multiply with input vectors using hadamard product.",
+        None,
+        ParamValidators.not_null())
+
+    def __init__(self, java_params):
+        super(_ElementwiseProductParams, self).__init__(java_params)
+
+    def set_scaling_vec(self, value: Vector):
+        return self.set(self.SCALING_VEC, value)
+
+    def get_scaling_vec(self) -> Vector:
+        return self.get(self.SCALING_VEC)
+
+    @property
+    def scaling_vec(self) -> Vector:
+        return self.get_scaling_vec()
+
+
+class ElementwiseProduct(JavaFeatureTransformer, _ElementwiseProductParams):
+    """
+    ElementwiseProduct is a transformer that multiplies each input vector with a

Review Comment:
   nit: make the python doc consistent with java docs.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-ml] zhipeng93 merged pull request #135: [FLINK-28611] Add Transformer for ElementwiseProduct

Posted by GitBox <gi...@apache.org>.
zhipeng93 merged PR #135:
URL: https://github.com/apache/flink-ml/pull/135


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org