Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/04/20 06:39:37 UTC

[GitHub] [flink-ml] yunfengzhou-hub opened a new pull request, #87: [FLINK-27096] Improve Benchmark Framework

yunfengzhou-hub opened a new pull request, #87:
URL: https://github.com/apache/flink-ml/pull/87

   - Add Python scripts for batch-generating and analyzing benchmarks
   - Enrich the benchmark result file's content with input parameters
   - Support saving benchmark results each time a benchmark completes




[GitHub] [flink-ml] lindong28 commented on a diff in pull request #87: [FLINK-27096] Add a script to visualize benchmark results

lindong28 commented on code in PR #87:
URL: https://github.com/apache/flink-ml/pull/87#discussion_r866599491


##########
flink-ml-benchmark/src/main/java/org/apache/flink/ml/benchmark/Benchmark.java:
##########
@@ -31,34 +31,33 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.List;
+import java.util.Collections;
 import java.util.Map;
 
 /** Entry class for benchmark execution. */
 public class Benchmark {
-    private static final Logger LOG = LoggerFactory.getLogger(Benchmark.class);
-
     static final String VERSION_KEY = "version";
 
-    static final Option HELP_OPTION =
+    private static final Logger LOG = LoggerFactory.getLogger(Benchmark.class);
+
+    private static final Option HELP_OPTION =
             Option.builder("h")
                     .longOpt("help")
                     .desc("Show the help message for the command line interface.")
                     .build();
 
-    static final Option OUTPUT_FILE_OPTION =
+    private static final Option OUTPUT_FILE_OPTION =
             Option.builder()
                     .longOpt("output-file")
                     .desc("The output file name to save benchmark results.")
                     .hasArg()
                     .build();
 
-    static final Options OPTIONS =
+    private static final Options OPTIONS =
             new Options().addOption(HELP_OPTION).addOption(OUTPUT_FILE_OPTION);
 
-    public static void printHelp() {
+    private static void printHelp() {
         HelpFormatter formatter = new HelpFormatter();
         formatter.setLeftPadding(5);
         formatter.setWidth(80);

Review Comment:
   Should we update the line below to use the new script name?
   
   And we can simplify the code a bit by using `\n` in `"./benchmark-run.sh <config-file-path> [OPTIONS]\n"` instead of using `System.out.println()`.
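   
   For illustration, the simplified method could look like the sketch below. This is only a sketch: `PrintHelpSketch` is a placeholder class name, and the usage string is taken from the suggestion above rather than from the PR's final code.
   
   ```java
   import org.apache.commons.cli.HelpFormatter;
   import org.apache.commons.cli.Options;
   
   class PrintHelpSketch {
       // Embed the trailing newline in the usage string instead of calling
       // System.out.println() separately afterwards.
       static void printHelp(Options options) {
           HelpFormatter formatter = new HelpFormatter();
           formatter.setLeftPadding(5);
           formatter.setWidth(80);
           formatter.printHelp("./benchmark-run.sh <config-file-path> [OPTIONS]\n", options);
       }
   }
   ```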



##########
flink-ml-benchmark/src/main/java/org/apache/flink/ml/benchmark/Benchmark.java:
##########
@@ -71,46 +70,59 @@ public static void printHelp() {
         System.out.println();
     }
 
-    @SuppressWarnings("unchecked")
-    public static void executeBenchmarks(CommandLine commandLine) throws Exception {
+    private static void executeBenchmarks(CommandLine commandLine) throws Exception {
         String configFile = commandLine.getArgs()[0];
-        Map<String, ?> benchmarks = BenchmarkUtils.parseJsonFile(configFile);
-        System.out.println("Found benchmarks " + benchmarks.keySet());
+        Map<String, Map<String, Map<String, ?>>> benchmarks =
+                BenchmarkUtils.parseJsonFile(configFile);
+        System.out.println("Found " + benchmarks.keySet().size() + " benchmarks.");
+        String saveFile = commandLine.getOptionValue(OUTPUT_FILE_OPTION.getLongOpt());
 
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
 
-        List<BenchmarkResult> results = new ArrayList<>();
-        for (Map.Entry<String, ?> benchmark : benchmarks.entrySet()) {
-            LOG.info("Running benchmark " + benchmark.getKey() + ".");
-
-            BenchmarkResult result =
-                    BenchmarkUtils.runBenchmark(
-                            tEnv, benchmark.getKey(), (Map<String, ?>) benchmark.getValue());
-
-            results.add(result);
-            LOG.info(BenchmarkUtils.getResultsMapAsJson(result));
+        int index = 0;
+        for (Map.Entry<String, Map<String, Map<String, ?>>> entry : benchmarks.entrySet()) {
+            String benchmarkName = entry.getKey();
+            Map<String, Map<String, ?>> benchmarkMap = entry.getValue();
+
+            LOG.info(
+                    String.format(
+                            "Running benchmark %d/%d: %s",
+                            index++, benchmarks.keySet().size(), benchmarkName));
+
+            try {
+                BenchmarkResult result =
+                        BenchmarkUtils.runBenchmark(tEnv, benchmarkName, benchmarkMap);
+                benchmarkMap.put("results", result.toMap());
+                LOG.info(String.format("Benchmark %s finished.\n%s", benchmarkName, benchmarkMap));
+            } catch (Exception e) {
+                benchmarkMap.put(
+                        "results",
+                        Collections.singletonMap(
+                                "exception",
+                                String.format(
+                                        "%s(%s:%s)",
+                                        e,
+                                        e.getStackTrace()[0].getFileName(),
+                                        e.getStackTrace()[0].getLineNumber())));
+                LOG.info(String.format("Benchmark %s failed.\n%s", benchmarkName, e));

Review Comment:
   Should this be `LOG.error(...)`?
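   
   For reference, a sketch of what that could look like: SLF4J's two-argument `error` overload also preserves the stack trace (`benchmarkName` and `e` are the variables from the diff above).
   
   ```java
   // Log at error level and pass the exception so the stack trace is kept.
   LOG.error(String.format("Benchmark %s failed.", benchmarkName), e);
   ```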



##########
flink-ml-dist/src/main/flink-ml-bin/bin/benchmark-results-visualize.py:
##########
@@ -0,0 +1,68 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import argparse
+import json
+import matplotlib.pyplot as plt
+import re
+
+
+def get_nested_field_value(nested_fields, field_names):
+    for field_name in field_names:
+        nested_fields = nested_fields[field_name]
+    return nested_fields
+
+
+def benchmark_results_visualize(file_name, name_pattern, x_field, y_field):

Review Comment:
   Would it be a bit more intuitive to name this method `visualize_benchmark_results(...)`?



##########
flink-ml-benchmark/src/main/java/org/apache/flink/ml/benchmark/Benchmark.java:
##########
@@ -71,46 +70,59 @@ public static void printHelp() {
         System.out.println();
     }
 
-    @SuppressWarnings("unchecked")
-    public static void executeBenchmarks(CommandLine commandLine) throws Exception {
+    private static void executeBenchmarks(CommandLine commandLine) throws Exception {
         String configFile = commandLine.getArgs()[0];
-        Map<String, ?> benchmarks = BenchmarkUtils.parseJsonFile(configFile);
-        System.out.println("Found benchmarks " + benchmarks.keySet());
+        Map<String, Map<String, Map<String, ?>>> benchmarks =
+                BenchmarkUtils.parseJsonFile(configFile);
+        System.out.println("Found " + benchmarks.keySet().size() + " benchmarks.");
+        String saveFile = commandLine.getOptionValue(OUTPUT_FILE_OPTION.getLongOpt());
 
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
 
-        List<BenchmarkResult> results = new ArrayList<>();
-        for (Map.Entry<String, ?> benchmark : benchmarks.entrySet()) {
-            LOG.info("Running benchmark " + benchmark.getKey() + ".");
-
-            BenchmarkResult result =
-                    BenchmarkUtils.runBenchmark(
-                            tEnv, benchmark.getKey(), (Map<String, ?>) benchmark.getValue());
-
-            results.add(result);
-            LOG.info(BenchmarkUtils.getResultsMapAsJson(result));
+        int index = 0;
+        for (Map.Entry<String, Map<String, Map<String, ?>>> entry : benchmarks.entrySet()) {
+            String benchmarkName = entry.getKey();
+            Map<String, Map<String, ?>> benchmarkMap = entry.getValue();
+
+            LOG.info(
+                    String.format(
+                            "Running benchmark %d/%d: %s",
+                            index++, benchmarks.keySet().size(), benchmarkName));
+
+            try {
+                BenchmarkResult result =
+                        BenchmarkUtils.runBenchmark(tEnv, benchmarkName, benchmarkMap);
+                benchmarkMap.put("results", result.toMap());
+                LOG.info(String.format("Benchmark %s finished.\n%s", benchmarkName, benchmarkMap));
+            } catch (Exception e) {
+                benchmarkMap.put(
+                        "results",
+                        Collections.singletonMap(
+                                "exception",
+                                String.format(
+                                        "%s(%s:%s)",
+                                        e,
+                                        e.getStackTrace()[0].getFileName(),
+                                        e.getStackTrace()[0].getLineNumber())));
+                LOG.info(String.format("Benchmark %s failed.\n%s", benchmarkName, e));
+            }
         }
-
-        String benchmarkResultsJson =
-                BenchmarkUtils.getResultsMapAsJson(results.toArray(new BenchmarkResult[0]));
         System.out.println("Benchmarks execution completed.");
-        System.out.println("Benchmark results summary:");
-        System.out.println(benchmarkResultsJson);
 
+        String benchmarkResultsJson =
+                ReadWriteUtils.OBJECT_MAPPER
+                        .writerWithDefaultPrettyPrinter()
+                        .writeValueAsString(benchmarks);
         if (commandLine.hasOption(OUTPUT_FILE_OPTION.getLongOpt())) {
-            String saveFile = commandLine.getOptionValue(OUTPUT_FILE_OPTION.getLongOpt());
             ReadWriteUtils.saveToFile(saveFile, benchmarkResultsJson, true);
             System.out.println("Benchmark results saved as json in " + saveFile + ".");
+        } else {
+            System.out.println("Benchmark results summary:");

Review Comment:
   Would it be simpler and more consistent with the `LOG.info(...)` above to use the following code:
   
   `System.out.println(String.format("Benchmark results summary:\n%s", benchmarkResultsJson))`



##########
flink-ml-benchmark/src/main/java/org/apache/flink/ml/benchmark/Benchmark.java:
##########
@@ -120,7 +132,10 @@ public static void main(String[] args) throws Exception {
         } else if (commandLine.getArgs().length == 1) {
             executeBenchmarks(commandLine);
         } else {
-            printInvalidError(args);
+            System.out.println("Invalid command line arguments " + Arrays.toString(args));

Review Comment:
   Would it be simpler and more consistent with the `LOG.info(...)` above to use the following code:
   
   `System.out.println(String.format("Invalid command line arguments %s \n", Arrays.toString(args)))`



##########
flink-ml-benchmark/README.md:
##########
@@ -63,59 +63,89 @@ Then in Flink ML's binary distribution's folder, execute the following command
 to run the default benchmarks.
 
 ```bash
-./bin/flink-ml-benchmark.sh ./conf/benchmark-conf.json --output-file results.json
+./bin/benchmark-run.sh ./conf/benchmark-conf.json --output-file results.json

Review Comment:
   Would it be simpler and more readable to use the following command:
   
   ```
   ./bin/benchmark-run.sh conf/benchmark-conf.json --output-file results.json
   ```





[GitHub] [flink-ml] yunfengzhou-hub commented on a diff in pull request #87: [FLINK-27096] Add a script to visualize benchmark results

yunfengzhou-hub commented on code in PR #87:
URL: https://github.com/apache/flink-ml/pull/87#discussion_r857087337


##########
flink-ml-benchmark/src/main/java/org/apache/flink/ml/benchmark/BenchmarkResult.java:
##########
@@ -18,13 +18,25 @@
 
 package org.apache.flink.ml.benchmark;
 
+import org.apache.flink.ml.api.Stage;
+import org.apache.flink.ml.benchmark.datagenerator.DataGenerator;
+import org.apache.flink.ml.benchmark.datagenerator.InputDataGenerator;
 import org.apache.flink.util.Preconditions;
 
 /** The result of executing a benchmark. */
 public class BenchmarkResult {
     /** The benchmark name. */
     public final String name;
 
+    /** The stage to execute benchmark on. */
+    public final Stage<?> stage;

Review Comment:
   I added a `toMap` function to `BenchmarkResult` and an `exceptionToMap` static method to `BenchmarkUtils`. Please see if this solution is good enough.
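   
   For context, a minimal sketch of what such a helper could look like, based on the exception fields used in the diffs above (`ExceptionToMapSketch` is a placeholder; the actual method in `BenchmarkUtils` may differ):
   
   ```java
   import java.util.Collections;
   import java.util.Map;
   
   class ExceptionToMapSketch {
       // Flatten an exception into a one-entry map keyed by "exception",
       // recording the exception itself plus the file and line where it arose.
       static Map<String, String> exceptionToMap(Exception e) {
           StackTraceElement top = e.getStackTrace()[0];
           return Collections.singletonMap(
                   "exception",
                   String.format("%s(%s:%s)", e, top.getFileName(), top.getLineNumber()));
       }
   }
   ```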





[GitHub] [flink-ml] lindong28 commented on a diff in pull request #87: [FLINK-27096] Add a script to visualize benchmark results

lindong28 commented on code in PR #87:
URL: https://github.com/apache/flink-ml/pull/87#discussion_r861656690


##########
flink-ml-benchmark/src/main/java/org/apache/flink/ml/benchmark/BenchmarkResult.java:
##########
@@ -18,13 +18,25 @@
 
 package org.apache.flink.ml.benchmark;
 
+import org.apache.flink.ml.api.Stage;
+import org.apache.flink.ml.benchmark.datagenerator.DataGenerator;
+import org.apache.flink.ml.benchmark.datagenerator.InputDataGenerator;
 import org.apache.flink.util.Preconditions;
 
 /** The result of executing a benchmark. */
 public class BenchmarkResult {
     /** The benchmark name. */
     public final String name;
 
+    /** The stage to execute benchmark on. */
+    public final Stage<?> stage;

Review Comment:
   Thanks for the update. The solution looks much better now.





[GitHub] [flink-ml] lindong28 commented on a diff in pull request #87: [FLINK-27096] Add a script to visualize benchmark results

lindong28 commented on code in PR #87:
URL: https://github.com/apache/flink-ml/pull/87#discussion_r861666211


##########
flink-ml-benchmark/src/main/java/org/apache/flink/ml/benchmark/Benchmark.java:
##########
@@ -71,37 +70,57 @@ public static void printHelp() {
         System.out.println();
     }
 
-    @SuppressWarnings("unchecked")
     public static void executeBenchmarks(CommandLine commandLine) throws Exception {
         String configFile = commandLine.getArgs()[0];
-        Map<String, ?> benchmarks = BenchmarkUtils.parseJsonFile(configFile);
-        System.out.println("Found benchmarks " + benchmarks.keySet());
+        Map<String, Map<String, Map<String, ?>>> benchmarks =
+                BenchmarkUtils.parseJsonFile(configFile);
+        System.out.println("Found " + benchmarks.keySet().size() + " benchmarks.");
+        String saveFile = commandLine.getOptionValue(OUTPUT_FILE_OPTION.getLongOpt());
 
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
 
-        List<BenchmarkResult> results = new ArrayList<>();
-        for (Map.Entry<String, ?> benchmark : benchmarks.entrySet()) {
-            LOG.info("Running benchmark " + benchmark.getKey() + ".");
-
-            BenchmarkResult result =
-                    BenchmarkUtils.runBenchmark(
-                            tEnv, benchmark.getKey(), (Map<String, ?>) benchmark.getValue());
-
-            results.add(result);
-            LOG.info(BenchmarkUtils.getResultsMapAsJson(result));
+        Map<String, Map<String, Map<String, ?>>> results = new HashMap<>();
+        String benchmarkResultsJson = "{}";
+        int index = 0;
+        for (String benchmarkName : benchmarks.keySet()) {

Review Comment:
   Since we need both the key and the value of each entry, would it be simpler to use `for (Map.Entry<String, Map<String, Map<String, ?>>> entry : benchmarks.entrySet())` here?
   
   Then we can use `entry.getKey()` and `entry.getValue()` in the loop body, instead of repeatedly calling `results.get(benchmarkName)` and `benchmarks.get(benchmarkName)`. A sketch of that shape follows.
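   
   As a concrete sketch (`EntrySetLoopSketch` is a placeholder; the map types follow the surrounding diff):
   
   ```java
   import java.util.Map;
   
   class EntrySetLoopSketch {
       // Fetch the key and value once per entry instead of repeatedly
       // calling benchmarks.get(benchmarkName) inside the loop body.
       static void iterate(Map<String, Map<String, Map<String, ?>>> benchmarks) {
           for (Map.Entry<String, Map<String, Map<String, ?>>> entry : benchmarks.entrySet()) {
               String benchmarkName = entry.getKey();
               Map<String, Map<String, ?>> benchmarkMap = entry.getValue();
               System.out.println(benchmarkName + ": " + benchmarkMap.keySet());
           }
       }
   }
   ```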
   



##########
flink-ml-dist/src/main/flink-ml-bin/bin/benchmark-results-visualize.py:
##########
@@ -0,0 +1,68 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import argparse
+import json
+import matplotlib.pyplot as plt
+import re
+
+
+def benchmark_results_visualize(filename, name_pattern, x_field, y_field):
+    def get_field(json, fieldnames):

Review Comment:
   Would it be simpler to move this method outside `benchmark_results_visualize`, to reduce the nesting?



##########
flink-ml-benchmark/src/main/java/org/apache/flink/ml/benchmark/Benchmark.java:
##########
@@ -71,37 +70,57 @@ public static void printHelp() {
         System.out.println();
     }
 
-    @SuppressWarnings("unchecked")
     public static void executeBenchmarks(CommandLine commandLine) throws Exception {
         String configFile = commandLine.getArgs()[0];
-        Map<String, ?> benchmarks = BenchmarkUtils.parseJsonFile(configFile);
-        System.out.println("Found benchmarks " + benchmarks.keySet());
+        Map<String, Map<String, Map<String, ?>>> benchmarks =
+                BenchmarkUtils.parseJsonFile(configFile);
+        System.out.println("Found " + benchmarks.keySet().size() + " benchmarks.");
+        String saveFile = commandLine.getOptionValue(OUTPUT_FILE_OPTION.getLongOpt());
 
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
 
-        List<BenchmarkResult> results = new ArrayList<>();
-        for (Map.Entry<String, ?> benchmark : benchmarks.entrySet()) {
-            LOG.info("Running benchmark " + benchmark.getKey() + ".");
-
-            BenchmarkResult result =
-                    BenchmarkUtils.runBenchmark(
-                            tEnv, benchmark.getKey(), (Map<String, ?>) benchmark.getValue());
-
-            results.add(result);
-            LOG.info(BenchmarkUtils.getResultsMapAsJson(result));
+        Map<String, Map<String, Map<String, ?>>> results = new HashMap<>();
+        String benchmarkResultsJson = "{}";
+        int index = 0;
+        for (String benchmarkName : benchmarks.keySet()) {
+            LOG.info(
+                    String.format(
+                            "Running benchmark %d/%d: %s",
+                            index++, benchmarks.keySet().size(), benchmarkName));
+
+            results.put(benchmarkName, benchmarks.get(benchmarkName));
+            try {
+                BenchmarkResult result =
+                        BenchmarkUtils.runBenchmark(
+                                tEnv, benchmarkName, benchmarks.get(benchmarkName));
+                results.get(benchmarkName).put("results", result.toMap());
+                LOG.info(
+                        String.format(
+                                "Benchmark %s finished.\n%s",
+                                benchmarkName, results.get(benchmarkName)));
+            } catch (Exception e) {
+                results.get(benchmarkName).put("exception", BenchmarkUtils.exceptionToMap(e));
+                LOG.info(String.format("Benchmark %s failed.\n%s", benchmarkName, e));
+            }
+
+            benchmarkResultsJson =
+                    ReadWriteUtils.OBJECT_MAPPER
+                            .writerWithDefaultPrettyPrinter()
+                            .writeValueAsString(results);
+
+            if (commandLine.hasOption(OUTPUT_FILE_OPTION.getLongOpt())) {
+                ReadWriteUtils.saveToFile(saveFile, benchmarkResultsJson, true);
+                LOG.info("Benchmark results saved as json in " + saveFile + ".");
+            }
         }
-
-        String benchmarkResultsJson =
-                BenchmarkUtils.getResultsMapAsJson(results.toArray(new BenchmarkResult[0]));
         System.out.println("Benchmarks execution completed.");
-        System.out.println("Benchmark results summary:");
-        System.out.println(benchmarkResultsJson);
 
         if (commandLine.hasOption(OUTPUT_FILE_OPTION.getLongOpt())) {
-            String saveFile = commandLine.getOptionValue(OUTPUT_FILE_OPTION.getLongOpt());
-            ReadWriteUtils.saveToFile(saveFile, benchmarkResultsJson, true);
             System.out.println("Benchmark results saved as json in " + saveFile + ".");
+        } else {
+            System.out.println("Benchmark results summary:");
+            System.out.println(benchmarkResultsJson);

Review Comment:
   It seems that `printInvalidError()` is only called once in this class. Since its logic is pretty straightforward, would it be simpler to remove this method?
   
   If we choose to keep this method, could we mark it `private`?



##########
flink-ml-dist/src/main/flink-ml-bin/bin/benchmark-results-visualize.py:
##########
@@ -0,0 +1,68 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import argparse
+import json
+import matplotlib.pyplot as plt
+import re
+
+
+def benchmark_results_visualize(filename, name_pattern, x_field, y_field):
+    def get_field(json, fieldnames):
+        field = json

Review Comment:
   Would it be simpler to just rename the first function input parameter to `field`?



##########
flink-ml-core/src/main/java/org/apache/flink/ml/util/ReadWriteUtils.java:
##########
@@ -71,9 +71,9 @@ private static <T> Object jsonEncodeHelper(Param<T> param, Object value) throws
         return param.jsonEncode((T) value);
     }
 
-    // Converts Map<Param<?>, Object> to Map<String, String> which maps the parameter name to the
-    // string-encoded parameter value.
-    private static Map<String, Object> jsonEncode(Map<Param<?>, Object> paramMap)
+    // Converts Map<Param<?>, Object> to Map<String, Object> which maps the parameter name to the
+    // json-supported parameter value.
+    public static Map<String, Object> jsonEncode(Map<Param<?>, Object> paramMap)

Review Comment:
   Does this need to be `public`?



##########
flink-ml-dist/src/main/flink-ml-bin/bin/benchmark-results-visualize.py:
##########
@@ -0,0 +1,68 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import argparse
+import json
+import matplotlib.pyplot as plt
+import re
+
+
+def benchmark_results_visualize(filename, name_pattern, x_field, y_field):
+    def get_field(json, fieldnames):
+        field = json
+        for fieldname in fieldnames.split("."):
+            field = field[fieldname]
+        return field
+
+    x_values = []
+    y_values = []
+    with open(filename, "r") as configfile:
+        for name, config in json.loads(configfile.read()).items():

Review Comment:
   Following the Python naming convention, should we change `configfile` to `config_file`?
   
   The same applies to `fieldnames` etc.





[GitHub] [flink-ml] lindong28 commented on a diff in pull request #87: [FLINK-27096] Add a script to visualize benchmark results

lindong28 commented on code in PR #87:
URL: https://github.com/apache/flink-ml/pull/87#discussion_r861680131


##########
flink-ml-benchmark/src/main/java/org/apache/flink/ml/benchmark/Benchmark.java:
##########
@@ -75,33 +75,43 @@ public static void printHelp() {
     public static void executeBenchmarks(CommandLine commandLine) throws Exception {
         String configFile = commandLine.getArgs()[0];
         Map<String, ?> benchmarks = BenchmarkUtils.parseJsonFile(configFile);
-        System.out.println("Found benchmarks " + benchmarks.keySet());
+        System.out.println("Found " + benchmarks.keySet().size() + " benchmarks.");
+        String saveFile = commandLine.getOptionValue(OUTPUT_FILE_OPTION.getLongOpt());
 
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
 
         List<BenchmarkResult> results = new ArrayList<>();
+        String benchmarkResultsJson = "{}";
+        int index = 0;
         for (Map.Entry<String, ?> benchmark : benchmarks.entrySet()) {
-            LOG.info("Running benchmark " + benchmark.getKey() + ".");
+            LOG.info(
+                    String.format(
+                            "Running benchmark %d/%d: %s",
+                            index++, benchmarks.keySet().size(), benchmark.getKey()));
 
             BenchmarkResult result =
                     BenchmarkUtils.runBenchmark(
                             tEnv, benchmark.getKey(), (Map<String, ?>) benchmark.getValue());
 
             results.add(result);
-            LOG.info(BenchmarkUtils.getResultsMapAsJson(result));
-        }
+            LOG.info("\n" + BenchmarkUtils.getResultsMapAsJson(result));
+
+            benchmarkResultsJson =
+                    BenchmarkUtils.getResultsMapAsJson(results.toArray(new BenchmarkResult[0]));
 
-        String benchmarkResultsJson =
-                BenchmarkUtils.getResultsMapAsJson(results.toArray(new BenchmarkResult[0]));
+            if (commandLine.hasOption(OUTPUT_FILE_OPTION.getLongOpt())) {
+                ReadWriteUtils.saveToFile(saveFile, benchmarkResultsJson, true);

Review Comment:
   Hmm... I think we can still save all benchmark results outside the loop even when one benchmark fails, by catching the exception from the failed benchmark.
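   
   A compact sketch of that structure, where `runOne` and `save` are hypothetical stand-ins for `BenchmarkUtils.runBenchmark(...)` and `ReadWriteUtils.saveToFile(...)`:
   
   ```java
   import java.util.Map;
   
   class SaveAfterLoopSketch {
       // Catch per-benchmark failures so the loop keeps going, then persist
       // the accumulated results once after the loop finishes.
       static void runAll(Map<String, Map<String, Object>> benchmarks) {
           for (Map.Entry<String, Map<String, Object>> entry : benchmarks.entrySet()) {
               try {
                   entry.getValue().put("results", runOne(entry.getKey()));
               } catch (Exception e) {
                   entry.getValue().put("results", "failed: " + e);
               }
           }
           save(benchmarks); // single write, outside the loop
       }
   
       private static Object runOne(String name) throws Exception {
           return name + " finished"; // stand-in for BenchmarkUtils.runBenchmark(...)
       }
   
       private static void save(Object results) {
           System.out.println(results); // stand-in for ReadWriteUtils.saveToFile(...)
       }
   }
   ```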





[GitHub] [flink-ml] yunfengzhou-hub commented on pull request #87: [FLINK-27096] Add a script to visualize benchmark results

yunfengzhou-hub commented on PR #87:
URL: https://github.com/apache/flink-ml/pull/87#issuecomment-1107788716

   Thanks for the comments. I have updated the PR accordingly.




[GitHub] [flink-ml] lindong28 closed pull request #87: [FLINK-27096] Add a script to visualize benchmark results

lindong28 closed pull request #87: [FLINK-27096] Add a script to visualize benchmark results
URL: https://github.com/apache/flink-ml/pull/87




[GitHub] [flink-ml] lindong28 commented on pull request #87: [FLINK-27096] Add a script to visualize benchmark results

lindong28 commented on PR #87:
URL: https://github.com/apache/flink-ml/pull/87#issuecomment-1119477727

   Thanks for the update! LGTM.




[GitHub] [flink-ml] yunfengzhou-hub commented on a diff in pull request #87: [FLINK-27096] Add a script to visualize benchmark results

yunfengzhou-hub commented on code in PR #87:
URL: https://github.com/apache/flink-ml/pull/87#discussion_r857087174


##########
flink-ml-benchmark/src/main/java/org/apache/flink/ml/benchmark/Benchmark.java:
##########
@@ -75,33 +75,43 @@ public static void printHelp() {
     public static void executeBenchmarks(CommandLine commandLine) throws Exception {
         String configFile = commandLine.getArgs()[0];
         Map<String, ?> benchmarks = BenchmarkUtils.parseJsonFile(configFile);
-        System.out.println("Found benchmarks " + benchmarks.keySet());
+        System.out.println("Found " + benchmarks.keySet().size() + " benchmarks.");
+        String saveFile = commandLine.getOptionValue(OUTPUT_FILE_OPTION.getLongOpt());
 
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
 
         List<BenchmarkResult> results = new ArrayList<>();
+        String benchmarkResultsJson = "{}";
+        int index = 0;
         for (Map.Entry<String, ?> benchmark : benchmarks.entrySet()) {
-            LOG.info("Running benchmark " + benchmark.getKey() + ".");
+            LOG.info(
+                    String.format(
+                            "Running benchmark %d/%d: %s",
+                            index++, benchmarks.keySet().size(), benchmark.getKey()));
 
             BenchmarkResult result =
                     BenchmarkUtils.runBenchmark(
                             tEnv, benchmark.getKey(), (Map<String, ?>) benchmark.getValue());
 
             results.add(result);
-            LOG.info(BenchmarkUtils.getResultsMapAsJson(result));
-        }
+            LOG.info("\n" + BenchmarkUtils.getResultsMapAsJson(result));
+
+            benchmarkResultsJson =
+                    BenchmarkUtils.getResultsMapAsJson(results.toArray(new BenchmarkResult[0]));
 
-        String benchmarkResultsJson =
-                BenchmarkUtils.getResultsMapAsJson(results.toArray(new BenchmarkResult[0]));
+            if (commandLine.hasOption(OUTPUT_FILE_OPTION.getLongOpt())) {
+                ReadWriteUtils.saveToFile(saveFile, benchmarkResultsJson, true);

Review Comment:
   According to the offline discussion, we need to save the results in the loop body to avoid losing all benchmark results when one of the benchmarks fails.
   
   I have added code to save the exception information in the result JSON file, but I'm not sure whether and how to visualize exceptions on the plot. Given that a benchmark can fail for various reasons, such as during the Flink job's execution or during parameter parsing, there seems to be no proper place for error information in a plot.





[GitHub] [flink-ml] lindong28 commented on a diff in pull request #87: [FLINK-27096] Improve Benchmark Framework

lindong28 commented on code in PR #87:
URL: https://github.com/apache/flink-ml/pull/87#discussion_r855177774


##########
flink-ml-benchmark/README.md:
##########
@@ -63,59 +63,75 @@ Then in Flink ML's binary distribution's folder, execute the following command
 to run the default benchmarks.
 
 ```bash
-./bin/flink-ml-benchmark.sh ./conf/benchmark-conf.json --output-file results.json
+./bin/benchmark-run.sh ./conf/benchmark-conf.json --output-file results.json
 ```
 
 You will notice that some Flink jobs are submitted to your Flink cluster, and
 the following information is printed out in your terminal. This means that you
 have successfully executed the default benchmarks.
 
 ```
-Found benchmarks [KMeansModel-1, KMeans-1]
+Found 2 benchmarks.
 ...
 Benchmarks execution completed.
 Benchmark results summary:
-[ {
-  "name" : "KMeansModel-1",
-  "totalTimeMs" : 782.0,
-  "inputRecordNum" : 10000,
-  "inputThroughput" : 12787.723785166241,
-  "outputRecordNum" : 10000,
-  "outputThroughput" : 12787.723785166241
-}, {
-  "name" : "KMeans-1",
-  "totalTimeMs" : 7022.0,
-  "inputRecordNum" : 10000,
-  "inputThroughput" : 1424.0956992309884,
-  "outputRecordNum" : 1,
-  "outputThroughput" : 0.14240956992309883
-} ]
+{
+  "KMeansModel-1" : {
+    "stage" : {
+      "className" : "org.apache.flink.ml.clustering.kmeans.KMeansModel",
+      "paramMap" : {
+        "k" : 2,
+        "distanceMeasure" : "euclidean",
+        "featuresCol" : "features",
+        "predictionCol" : "prediction"
+      }
+    },
+    ...
+    "modelData" : null,
+    "results" : {
+      "totalTimeMs" : 6596.0,
+      "inputRecordNum" : 10000,
+      "inputThroughput" : 1516.0703456640388,
+      "outputRecordNum" : 1,
+      "outputThroughput" : 0.15160703456640387
+    }
+  }
+}
 Benchmark results saved as json in results.json.
 ```
 
 The command above would save the results into `results.json` as below.
 
 ```json
-[ {
-  "name" : "KMeansModel-1",
-  "totalTimeMs" : 782.0,
-  "inputRecordNum" : 10000,
-  "inputThroughput" : 12787.723785166241,
-  "outputRecordNum" : 10000,
-  "outputThroughput" : 12787.723785166241
-}, {
-  "name" : "KMeans-1",
-  "totalTimeMs" : 7022.0,
-  "inputRecordNum" : 10000,
-  "inputThroughput" : 1424.0956992309884,
-  "outputRecordNum" : 1,
-  "outputThroughput" : 0.14240956992309883
-} ]
+{
+  "KMeansModel-1" : {
+    "stage" : {
+      "className" : "org.apache.flink.ml.clustering.kmeans.KMeansModel",
+      "paramMap" : {
+        "k" : 2,
+        "distanceMeasure" : "euclidean",
+        "featuresCol" : "features",
+        "predictionCol" : "prediction"
+      }
+    },
+    ...
+    "modelData" : null,

Review Comment:
   Would it be more intuitive to remove this line if the model data is not specified for this stage?



##########
flink-ml-benchmark/README.md:
##########
@@ -194,3 +189,9 @@ stage. The stage is benchmarked against 10000 randomly generated vectors.
   }
 }
 ```
+
+### Benchmark Results Visualization
+
+`benchmark-results-visualize.py` is provided as a helper script to visualize

Review Comment:
   Could we provide commands that users can copy and paste to visualize the `results.json` in this quickstart?



##########
flink-ml-benchmark/src/main/java/org/apache/flink/ml/benchmark/BenchmarkResult.java:
##########
@@ -18,13 +18,25 @@
 
 package org.apache.flink.ml.benchmark;
 
+import org.apache.flink.ml.api.Stage;
+import org.apache.flink.ml.benchmark.datagenerator.DataGenerator;
+import org.apache.flink.ml.benchmark.datagenerator.InputDataGenerator;
 import org.apache.flink.util.Preconditions;
 
 /** The result of executing a benchmark. */
 public class BenchmarkResult {
     /** The benchmark name. */
     public final String name;
 
+    /** The stage to execute benchmark on. */
+    public final Stage<?> stage;

Review Comment:
   Instead of keeping the stage/inputDataGenerator/modelDataGenerator instances in `BenchmarkResult`, would it be simpler to just pass the original `Map<String, ?> params` (which is used to instantiate these instances) to `BenchmarkUtils.getResultsMapAsJson(...)` directly?



##########
flink-ml-benchmark/src/main/java/org/apache/flink/ml/benchmark/Benchmark.java:
##########
@@ -75,33 +75,43 @@ public static void printHelp() {
     public static void executeBenchmarks(CommandLine commandLine) throws Exception {
         String configFile = commandLine.getArgs()[0];
         Map<String, ?> benchmarks = BenchmarkUtils.parseJsonFile(configFile);
-        System.out.println("Found benchmarks " + benchmarks.keySet());
+        System.out.println("Found " + benchmarks.keySet().size() + " benchmarks.");
+        String saveFile = commandLine.getOptionValue(OUTPUT_FILE_OPTION.getLongOpt());
 
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
 
         List<BenchmarkResult> results = new ArrayList<>();
+        String benchmarkResultsJson = "{}";
+        int index = 0;
         for (Map.Entry<String, ?> benchmark : benchmarks.entrySet()) {
-            LOG.info("Running benchmark " + benchmark.getKey() + ".");
+            LOG.info(
+                    String.format(
+                            "Running benchmark %d/%d: %s",
+                            index++, benchmarks.keySet().size(), benchmark.getKey()));
 
             BenchmarkResult result =
                     BenchmarkUtils.runBenchmark(
                             tEnv, benchmark.getKey(), (Map<String, ?>) benchmark.getValue());
 
             results.add(result);
-            LOG.info(BenchmarkUtils.getResultsMapAsJson(result));
-        }
+            LOG.info("\n" + BenchmarkUtils.getResultsMapAsJson(result));
+
+            benchmarkResultsJson =
+                    BenchmarkUtils.getResultsMapAsJson(results.toArray(new BenchmarkResult[0]));
 
-        String benchmarkResultsJson =
-                BenchmarkUtils.getResultsMapAsJson(results.toArray(new BenchmarkResult[0]));
+            if (commandLine.hasOption(OUTPUT_FILE_OPTION.getLongOpt())) {
+                ReadWriteUtils.saveToFile(saveFile, benchmarkResultsJson, true);

Review Comment:
   Instead of saving all benchmark results in the loop body, would it be more efficient to save the benchmark results after the loop is done?
   
   We probably also need to record in `results.json` the fact that a benchmark failed, probably with a string indicating the cause of the failure. The `benchmark-results-visualize.py` script would then need to represent this information in the figure (e.g. with a red `X`).





[GitHub] [flink-ml] yunfengzhou-hub commented on pull request #87: [FLINK-27096] Add a script to visualize benchmark results

yunfengzhou-hub commented on PR #87:
URL: https://github.com/apache/flink-ml/pull/87#issuecomment-1118082139

   Thanks for the comments. I have updated the PR accordingly.

