Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/06/22 10:59:24 UTC

[GitHub] [flink-ml] zhipeng93 opened a new pull request, #114: [FLINK-27096] Optimize VectorAssembler performance

zhipeng93 opened a new pull request, #114:
URL: https://github.com/apache/flink-ml/pull/114

   This PR optimizes the performance of VectorAssembler. It increases the throughput by 30%-50%.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-ml] zhipeng93 commented on a diff in pull request #114: [FLINK-27096] Optimize VectorAssembler performance

Posted by GitBox <gi...@apache.org>.
zhipeng93 commented on code in PR #114:
URL: https://github.com/apache/flink-ml/pull/114#discussion_r904510367


##########
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/vectorassembler/VectorAssembler.java:
##########
@@ -80,24 +82,68 @@ public Table[] transform(Table... inputs) {
         return new Table[] {outputTable};
     }
 
-    private static class AssemblerFunc implements FlatMapFunction<Row, Row> {
+    private static class AssemblerFunc extends RichFlatMapFunction<Row, Row> {
         private final String[] inputCols;
         private final String handleInvalid;
 
+        /** The indices for assembling vectors. */
+        private transient IntArrayList indices;
+        /** The values for assembling vectors. */
+        private transient DoubleArrayList values;

Review Comment:
   Marked resolved since I removed the use of `it.unimi.dsi.fastutil.*`.


[GitHub] [flink-ml] yunfengzhou-hub commented on a diff in pull request #114: [FLINK-27096] Optimize VectorAssembler performance

Posted by GitBox <gi...@apache.org>.
yunfengzhou-hub commented on code in PR #114:
URL: https://github.com/apache/flink-ml/pull/114#discussion_r904423389


##########
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/vectorassembler/VectorAssembler.java:
##########
@@ -80,24 +82,68 @@ public Table[] transform(Table... inputs) {
         return new Table[] {outputTable};
     }
 
-    private static class AssemblerFunc implements FlatMapFunction<Row, Row> {
+    private static class AssemblerFunc extends RichFlatMapFunction<Row, Row> {
         private final String[] inputCols;
         private final String handleInvalid;
 
+        /** The indices for assembling vectors. */
+        private transient IntArrayList indices;

Review Comment:
   Why should we use `IntArrayList` instead of `List<Integer>` or `Integer[]`? Would it bring a performance improvement compared with these options?
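For context on the boxing question: fastutil's `IntArrayList` keeps its elements in a primitive `int[]`, whereas `List<Integer>` stores a reference to an `Integer` object for each element. The sketch below uses a hypothetical `IntList` class (not part of fastutil or Flink) to show the primitive-backed approach next to a boxed list. It illustrates the difference in representation only and is not a benchmark.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class BoxingComparison {
    /** Hypothetical minimal growable list of primitive ints (no boxing). */
    static final class IntList {
        private int[] data = new int[4];
        private int size = 0;

        void add(int v) {
            if (size == data.length) {
                // Grow geometrically so that add() is amortized O(1).
                data = Arrays.copyOf(data, data.length * 2);
            }
            data[size++] = v;
        }

        int size() {
            return size;
        }

        /** Returns a trimmed copy holding exactly the added elements. */
        int[] toIntArray() {
            return Arrays.copyOf(data, size);
        }
    }

    public static void main(String[] args) {
        IntList primitive = new IntList();
        List<Integer> boxed = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            primitive.add(i); // stored directly in the int[] backing array
            boxed.add(i); // autoboxed; values outside the Integer cache allocate a new object
        }
        // Converting the boxed list back to int[] needs a separate unboxing pass.
        int[] fromBoxed = boxed.stream().mapToInt(Integer::intValue).toArray();
        System.out.println(Arrays.equals(primitive.toIntArray(), fromBoxed));
    }
}
```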
   
   How about implementing it in two loops? In the first loop, we only extract the vectors/numbers and calculate their total size. Then we allocate the int and double arrays with the calculated size and assign the values in the second loop.
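A minimal sketch of the two-loop approach, assuming simplified stand-in types: numbers are `Number`, dense vectors are plain `double[]`, and sparse vectors use a hypothetical `Sparse` class rather than Flink ML's `SparseVector`/`DenseVector`. The first loop counts the stored entries and the total logical size; the second fills exactly-sized `int[]`/`double[]` arrays, shifting each input's indices by a running offset.

```java
import java.util.Arrays;

public class TwoPassAssembler {
    /** Hypothetical stand-in for a sparse vector: logical size plus parallel index/value arrays. */
    static final class Sparse {
        final int size;
        final int[] indices;
        final double[] values;

        Sparse(int size, int[] indices, double[] values) {
            this.size = size;
            this.indices = indices;
            this.values = values;
        }
    }

    /** Assembles Number, double[] (dense) and Sparse inputs into one sparse result in two passes. */
    static Sparse assemble(Object[] inputs) {
        // Pass 1: count stored entries (nnz) and the total logical size.
        int nnz = 0;
        int totalSize = 0;
        for (Object o : inputs) {
            if (o instanceof Number) {
                nnz += 1;
                totalSize += 1;
            } else if (o instanceof double[]) {
                int len = ((double[]) o).length;
                nnz += len;
                totalSize += len;
            } else if (o instanceof Sparse) {
                Sparse s = (Sparse) o;
                nnz += s.indices.length;
                totalSize += s.size;
            } else {
                throw new IllegalArgumentException("Unsupported input type: " + o);
            }
        }

        // Allocate the output arrays once, with exactly the right length.
        int[] indices = new int[nnz];
        double[] values = new double[nnz];

        // Pass 2: copy values, shifting each input's indices by the running offset.
        int offset = 0;
        int pos = 0;
        for (Object o : inputs) {
            if (o instanceof Number) {
                indices[pos] = offset;
                values[pos++] = ((Number) o).doubleValue();
                offset += 1;
            } else if (o instanceof double[]) {
                double[] dense = (double[]) o;
                for (int i = 0; i < dense.length; i++) {
                    indices[pos] = offset + i;
                    values[pos++] = dense[i];
                }
                offset += dense.length;
            } else {
                Sparse s = (Sparse) o;
                for (int i = 0; i < s.indices.length; i++) {
                    indices[pos] = offset + s.indices[i];
                    values[pos++] = s.values[i];
                }
                offset += s.size;
            }
        }
        return new Sparse(totalSize, indices, values);
    }

    public static void main(String[] args) {
        Object[] row = {
            2.0, new double[] {1.0, 3.0}, new Sparse(4, new int[] {1, 3}, new double[] {5.0, 7.0})
        };
        Sparse out = assemble(row);
        System.out.println(out.size); // 7
        System.out.println(Arrays.toString(out.indices)); // [0, 1, 2, 4, 6]
        System.out.println(Arrays.toString(out.values)); // [2.0, 1.0, 3.0, 5.0, 7.0]
    }
}
```

Because the output arrays are sized exactly in the first pass, this version needs neither a growable intermediate list nor a trailing trim-and-copy.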


[GitHub] [flink-ml] lindong28 commented on pull request #114: [FLINK-27096] Optimize VectorAssembler performance

Posted by GitBox <gi...@apache.org>.
lindong28 commented on PR #114:
URL: https://github.com/apache/flink-ml/pull/114#issuecomment-1164011174

   Thanks for the update. LGTM.


[GitHub] [flink-ml] lindong28 merged pull request #114: [FLINK-27096] Optimize VectorAssembler performance

Posted by GitBox <gi...@apache.org>.
lindong28 merged PR #114:
URL: https://github.com/apache/flink-ml/pull/114


[GitHub] [flink-ml] zhipeng93 commented on a diff in pull request #114: [FLINK-27096] Optimize VectorAssembler performance

Posted by GitBox <gi...@apache.org>.
zhipeng93 commented on code in PR #114:
URL: https://github.com/apache/flink-ml/pull/114#discussion_r904509535


##########
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/vectorassembler/VectorAssembler.java:
##########
@@ -80,24 +82,68 @@ public Table[] transform(Table... inputs) {
         return new Table[] {outputTable};
     }
 
-    private static class AssemblerFunc implements FlatMapFunction<Row, Row> {
+    private static class AssemblerFunc extends RichFlatMapFunction<Row, Row> {
         private final String[] inputCols;
         private final String handleInvalid;
 
+        /** The indices for assembling vectors. */
+        private transient IntArrayList indices;

Review Comment:
   Thanks for the comment. I have removed the usage of `IntArrayList` and re-implemented it with two loops. There is no performance regression.


[GitHub] [flink-ml] lindong28 commented on a diff in pull request #114: [FLINK-27096] Optimize VectorAssembler performance

Posted by GitBox <gi...@apache.org>.
lindong28 commented on code in PR #114:
URL: https://github.com/apache/flink-ml/pull/114#discussion_r904462286


##########
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/vectorassembler/VectorAssembler.java:
##########
@@ -80,24 +82,68 @@ public Table[] transform(Table... inputs) {
         return new Table[] {outputTable};
     }
 
-    private static class AssemblerFunc implements FlatMapFunction<Row, Row> {
+    private static class AssemblerFunc extends RichFlatMapFunction<Row, Row> {
         private final String[] inputCols;
         private final String handleInvalid;
 
+        /** The indices for assembling vectors. */
+        private transient IntArrayList indices;
+        /** The values for assembling vectors. */
+        private transient DoubleArrayList values;
+
         public AssemblerFunc(String[] inputCols, String handleInvalid) {
             this.inputCols = inputCols;
             this.handleInvalid = handleInvalid;
         }
 
         @Override
-        public void flatMap(Row value, Collector<Row> out) throws Exception {
+        public void open(Configuration parameters) throws Exception {
+            super.open(parameters);
+            indices = new IntArrayList();
+            values = new DoubleArrayList();
+        }
+
+        @Override
+        public void flatMap(Row value, Collector<Row> out) {
+            int offset = 0;
             try {
-                Object[] objects = new Object[inputCols.length];
-                for (int i = 0; i < objects.length; ++i) {
-                    objects[i] = value.getField(inputCols[i]);
+                for (String inputCol : inputCols) {
+                    Object object = value.getField(inputCol);
+                    Preconditions.checkNotNull(object, "Input column value should not be null.");
+                    if (object instanceof Number) {
+                        indices.add(offset++);
+                        values.add(((Number) object).doubleValue());
+                    } else if (object instanceof SparseVector) {
+                        SparseVector sparseVector = (SparseVector) object;
+                        for (int i = 0; i < sparseVector.indices.length; ++i) {
+                            indices.add(sparseVector.indices[i] + offset);
+                            values.add(sparseVector.values[i]);
+                        }
+                        offset += sparseVector.size();
+                    } else if (object instanceof DenseVector) {
+                        DenseVector denseVector = (DenseVector) object;
+                        for (int i = 0; i < denseVector.size(); ++i) {
+                            indices.add(offset + i);
+                            values.add(denseVector.values[i]);
+                        }
+                        offset += denseVector.size();
+                    } else {
+                        throw new IllegalArgumentException(
+                                "Input type has not been supported yet.");
+                    }
+                }
+
+                Vector assembledVec =
+                        new SparseVector(

Review Comment:
   If using `it.unimi.dsi.fastutil.*` is not considerably faster than using a for loop to construct `int[]` and `double[]`, it seems simpler to keep using a for loop instead of introducing an extra library dependency.



##########
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/vectorassembler/VectorAssembler.java:
##########
@@ -80,24 +82,68 @@ public Table[] transform(Table... inputs) {
         return new Table[] {outputTable};
     }
 
-    private static class AssemblerFunc implements FlatMapFunction<Row, Row> {
+    private static class AssemblerFunc extends RichFlatMapFunction<Row, Row> {
         private final String[] inputCols;
         private final String handleInvalid;
 
+        /** The indices for assembling vectors. */
+        private transient IntArrayList indices;
+        /** The values for assembling vectors. */
+        private transient DoubleArrayList values;

Review Comment:
   It seems simpler to instantiate `indices` and `values` as local variables in the `flatMap()` method. Does re-creating these two variables on each `flatMap()` call have a non-trivial performance impact?
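To illustrate the trade-off in this question, the hypothetical `ReusableBuffers` class below mimics allocating scratch buffers once (as the diff does in `open()`) and resetting them per record, instead of creating new lists in every `flatMap()` call. It uses plain Java arrays rather than fastutil, and the `open()`/`reset()` names are illustrative only, not Flink API calls.

```java
import java.util.Arrays;

/** Hypothetical sketch: scratch buffers allocated once and reused across records. */
public class ReusableBuffers {
    private int[] idxBuf;
    private double[] valBuf;
    private int size;

    /** Analogous to open(): allocate the buffers once per function instance. */
    void open() {
        idxBuf = new int[16];
        valBuf = new double[16];
    }

    /** Called at the start of each record; keeps the already-allocated capacity. */
    void reset() {
        size = 0;
    }

    void add(int index, double value) {
        if (size == idxBuf.length) {
            // Grow both buffers together; the larger capacity is kept for later records.
            idxBuf = Arrays.copyOf(idxBuf, size * 2);
            valBuf = Arrays.copyOf(valBuf, size * 2);
        }
        idxBuf[size] = index;
        valBuf[size] = value;
        size++;
    }

    /** Copies out exactly `size` entries so the result does not alias the scratch buffer. */
    int[] indices() {
        return Arrays.copyOf(idxBuf, size);
    }

    double[] values() {
        return Arrays.copyOf(valBuf, size);
    }

    public static void main(String[] args) {
        ReusableBuffers buf = new ReusableBuffers();
        buf.open();
        for (int record = 0; record < 3; record++) {
            buf.reset();
            for (int i = 0; i <= record; i++) {
                buf.add(i, record + 0.5);
            }
            System.out.println(Arrays.toString(buf.indices()) + " " + Arrays.toString(buf.values()));
        }
    }
}
```

Because the emitted arrays must be copied out of the scratch buffers anyway (the next record overwrites them), reuse in this sketch only saves the scratch allocation itself; the two-loop approach avoids both the scratch buffers and the copy.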


[GitHub] [flink-ml] zhipeng93 commented on a diff in pull request #114: [FLINK-27096] Optimize VectorAssembler performance

Posted by GitBox <gi...@apache.org>.
zhipeng93 commented on code in PR #114:
URL: https://github.com/apache/flink-ml/pull/114#discussion_r904510188


##########
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/vectorassembler/VectorAssembler.java:
##########
@@ -80,24 +82,68 @@ public Table[] transform(Table... inputs) {
         return new Table[] {outputTable};
     }
 
-    private static class AssemblerFunc implements FlatMapFunction<Row, Row> {
+    private static class AssemblerFunc extends RichFlatMapFunction<Row, Row> {
         private final String[] inputCols;
         private final String handleInvalid;
 
+        /** The indices for assembling vectors. */
+        private transient IntArrayList indices;
+        /** The values for assembling vectors. */
+        private transient DoubleArrayList values;
+
         public AssemblerFunc(String[] inputCols, String handleInvalid) {
             this.inputCols = inputCols;
             this.handleInvalid = handleInvalid;
         }
 
         @Override
-        public void flatMap(Row value, Collector<Row> out) throws Exception {
+        public void open(Configuration parameters) throws Exception {
+            super.open(parameters);
+            indices = new IntArrayList();
+            values = new DoubleArrayList();
+        }
+
+        @Override
+        public void flatMap(Row value, Collector<Row> out) {
+            int offset = 0;
             try {
-                Object[] objects = new Object[inputCols.length];
-                for (int i = 0; i < objects.length; ++i) {
-                    objects[i] = value.getField(inputCols[i]);
+                for (String inputCol : inputCols) {
+                    Object object = value.getField(inputCol);
+                    Preconditions.checkNotNull(object, "Input column value should not be null.");
+                    if (object instanceof Number) {
+                        indices.add(offset++);
+                        values.add(((Number) object).doubleValue());
+                    } else if (object instanceof SparseVector) {
+                        SparseVector sparseVector = (SparseVector) object;
+                        for (int i = 0; i < sparseVector.indices.length; ++i) {
+                            indices.add(sparseVector.indices[i] + offset);
+                            values.add(sparseVector.values[i]);
+                        }
+                        offset += sparseVector.size();
+                    } else if (object instanceof DenseVector) {
+                        DenseVector denseVector = (DenseVector) object;
+                        for (int i = 0; i < denseVector.size(); ++i) {
+                            indices.add(offset + i);
+                            values.add(denseVector.values[i]);
+                        }
+                        offset += denseVector.size();
+                    } else {
+                        throw new IllegalArgumentException(
+                                "Input type has not been supported yet.");
+                    }
+                }
+
+                Vector assembledVec =
+                        new SparseVector(

Review Comment:
   Marked resolved since I removed the use of `it.unimi.dsi.fastutil.*`.


[GitHub] [flink-ml] zhipeng93 commented on pull request #114: [FLINK-27096] Optimize VectorAssembler performance

Posted by GitBox <gi...@apache.org>.
zhipeng93 commented on PR #114:
URL: https://github.com/apache/flink-ml/pull/114#issuecomment-1163919452

   @lindong28 @yunfengzhou-hub Thanks for the comments. I have addressed them in the latest push.
