Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/03/28 07:14:10 UTC

[GitHub] [flink-ml] lindong28 commented on a change in pull request #71: [FLINK-26443] Add benchmark framework

lindong28 commented on a change in pull request #71:
URL: https://github.com/apache/flink-ml/pull/71#discussion_r836029049



##########
File path: flink-ml-benchmark/README.md
##########
@@ -0,0 +1,261 @@
+# Flink ML Benchmark Guideline
+
+This document provides instructions about how to run benchmarks on Flink ML's
+stages.
+
+## Write Benchmark Programs
+
+### Add Maven Dependencies
+
+In order to write Flink ML's java benchmark programs, first make sure that the
+following dependencies have been added to your maven project's `pom.xml`.
+
+```xml
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-ml-core_${scala.binary.version}</artifactId>
+  <version>${flink.ml.version}</version>
+</dependency>
+
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-ml-iteration_${scala.binary.version}</artifactId>
+  <version>${flink.ml.version}</version>
+</dependency>
+
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-ml-lib_${scala.binary.version}</artifactId>
+  <version>${flink.ml.version}</version>
+</dependency>
+
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-ml-benchmark_${scala.binary.version}</artifactId>
+  <version>${flink.ml.version}</version>
+</dependency>
+
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>statefun-flink-core</artifactId>
+  <version>3.1.0</version>
+  <exclusions>
+    <exclusion>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-streaming-java_2.12</artifactId>
+    </exclusion>
+  </exclusions>
+</dependency>
+
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+  <version>${flink.version}</version>
+</dependency>
+
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
+  <version>${flink.version}</version>
+</dependency>
+
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+  <version>${flink.version}</version>
+</dependency>
+
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-clients_${scala.binary.version}</artifactId>
+  <version>${flink.version}</version>
+</dependency>
+```
+
+### Write Java Program
+
+Then you can write a program as follows to run benchmark on Flink ML stages. The
+example code below tests the performance of Flink ML's KMeans algorithm, with
+the default configuration parameters used.
+
+```java
+public class Main {
+    public static void main(String[] args) throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+        KMeans kMeans = new KMeans();
+        KMeansInputsGenerator inputsGenerator = new KMeansInputsGenerator();
+
+        BenchmarkResult result =
+                BenchmarkUtils.runBenchmark("exampleBenchmark", tEnv, kMeans, inputsGenerator);
+
+        BenchmarkUtils.printResult(result);
+    }
+}
+```
+
+### Execute Benchmark Program
+
+After executing the `main()` method above, you will see benchmark results
+printed out in your terminal. An example of the printed content is as follows.
+
+```
+Benchmark Name: exampleBenchmark
+Total Execution Time(ms): 828.0
+```
+
+### Configure Benchmark Parameters
+
+If you want to run a benchmark with custom configuration parameters, you can set
+them with Flink ML's `WithParams` API as follows.
+
+```java
+KMeans kMeans = new KMeans()
+  .setK(5)
+  .setMaxIter(50);
+KMeansInputsGenerator inputsGenerator = new KMeansInputsGenerator()
+  .setDims(3)
+  .setDataSize(10000);
+```
+
+## Execute Benchmark through Command-Line Interface (CLI)
+
+You can also configure and execute benchmarks through Command-Line Interface
+(CLI) without writing java programs.
+
+### Prerequisites
+
+Before using Flink ML's CLI, make sure you have installed Flink 1.14 in your
+local environment, and that you have started a Flink cluster locally. If not,
+you can start a standalone session with the following command.
+
+```bash
+$ start-cluster.sh
+```
+
+In order to use Flink ML's CLI you need to have the latest binary distribution
+of Flink ML. You can acquire the distribution by building Flink ML's source code
+locally, which means to execute the following command in Flink ML repository's
+root directory.
+
+```bash
+$ mvn clean package -DskipTests
+```
+
+After executing the command above, you will be able to find the binary
+distribution under
+`./flink-ml-dist/target/flink-ml-<version>-bin/flink-ml-<version>/`.
+
+### Run Benchmark CLI
+
+In the binary distribution's folder, execute the following command to run an
+example benchmark.
+
+```bash
+$ ./bin/flink-ml-benchmark.sh ./examples/benchmark-example-conf.json
+```
+
+You will notice that some Flink job is submitted to your Flink cluster, and the
+following information is printed out in your terminal. This means that you have
+successfully executed a benchmark on `KMeansModel`.
+
+```
+Job has been submitted with JobID 85b4a33df5c00a315e0d1142e1d743be
+Program execution finished
+Job with JobID 85b4a33df5c00a315e0d1142e1d743be has finished.
+Job Runtime: 828 ms
+
+Benchmark Name: KMeansModel-1
+Total Execution Time(ms): 828.0
+
+```
+
+### Save Benchmark Result to File
+
+`flink-ml-benchmark.sh` has redirected all warnings and process logs to stderr,
+and the benchmark results to stdout. So if you write the command in the
+following way
+
+```bash
+$ ./bin/flink-ml-benchmark.sh ./examples/benchmark-example-conf.json > output.txt

Review comment:
       This command is almost the same as the command above.
   
   Would it be simpler to have just one section, `Run Benchmark CLI`, and explain there how to get and interpret the benchmark results?

##########
File path: flink-ml-benchmark/README.md
##########
@@ -0,0 +1,261 @@
+# Flink ML Benchmark Guideline
+
+This document provides instructions about how to run benchmarks on Flink ML's
+stages.
+
+## Write Benchmark Programs
+
+### Add Maven Dependencies
+
+In order to write Flink ML's java benchmark programs, first make sure that the
+following dependencies have been added to your maven project's `pom.xml`.
+
+```xml
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-ml-core_${scala.binary.version}</artifactId>
+  <version>${flink.ml.version}</version>
+</dependency>
+
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-ml-iteration_${scala.binary.version}</artifactId>
+  <version>${flink.ml.version}</version>
+</dependency>
+
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-ml-lib_${scala.binary.version}</artifactId>
+  <version>${flink.ml.version}</version>
+</dependency>
+
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-ml-benchmark_${scala.binary.version}</artifactId>
+  <version>${flink.ml.version}</version>
+</dependency>
+
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>statefun-flink-core</artifactId>
+  <version>3.1.0</version>
+  <exclusions>
+    <exclusion>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-streaming-java_2.12</artifactId>
+    </exclusion>
+  </exclusions>
+</dependency>
+
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+  <version>${flink.version}</version>
+</dependency>
+
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
+  <version>${flink.version}</version>
+</dependency>
+
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+  <version>${flink.version}</version>
+</dependency>
+
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-clients_${scala.binary.version}</artifactId>
+  <version>${flink.version}</version>
+</dependency>
+```
+
+### Write Java Program
+
+Then you can write a program as follows to run benchmark on Flink ML stages. The
+example code below tests the performance of Flink ML's KMeans algorithm, with
+the default configuration parameters used.
+
+```java
+public class Main {
+    public static void main(String[] args) throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+        KMeans kMeans = new KMeans();
+        KMeansInputsGenerator inputsGenerator = new KMeansInputsGenerator();
+
+        BenchmarkResult result =
+                BenchmarkUtils.runBenchmark("exampleBenchmark", tEnv, kMeans, inputsGenerator);
+
+        BenchmarkUtils.printResult(result);
+    }
+}
+```
+
+### Execute Benchmark Program
+
+After executing the `main()` method above, you will see benchmark results
+printed out in your terminal. An example of the printed content is as follows.
+
+```
+Benchmark Name: exampleBenchmark
+Total Execution Time(ms): 828.0
+```
+
+### Configure Benchmark Parameters
+
+If you want to run a benchmark with custom configuration parameters, you can set
+them with Flink ML's `WithParams` API as follows.
+
+```java
+KMeans kMeans = new KMeans()
+  .setK(5)
+  .setMaxIter(50);
+KMeansInputsGenerator inputsGenerator = new KMeansInputsGenerator()
+  .setDims(3)
+  .setDataSize(10000);
+```
+
+## Execute Benchmark through Command-Line Interface (CLI)
+
+You can also configure and execute benchmarks through Command-Line Interface
+(CLI) without writing java programs.
+
+### Prerequisites
+
+Before using Flink ML's CLI, make sure you have installed Flink 1.14 in your
+local environment, and that you have started a Flink cluster locally. If not,
+you can start a standalone session with the following command.
+
+```bash
+$ start-cluster.sh
+```
+
+In order to use Flink ML's CLI you need to have the latest binary distribution
+of Flink ML. You can acquire the distribution by building Flink ML's source code
+locally, which means to execute the following command in Flink ML repository's
+root directory.
+
+```bash
+$ mvn clean package -DskipTests
+```
+
+After executing the command above, you will be able to find the binary
+distribution under
+`./flink-ml-dist/target/flink-ml-<version>-bin/flink-ml-<version>/`.
+
+### Run Benchmark CLI
+
+In the binary distribution's folder, execute the following command to run an
+example benchmark.
+
+```bash
+$ ./bin/flink-ml-benchmark.sh ./examples/benchmark-example-conf.json
+```
+
+You will notice that some Flink job is submitted to your Flink cluster, and the
+following information is printed out in your terminal. This means that you have
+successfully executed a benchmark on `KMeansModel`.
+
+```
+Job has been submitted with JobID 85b4a33df5c00a315e0d1142e1d743be
+Program execution finished
+Job with JobID 85b4a33df5c00a315e0d1142e1d743be has finished.
+Job Runtime: 828 ms
+
+Benchmark Name: KMeansModel-1
+Total Execution Time(ms): 828.0
+
+```
+
+### Save Benchmark Result to File
+
+`flink-ml-benchmark.sh` has redirected all warnings and process logs to stderr,
+and the benchmark results to stdout. So if you write the command in the
+following way
+
+```bash
+$ ./bin/flink-ml-benchmark.sh ./examples/benchmark-example-conf.json > output.txt
+```
+
+You will get a clean benchmark result saved in `output.txt` as follows.
+
+```
+Benchmark Name: KMeansModel-1
+Total Execution Time(ms): 828.0
+
+```
+
+### Configuration File Format
+
+The benchmark CLI creates stages and test data according to the input
+configuration file. The configuration file should contain a JSON object in the
+following format.
+
+First of all, the file should contain the following configurations as the
+metadata of the JSON object.
+
+- `"_version"`: The version of the json format. Currently its value must be 1.
+
+Then, each benchmark should be specified through key-object pairs.
+
+The key for each benchmark JSON object will be regarded as the name of the
+benchmark. The benchmark name can contain English letters, numbers, hyphens(-)
+and underscores(_), but should not start with a hyphen or underscore.
+
+The value of each benchmark name should be a JSON object containing at least
+`"stage"` and `"inputs"`. If the stage to be tested is a `Model` and its model
+data needs to be explicitly set, the JSON object should also contain
+`"modelData"`.
+
+The value of `"stage"`, `"inputs"` or `"modelData"` should be a JSON object
+containing `"className"` and `"paramMap"`.
+
+- `"className"`'s value should be the fully qualified name of a `WithParams` subclass.
+  For `"stage"`, the class should be a subclass of `Stage`. For `"inputs"` or
+  `"modelData"`, the class should be a subclass of `DataGenerator`.
+
+- `"paramMap"`'s value should be a JSON object containing the parameters related
+  to the specific `Stage` or `DataGenerator`. Note that all parameter values
+  should be wrapped as strings.
+
+Combining the format requirements above, an example configuration file is as
+follows.
+
+```json
+{
+  "_version": 1,
+  "KMeansModel-1": {
+    "stage": {
+      "className": "org.apache.flink.ml.clustering.kmeans.KMeansModel",
+      "paramMap": {
+        "featuresCol": "\"features\"",

Review comment:
       Could we allow users to just specify `"featuresCol": "features"` instead of `"featuresCol": "\"features\""`?

##########
File path: flink-ml-benchmark/src/main/java/org/apache/flink/ml/benchmark/Benchmark.java
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.benchmark;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.io.PrintStream;
+import java.util.Map;
+
+import static org.apache.flink.ml.util.ReadWriteUtils.OBJECT_MAPPER;
+
+/** Entry class for benchmark execution. */
+public class Benchmark {
+    @SuppressWarnings("unchecked")
+    public static void main(String[] args) throws Exception {
+        final PrintStream originalOut = System.out;
+
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+        InputStream inputStream = new FileInputStream(args[0]);
+        Map<String, ?> jsonMap = OBJECT_MAPPER.readValue(inputStream, Map.class);
+        Map<String, Map<String, ?>> benchmarkParamsMap =
+                BenchmarkUtils.parseBenchmarkParams(jsonMap);
+        System.err.println("Found benchmarks " + benchmarkParamsMap.keySet());
+
+        for (String benchmarkName : benchmarkParamsMap.keySet()) {
+            System.err.println("Running benchmark " + benchmarkName + ".");
+
+            // Redirect all flink execution logs to stderr.
+            System.setOut(System.err);

Review comment:
       Can you explain why we need to do this?
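
For context, here is a minimal sketch of the pattern being asked about, using only the JDK. The README in this PR states that logs go to stderr and benchmark results go to stdout; swapping `System.out` for `System.err` while the job runs and restoring it afterwards is one way to get that split. `StdoutRedirectSketch` and `runWithStdoutOnStderr` are hypothetical names and are not part of the PR.

```java
import java.io.PrintStream;
import java.util.function.Supplier;

/** Hypothetical helper, not part of the PR: keeps stdout reserved for results. */
final class StdoutRedirectSketch {
    /**
     * Runs the task while System.out points at System.err, then restores the
     * original stream so that only the caller's own output reaches stdout.
     */
    static <T> T runWithStdoutOnStderr(Supplier<T> task) {
        final PrintStream originalOut = System.out;
        System.setOut(System.err);
        try {
            return task.get();
        } finally {
            // Always restore stdout, even if the task throws.
            System.setOut(originalOut);
        }
    }

    public static void main(String[] args) {
        String result =
                runWithStdoutOnStderr(
                        () -> {
                            // Anything printed here ends up on stderr.
                            System.out.println("Job has been submitted");
                            return "Total Execution Time(ms): 828.0";
                        });
        // Printed after the restore, so it ends up on stdout.
        System.out.println(result);
    }
}
```

With this shape, `> output.txt` would capture only the final result line, which appears to be the intent of the redirect in `Benchmark.main`.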

##########
File path: flink-ml-benchmark/src/main/java/org/apache/flink/ml/benchmark/BenchmarkUtils.java
##########
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.benchmark;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.ml.api.AlgoOperator;
+import org.apache.flink.ml.api.Estimator;
+import org.apache.flink.ml.api.Model;
+import org.apache.flink.ml.api.Stage;
+import org.apache.flink.ml.benchmark.generator.DataGenerator;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.param.WithParams;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
+
+/** Utility methods for benchmarks. */
+@SuppressWarnings("unchecked")
+public class BenchmarkUtils {
+    private static final String benchmarkNamePattern = "^[A-Za-z0-9][A-Za-z0-9_\\-]*$";
+
+    /**
+     * Loads benchmark paramMaps from the provided json map.
+     *
+     * @return A map whose key is the names of the loaded benchmarks, value is the parameters of the
+     *     benchmarks.
+     */
+    public static Map<String, Map<String, ?>> parseBenchmarkParams(Map<String, ?> jsonMap) {
+        Preconditions.checkArgument(
+                jsonMap.containsKey("_version") && jsonMap.get("_version").equals(1));
+
+        Map<String, Map<String, ?>> result = new HashMap<>();
+        for (String key : jsonMap.keySet()) {
+            if (!isValidBenchmarkName(key)) {
+                continue;
+            }
+            result.put(key, (Map<String, ?>) jsonMap.get(key));
+        }
+        return result;
+    }
+
+    /**
+     * Checks whether a string is a valid benchmark name.
+     *
+     * <p>A valid benchmark name should only contain English letters, numbers, hyphens (-) and
+     * underscores (_). The name should not start with a hyphen or underscore.
+     */
+    public static boolean isValidBenchmarkName(String name) {
+        return Pattern.matches(benchmarkNamePattern, name);
+    }
+
+    /**
+     * Instantiates a benchmark from its parameter map and executes the benchmark in the provided
+     * environment.
+     *
+     * @return Results of the executed benchmark.
+     */
+    public static BenchmarkResult runBenchmark(
+            String name, StreamTableEnvironment tEnv, Map<String, ?> benchmarkParamsMap)
+            throws Exception {
+        Stage<?> stage =
+                (Stage<?>) instantiateWithParams((Map<String, ?>) benchmarkParamsMap.get("stage"));
+
+        BenchmarkResult result;
+        if (benchmarkParamsMap.size() == 2) {
+            DataGenerator<?> inputsGenerator =
+                    (DataGenerator<?>)
+                            instantiateWithParams(
+                                    (Map<String, ?>) benchmarkParamsMap.get("inputs"));
+            result = runBenchmark(name, tEnv, stage, inputsGenerator);
+        } else if (benchmarkParamsMap.size() == 3 && stage instanceof Model) {
+            DataGenerator<?> inputsGenerator =
+                    (DataGenerator<?>)
+                            instantiateWithParams(
+                                    (Map<String, ?>) benchmarkParamsMap.get("inputs"));
+            DataGenerator<?> modelDataGenerator =
+                    (DataGenerator<?>)
+                            instantiateWithParams(
+                                    (Map<String, ?>) benchmarkParamsMap.get("modelData"));
+            result =
+                    runBenchmark(name, tEnv, (Model<?>) stage, modelDataGenerator, inputsGenerator);
+        } else {
+            throw new IllegalArgumentException(
+                    "Unsupported json map with keys " + benchmarkParamsMap.keySet());
+        }
+
+        return result;
+    }
+
+    /**
+     * Executes a benchmark from a stage and an inputsGenerator in the provided environment.
+     *
+     * @return Results of the executed benchmark.
+     */
+    public static BenchmarkResult runBenchmark(
+            String name,
+            StreamTableEnvironment tEnv,
+            Stage<?> stage,
+            DataGenerator<?> inputsGenerator)
+            throws Exception {
+        Table[] inputTables = inputsGenerator.getData(tEnv);
+
+        Table[] outputTables;
+        if (stage instanceof Estimator) {
+            outputTables = ((Estimator<?, ?>) stage).fit(inputTables).getModelData();
+        } else if (stage instanceof AlgoOperator) {
+            outputTables = ((AlgoOperator<?>) stage).transform(inputTables);
+        } else {
+            throw new IllegalArgumentException("Unsupported Stage class " + stage.getClass());
+        }
+
+        for (Table table : outputTables) {
+            tEnv.toDataStream(table).addSink(new DiscardingSink<>());
+        }
+
+        StreamExecutionEnvironment env = ((StreamTableEnvironmentImpl) tEnv).execEnv();
+        JobExecutionResult executionResult = env.execute();
+
+        BenchmarkResult result = new BenchmarkResult();
+        result.name = name;
+        result.executionTimeMillis = (double) executionResult.getNetRuntime(TimeUnit.MILLISECONDS);
+
+        return result;
+    }
+
+    /**
+     * Executes a benchmark from a model with its modelDataGenerator and inputsGenerator in the
+     * provided environment.
+     *
+     * @return Results of the executed benchmark.
+     */
+    public static BenchmarkResult runBenchmark(
+            String name,
+            StreamTableEnvironment tEnv,
+            Model<?> model,
+            DataGenerator<?> modelDataGenerator,
+            DataGenerator<?> inputsGenerator)
+            throws Exception {
+        model.setModelData(modelDataGenerator.getData(tEnv));
+
+        Table[] outputTables = model.transform(inputsGenerator.getData(tEnv));
+
+        for (Table table : outputTables) {
+            tEnv.toDataStream(table).addSink(new DiscardingSink<>());
+        }
+
+        StreamExecutionEnvironment env = ((StreamTableEnvironmentImpl) tEnv).execEnv();
+        JobExecutionResult executionResult = env.execute();
+
+        BenchmarkResult result = new BenchmarkResult();
+        result.name = name;
+        result.executionTimeMillis = (double) executionResult.getNetRuntime(TimeUnit.MILLISECONDS);
+
+        return result;
+    }
+
+    /**
+     * Instantiates a WithParams subclass from the provided json map.
+     *
+     * @param jsonMap a map containing className and paramMap.
+     * @return the instantiated WithParams subclass.
+     */
+    public static WithParams<?> instantiateWithParams(Map<String, ?> jsonMap) throws Exception {
+        String className = (String) jsonMap.get("className");
+        Class<WithParams<?>> clazz = (Class<WithParams<?>>) Class.forName(className);
+        WithParams<?> instance = InstantiationUtil.instantiate(clazz);
+
+        Map<String, Param<?>> nameToParam = new HashMap<>();
+        for (Param<?> param : ParamUtils.getPublicFinalParamFields(instance)) {
+            nameToParam.put(param.name, param);
+        }
+
+        Map<String, String> paramMap = (Map<String, String>) jsonMap.get("paramMap");
+        for (Map.Entry<String, String> entry : paramMap.entrySet()) {
+            Param<?> param = nameToParam.get(entry.getKey());
+            setParam(instance, param, param.jsonDecode(entry.getValue()));
+        }
+
+        return instance;
+    }
+
+    // A helper method that sets an object's parameter value. We can not call stage.set(param,
+    // value) directly because stage::set(...) needs the actual type of the value.
+    private static <T> void setParam(WithParams<?> withParams, Param<T> param, Object value) {
+        withParams.set(param, (T) value);
+    }
+
+    /** Prints out the provided benchmark result. */
+    public static void printResult(BenchmarkResult result) {
+        Preconditions.checkNotNull(result.name);
+        System.out.println("Benchmark Name: " + result.name);
+
+        if (result.executionTimeMillis != null) {

Review comment:
       When would this value be null? Could it be simpler to skip the `if` statement and always print its value (including null)?
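
A minimal sketch of the simplification suggested here, assuming `executionTimeMillis` is a boxed `Double`, as the null check implies. `PrintResultSketch` and `formatExecutionTime` are hypothetical names. Java string concatenation renders a null reference as `null`, so no branch is needed.

```java
/** Hypothetical helper, not part of the PR. */
final class PrintResultSketch {
    /** Formats the line unconditionally; a null value is rendered as "null". */
    static String formatExecutionTime(Double executionTimeMillis) {
        return "Total Execution Time(ms): " + executionTimeMillis;
    }

    public static void main(String[] args) {
        System.out.println(formatExecutionTime(828.0));
        System.out.println(formatExecutionTime(null));
    }
}
```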

##########
File path: flink-ml-benchmark/src/main/java/org/apache/flink/ml/benchmark/BenchmarkUtils.java
##########
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.benchmark;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.ml.api.AlgoOperator;
+import org.apache.flink.ml.api.Estimator;
+import org.apache.flink.ml.api.Model;
+import org.apache.flink.ml.api.Stage;
+import org.apache.flink.ml.benchmark.generator.DataGenerator;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.param.WithParams;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
+
+/** Utility methods for benchmarks. */
+@SuppressWarnings("unchecked")
+public class BenchmarkUtils {
+    private static final String benchmarkNamePattern = "^[A-Za-z0-9][A-Za-z0-9_\\-]*$";
+
+    /**
+     * Loads benchmark paramMaps from the provided json map.
+     *
+     * @return A map whose key is the names of the loaded benchmarks, value is the parameters of the
+     *     benchmarks.
+     */
+    public static Map<String, Map<String, ?>> parseBenchmarkParams(Map<String, ?> jsonMap) {
+        Preconditions.checkArgument(
+                jsonMap.containsKey("_version") && jsonMap.get("_version").equals(1));
+
+        Map<String, Map<String, ?>> result = new HashMap<>();
+        for (String key : jsonMap.keySet()) {
+            if (!isValidBenchmarkName(key)) {

Review comment:
       Is it necessary to enforce the naming pattern here? Would it be simpler to just skip the reserved field names (e.g. version) here?
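
A minimal sketch of the alternative suggested here, using only the JDK: every top-level key is treated as a benchmark name unless it is reserved. It assumes `_version` is the only reserved key, as in the README under review; `ReservedKeySketch`, `RESERVED_KEYS` and `selectBenchmarks` are hypothetical names.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

/** Hypothetical helper, not part of the PR. */
final class ReservedKeySketch {
    // Assumption: "_version" is the only reserved top-level key.
    static final Set<String> RESERVED_KEYS = Collections.singleton("_version");

    /** Returns every entry whose key is not reserved, preserving file order. */
    static Map<String, Object> selectBenchmarks(Map<String, ?> jsonMap) {
        Map<String, Object> result = new LinkedHashMap<>();
        for (Map.Entry<String, ?> entry : jsonMap.entrySet()) {
            if (RESERVED_KEYS.contains(entry.getKey())) {
                continue; // Metadata, not a benchmark.
            }
            result.put(entry.getKey(), entry.getValue());
        }
        return result;
    }
}
```

Compared with the regex check, this version no longer silently drops a benchmark whose name happens to start with a hyphen or underscore.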

##########
File path: flink-ml-benchmark/src/main/java/org/apache/flink/ml/benchmark/BenchmarkUtils.java
##########
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.benchmark;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.ml.api.AlgoOperator;
+import org.apache.flink.ml.api.Estimator;
+import org.apache.flink.ml.api.Model;
+import org.apache.flink.ml.api.Stage;
+import org.apache.flink.ml.benchmark.generator.DataGenerator;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.param.WithParams;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
+
+/** Utility methods for benchmarks. */
+@SuppressWarnings("unchecked")
+public class BenchmarkUtils {
+    private static final String benchmarkNamePattern = "^[A-Za-z0-9][A-Za-z0-9_\\-]*$";
+
+    /**
+     * Loads benchmark paramMaps from the provided json map.
+     *
+     * @return A map whose key is the names of the loaded benchmarks, value is the parameters of the
+     *     benchmarks.
+     */
+    public static Map<String, Map<String, ?>> parseBenchmarkParams(Map<String, ?> jsonMap) {
+        Preconditions.checkArgument(
+                jsonMap.containsKey("_version") && jsonMap.get("_version").equals(1));

Review comment:
       Hmm... did we borrow the name `_version` from another project? If not, would it be simpler to just use `version` here?

##########
File path: flink-ml-benchmark/README.md
##########
@@ -0,0 +1,261 @@
+# Flink ML Benchmark Guideline
+
+This document provides instructions about how to run benchmarks on Flink ML's
+stages.
+
+## Write Benchmark Programs
+
+### Add Maven Dependencies
+
+In order to write Flink ML's java benchmark programs, first make sure that the
+following dependencies have been added to your maven project's `pom.xml`.
+
+```xml
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-ml-core_${scala.binary.version}</artifactId>
+  <version>${flink.ml.version}</version>
+</dependency>
+
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-ml-iteration_${scala.binary.version}</artifactId>
+  <version>${flink.ml.version}</version>
+</dependency>
+
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-ml-lib_${scala.binary.version}</artifactId>
+  <version>${flink.ml.version}</version>
+</dependency>
+
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-ml-benchmark_${scala.binary.version}</artifactId>
+  <version>${flink.ml.version}</version>
+</dependency>
+
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>statefun-flink-core</artifactId>
+  <version>3.1.0</version>
+  <exclusions>
+    <exclusion>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-streaming-java_2.12</artifactId>
+    </exclusion>
+  </exclusions>
+</dependency>
+
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+  <version>${flink.version}</version>
+</dependency>
+
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
+  <version>${flink.version}</version>
+</dependency>
+
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+  <version>${flink.version}</version>
+</dependency>
+
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-clients_${scala.binary.version}</artifactId>
+  <version>${flink.version}</version>
+</dependency>
+```
+
+### Write Java Program
+
+Then you can write a program as follows to run benchmark on Flink ML stages. The
+example code below tests the performance of Flink ML's KMeans algorithm, with
+the default configuration parameters used.
+
+```java
+public class Main {
+    public static void main(String[] args) throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+        KMeans kMeans = new KMeans();
+        KMeansInputsGenerator inputsGenerator = new KMeansInputsGenerator();
+
+        BenchmarkResult result =
+                BenchmarkUtils.runBenchmark("exampleBenchmark", tEnv, kMeans, inputsGenerator);
+
+        BenchmarkUtils.printResult(result);
+    }
+}
+```
+
+### Execute Benchmark Program
+
+After executing the `main()` method above, you will see benchmark results
+printed out in your terminal. An example of the printed content is as follows.
+
+```
+Benchmark Name: exampleBenchmark
+Total Execution Time(ms): 828.0
+```
+
+### Configure Benchmark Parameters
+
+If you want to run benchmark on customed configuration parameters, you can set
+them with Flink ML's `WithParams` API as follows.
+
+```java
+KMeans kMeans = new KMeans()
+  .setK(5)
+  .setMaxIter(50);
+KMeansInputsGenerator inputsGenerator = new KMeansInputsGenerator()
+  .setDims(3)
+  .setDataSize(10000);
+```
+
+## Execute Benchmark through Command-Line Interface (CLI)
+
+You can also configure and execute benchmarks through Command-Line Interface
+(CLI) without writing java programs.
+
+### Prerequisites
+
+Before using Flink ML's CLI, make sure you have installed Flink 1.14 in your
+local environment, and that you have started a Flink cluster locally. If not,
+you can start a standalone session with the following command.
+
+```bash
+$ start-cluster

Review comment:
       The Flink ML repo does not provide this script. Could we explain how to obtain and run it? We could add an installation section similar to [1].
   
   [1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/gettingstarted/#installation

##########
File path: flink-ml-benchmark/src/main/java/org/apache/flink/ml/benchmark/Benchmark.java
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.benchmark;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.io.PrintStream;
+import java.util.Map;
+
+import static org.apache.flink.ml.util.ReadWriteUtils.OBJECT_MAPPER;
+
+/** Entry class for benchmark execution. */
+public class Benchmark {
+    @SuppressWarnings("unchecked")
+    public static void main(String[] args) throws Exception {
+        final PrintStream originalOut = System.out;
+
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+        InputStream inputStream = new FileInputStream(args[0]);
+        Map<String, ?> jsonMap = OBJECT_MAPPER.readValue(inputStream, Map.class);
+        Map<String, Map<String, ?>> benchmarkParamsMap =
+                BenchmarkUtils.parseBenchmarkParams(jsonMap);
+        System.err.println("Found benchmarks " + benchmarkParamsMap.keySet());

Review comment:
       Should this be `System.out`? In general we only use `System.err` for error messages.
   
   The same applies to the other `System.err` calls.

##########
File path: flink-ml-benchmark/README.md
##########
@@ -0,0 +1,261 @@
+# Flink ML Benchmark Guideline
+
+This document provides instructions about how to run benchmarks on Flink ML's
+stages.
+
+## Write Benchmark Programs
+
+### Add Maven Dependencies
+
+In order to write Flink ML's java benchmark programs, first make sure that the
+following dependencies have been added to your maven project's `pom.xml`.
+
+```xml
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-ml-core_${scala.binary.version}</artifactId>
+  <version>${flink.ml.version}</version>
+</dependency>
+
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-ml-iteration_${scala.binary.version}</artifactId>
+  <version>${flink.ml.version}</version>
+</dependency>
+
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-ml-lib_${scala.binary.version}</artifactId>
+  <version>${flink.ml.version}</version>
+</dependency>
+
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-ml-benchmark_${scala.binary.version}</artifactId>
+  <version>${flink.ml.version}</version>
+</dependency>
+
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>statefun-flink-core</artifactId>
+  <version>3.1.0</version>
+  <exclusions>
+    <exclusion>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-streaming-java_2.12</artifactId>
+    </exclusion>
+  </exclusions>
+</dependency>
+
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+  <version>${flink.version}</version>
+</dependency>
+
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
+  <version>${flink.version}</version>
+</dependency>
+
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+  <version>${flink.version}</version>
+</dependency>
+
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-clients_${scala.binary.version}</artifactId>
+  <version>${flink.version}</version>
+</dependency>
+```
+
+### Write Java Program
+
+Then you can write a program as follows to run benchmark on Flink ML stages. The
+example code below tests the performance of Flink ML's KMeans algorithm, with
+the default configuration parameters used.
+
+```java
+public class Main {
+    public static void main(String[] args) throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+        KMeans kMeans = new KMeans();
+        KMeansInputsGenerator inputsGenerator = new KMeansInputsGenerator();
+
+        BenchmarkResult result =
+                BenchmarkUtils.runBenchmark("exampleBenchmark", tEnv, kMeans, inputsGenerator);
+
+        BenchmarkUtils.printResult(result);
+    }
+}
+```
+
+### Execute Benchmark Program
+
+After executing the `main()` method above, you will see benchmark results
+printed out in your terminal. An example of the printed content is as follows.
+
+```
+Benchmark Name: exampleBenchmark
+Total Execution Time(ms): 828.0
+```
+
+### Configure Benchmark Parameters
+
+If you want to run benchmark on customed configuration parameters, you can set
+them with Flink ML's `WithParams` API as follows.
+
+```java
+KMeans kMeans = new KMeans()
+  .setK(5)
+  .setMaxIter(50);
+KMeansInputsGenerator inputsGenerator = new KMeansInputsGenerator()
+  .setDims(3)
+  .setDataSize(10000);
+```
+
+## Execute Benchmark through Command-Line Interface (CLI)
+
+You can also configure and execute benchmarks through Command-Line Interface
+(CLI) without writing java programs.
+
+### Prerequisites
+
+Before using Flink ML's CLI, make sure you have installed Flink 1.14 in your
+local environment, and that you have started a Flink cluster locally. If not,
+you can start a standalone session with the following command.
+
+```bash
+$ start-cluster
+```
+
+In order to use Flink ML's CLI you need to have the latest binary distribution
+of Flink ML. You can acquire the distribution by building Flink ML's source code
+locally, which means to execute the following command in Flink ML repository's
+root directory.
+
+```bash
+$ mvn clean package -DskipTests
+```
+
+After executing the command above, you will be able to find the binary
+distribution under
+`./flink-ml-dist/target/flink-ml-<version>-bin/flink-ml-<version>/`.

Review comment:
       Is it possible to fill in the version of the corresponding branch using a variable?

##########
File path: flink-ml-benchmark/src/main/java/org/apache/flink/ml/benchmark/BenchmarkUtils.java
##########
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.benchmark;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.ml.api.AlgoOperator;
+import org.apache.flink.ml.api.Estimator;
+import org.apache.flink.ml.api.Model;
+import org.apache.flink.ml.api.Stage;
+import org.apache.flink.ml.benchmark.generator.DataGenerator;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.param.WithParams;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
+
+/** Utility methods for benchmarks. */
+@SuppressWarnings("unchecked")
+public class BenchmarkUtils {
+    private static final String benchmarkNamePattern = "^[A-Za-z0-9][A-Za-z0-9_\\-]*$";
+
+    /**
+     * Loads benchmark paramMaps from the provided json map.
+     *
+     * @return A map whose key is the names of the loaded benchmarks, value is the parameters of the
+     *     benchmarks.
+     */
+    public static Map<String, Map<String, ?>> parseBenchmarkParams(Map<String, ?> jsonMap) {
+        Preconditions.checkArgument(
+                jsonMap.containsKey("_version") && jsonMap.get("_version").equals(1));
+
+        Map<String, Map<String, ?>> result = new HashMap<>();
+        for (String key : jsonMap.keySet()) {
+            if (!isValidBenchmarkName(key)) {
+                continue;
+            }
+            result.put(key, (Map<String, ?>) jsonMap.get(key));
+        }
+        return result;
+    }
+
+    /**
+     * Checks whether a string is a valid benchmark name.
+     *
+     * <p>A valid benchmark name should only contain English letters, numbers, hyphens (-) and
+     * underscores (_). The name should not start with a hyphen or underscore.
+     */
+    public static boolean isValidBenchmarkName(String name) {
+        return Pattern.matches(benchmarkNamePattern, name);
+    }
+
+    /**
+     * Instantiates a benchmark from its parameter map and executes the benchmark in the provided
+     * environment.
+     *
+     * @return Results of the executed benchmark.
+     */
+    public static BenchmarkResult runBenchmark(

Review comment:
       Would the following implementation be simpler and shorter?
   
   Note that we can change the signature of `instantiateWithParams(...)` to `<T extends WithParams<?>> T instantiateWithParams(Map<String, ?> jsonMap)`
   
   ```java
       public static BenchmarkResult runBenchmark(
               StreamExecutionEnvironment env, String name, Map<String, ?> params)
               throws Exception {
           Stage<?> stage = instantiateWithParams((Map<String, ?>) params.get("stage"));
           DataGenerator<?> inputsGenerator = instantiateWithParams((Map<String, ?>) params.get("inputs"));
           DataGenerator<?> modelDataGenerator = null;
           if (params.containsKey("modelData")) {
               modelDataGenerator = instantiateWithParams((Map<String, ?>) params.get("modelData"));
           }
   
           return runBenchmark(env, name, stage, inputsGenerator, modelDataGenerator);
       }
   
       public static BenchmarkResult runBenchmark(
               StreamExecutionEnvironment env,
               String name,
               Stage<?> stage,
               DataGenerator<?> inputsGenerator,
               DataGenerator<?> modelDataGenerator)
               throws Exception {
           StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
   
           Table[] inputTables = inputsGenerator.getData(tEnv);
           if (modelDataGenerator != null) {
               ((Model<?>) stage).setModelData(modelDataGenerator.getData(tEnv));
           }
   
           Table[] outputTables;
           if (stage instanceof Estimator) {
               outputTables = ((Estimator<?, ?>) stage).fit(inputTables).getModelData();
           } else if (stage instanceof AlgoOperator) {
               outputTables = ((AlgoOperator<?>) stage).transform(inputTables);
           } else {
               throw new IllegalArgumentException("Unsupported Stage class " + stage.getClass());
           }
   
           for (Table table : outputTables) {
               tEnv.toDataStream(table).addSink(new DiscardingSink<>());
           }
   
           JobExecutionResult executionResult = env.execute();
   
           BenchmarkResult result = new BenchmarkResult();
           result.name = name;
           result.executionTimeMillis = (double) executionResult.getNetRuntime(TimeUnit.MILLISECONDS);
   
           return result;
       }
   ```
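   
   To illustrate why the generic signature removes the casts at the call sites, here is a self-contained sketch. `WithParams` and `FakeStage` below are stand-ins so that it compiles without Flink ML, and the `"className"` key is an assumption about the config layout, not something taken from the PR.
   
   ```java
   import java.util.HashMap;
   import java.util.Map;

   class GenericInstantiationSketch {
       /** Stand-in for org.apache.flink.ml.param.WithParams, so the sketch compiles on its own. */
       interface WithParams<T> {}

       /** Stand-in stage with a no-argument constructor. */
       static class FakeStage implements WithParams<FakeStage> {}

       /**
        * The return type is inferred from the assignment target, so callers need no cast.
        * The "className" key is an assumption about the config layout.
        */
       @SuppressWarnings("unchecked")
       static <T extends WithParams<?>> T instantiateWithParams(Map<String, ?> jsonMap)
               throws Exception {
           String className = (String) jsonMap.get("className");
           Object instance = Class.forName(className).getDeclaredConstructor().newInstance();
           // Unchecked: a wrong target type only fails later, at the caller's assignment.
           return (T) instance;
       }

       public static void main(String[] args) throws Exception {
           Map<String, Object> stageParams = new HashMap<>();
           stageParams.put("className", FakeStage.class.getName());

           FakeStage stage = instantiateWithParams(stageParams); // no explicit cast
           System.out.println(stage.getClass().getSimpleName());
       }
   }
   ```
   
   The trade-off is that the cast inside the method is unchecked, so a mismatch between the configured class and the expected type surfaces as a `ClassCastException` at the caller rather than inside `instantiateWithParams`.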

##########
File path: flink-ml-benchmark/src/main/java/org/apache/flink/ml/benchmark/BenchmarkUtils.java
##########
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.benchmark;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.ml.api.AlgoOperator;
+import org.apache.flink.ml.api.Estimator;
+import org.apache.flink.ml.api.Model;
+import org.apache.flink.ml.api.Stage;
+import org.apache.flink.ml.benchmark.generator.DataGenerator;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.param.WithParams;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
+
+/** Utility methods for benchmarks. */
+@SuppressWarnings("unchecked")
+public class BenchmarkUtils {
+    private static final String benchmarkNamePattern = "^[A-Za-z0-9][A-Za-z0-9_\\-]*$";
+
+    /**
+     * Loads benchmark paramMaps from the provided json map.
+     *
+     * @return A map whose key is the names of the loaded benchmarks, value is the parameters of the
+     *     benchmarks.
+     */
+    public static Map<String, Map<String, ?>> parseBenchmarkParams(Map<String, ?> jsonMap) {
+        Preconditions.checkArgument(
+                jsonMap.containsKey("_version") && jsonMap.get("_version").equals(1));
+
+        Map<String, Map<String, ?>> result = new HashMap<>();
+        for (String key : jsonMap.keySet()) {
+            if (!isValidBenchmarkName(key)) {
+                continue;
+            }
+            result.put(key, (Map<String, ?>) jsonMap.get(key));
+        }
+        return result;
+    }
+
+    /**
+     * Checks whether a string is a valid benchmark name.
+     *
+     * <p>A valid benchmark name should only contain English letters, numbers, hyphens (-) and
+     * underscores (_). The name should not start with a hyphen or underscore.
+     */
+    public static boolean isValidBenchmarkName(String name) {
+        return Pattern.matches(benchmarkNamePattern, name);
+    }
+
+    /**
+     * Instantiates a benchmark from its parameter map and executes the benchmark in the provided
+     * environment.
+     *
+     * @return Results of the executed benchmark.
+     */
+    public static BenchmarkResult runBenchmark(
+            String name, StreamTableEnvironment tEnv, Map<String, ?> benchmarkParamsMap)
+            throws Exception {
+        Stage<?> stage =
+                (Stage<?>) instantiateWithParams((Map<String, ?>) benchmarkParamsMap.get("stage"));
+
+        BenchmarkResult result;
+        if (benchmarkParamsMap.size() == 2) {
+            DataGenerator<?> inputsGenerator =
+                    (DataGenerator<?>)
+                            instantiateWithParams(
+                                    (Map<String, ?>) benchmarkParamsMap.get("inputs"));
+            result = runBenchmark(name, tEnv, stage, inputsGenerator);
+        } else if (benchmarkParamsMap.size() == 3 && stage instanceof Model) {
+            DataGenerator<?> inputsGenerator =
+                    (DataGenerator<?>)
+                            instantiateWithParams(
+                                    (Map<String, ?>) benchmarkParamsMap.get("inputs"));
+            DataGenerator<?> modelDataGenerator =
+                    (DataGenerator<?>)
+                            instantiateWithParams(
+                                    (Map<String, ?>) benchmarkParamsMap.get("modelData"));
+            result =
+                    runBenchmark(name, tEnv, (Model<?>) stage, modelDataGenerator, inputsGenerator);
+        } else {
+            throw new IllegalArgumentException(
+                    "Unsupported json map with keys " + benchmarkParamsMap.keySet());
+        }
+
+        return result;
+    }
+
+    /**
+     * Executes a benchmark from a stage and an inputsGenerator in the provided environment.
+     *
+     * @return Results of the executed benchmark.
+     */
+    public static BenchmarkResult runBenchmark(
+            String name,
+            StreamTableEnvironment tEnv,
+            Stage<?> stage,
+            DataGenerator<?> inputsGenerator)
+            throws Exception {
+        Table[] inputTables = inputsGenerator.getData(tEnv);
+
+        Table[] outputTables;
+        if (stage instanceof Estimator) {
+            outputTables = ((Estimator<?, ?>) stage).fit(inputTables).getModelData();
+        } else if (stage instanceof AlgoOperator) {
+            outputTables = ((AlgoOperator<?>) stage).transform(inputTables);
+        } else {
+            throw new IllegalArgumentException("Unsupported Stage class " + stage.getClass());
+        }
+
+        for (Table table : outputTables) {
+            tEnv.toDataStream(table).addSink(new DiscardingSink<>());
+        }
+
+        StreamExecutionEnvironment env = ((StreamTableEnvironmentImpl) tEnv).execEnv();
+        JobExecutionResult executionResult = env.execute();
+
+        BenchmarkResult result = new BenchmarkResult();
+        result.name = name;
+        result.executionTimeMillis = (double) executionResult.getNetRuntime(TimeUnit.MILLISECONDS);

Review comment:
       We need a general solution to calculate the throughput.
   
   How about this:
   - Expose an API in `DataGenerator` to get the number of input values used in fit/transform.
   - Get the number of output values from `outputTables`.
   - Expose `inputValueNum`, `outputNum`, `inputThroughput` and `outputThroughput` in `BenchmarkResult`.
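   
   A rough sketch of what the extended result could look like, assuming throughput is defined as values per second over the net runtime. The class name `BenchmarkResultSketch` is hypothetical, the field names follow the list above, and the numbers in `main` are illustrative only, not measured results.
   
   ```java
   import java.util.Locale;

   class BenchmarkResultSketch {
       String name;
       double executionTimeMillis;
       long inputValueNum;
       long outputNum;

       /** Values per second; returns 0 when no time elapsed, to avoid dividing by zero. */
       static double throughput(long valueNum, double timeMillis) {
           return timeMillis <= 0 ? 0.0 : valueNum * 1000.0 / timeMillis;
       }

       double inputThroughput() {
           return throughput(inputValueNum, executionTimeMillis);
       }

       double outputThroughput() {
           return throughput(outputNum, executionTimeMillis);
       }

       public static void main(String[] args) {
           BenchmarkResultSketch result = new BenchmarkResultSketch();
           result.name = "exampleBenchmark";
           result.executionTimeMillis = 828.0; // illustrative value
           result.inputValueNum = 10000; // illustrative value
           result.outputNum = 5; // illustrative value

           System.out.printf(
                   Locale.ROOT, "Input Throughput(values/s): %.1f%n", result.inputThroughput());
           System.out.printf(
                   Locale.ROOT, "Output Throughput(values/s): %.1f%n", result.outputThroughput());
       }
   }
   ```
   
   Keeping the two throughput values as derived methods rather than stored fields means they cannot drift out of sync with the counts and the runtime.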

##########
File path: flink-ml-benchmark/src/main/java/org/apache/flink/ml/benchmark/BenchmarkUtils.java
##########
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.benchmark;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.ml.api.AlgoOperator;
+import org.apache.flink.ml.api.Estimator;
+import org.apache.flink.ml.api.Model;
+import org.apache.flink.ml.api.Stage;
+import org.apache.flink.ml.benchmark.generator.DataGenerator;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.param.WithParams;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
+
+/** Utility methods for benchmarks. */
+@SuppressWarnings("unchecked")
+public class BenchmarkUtils {
+    private static final String benchmarkNamePattern = "^[A-Za-z0-9][A-Za-z0-9_\\-]*$";
+
+    /**
+     * Loads benchmark paramMaps from the provided json map.
+     *
+     * @return A map whose key is the names of the loaded benchmarks, value is the parameters of the
+     *     benchmarks.
+     */
+    public static Map<String, Map<String, ?>> parseBenchmarkParams(Map<String, ?> jsonMap) {

Review comment:
       The logic of this method seems pretty straightforward. Can we remove this method, move the version check logic to `Benchmark`, and do the type conversion right before invoking `BenchmarkUtils.runBenchmark(...)`?
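   
   Something along these lines, as a sketch only: `BenchmarkEntrySketch` and `checkVersion` are hypothetical names, the JSON map is built by hand instead of being read from a file, and the actual call to `runBenchmark` is replaced by a print statement so that the snippet runs without Flink.
   
   ```java
   import java.util.HashMap;
   import java.util.Map;
   import java.util.regex.Pattern;

   class BenchmarkEntrySketch {
       // Same pattern as benchmarkNamePattern in the quoted BenchmarkUtils.
       static final String NAME_PATTERN = "^[A-Za-z0-9][A-Za-z0-9_\\-]*$";

       /** Fails fast with a descriptive message when the config version is not 1. */
       static void checkVersion(Map<String, ?> jsonMap) {
           Object version = jsonMap.get("_version");
           if (!Integer.valueOf(1).equals(version)) {
               throw new IllegalArgumentException(
                       "Unsupported benchmark config version: " + version);
           }
       }

       @SuppressWarnings("unchecked")
       public static void main(String[] args) {
           Map<String, Object> jsonMap = new HashMap<>();
           jsonMap.put("_version", 1);
           jsonMap.put("KMeans-1", new HashMap<String, Object>());

           checkVersion(jsonMap);
           for (Map.Entry<String, Object> entry : jsonMap.entrySet()) {
               if (!Pattern.matches(NAME_PATTERN, entry.getKey())) {
                   continue; // skips "_version" and any other non-benchmark key
               }
               // Type conversion happens right before the benchmark would be run.
               Map<String, ?> params = (Map<String, ?>) entry.getValue();
               System.out.println(
                       "Would run " + entry.getKey() + " with " + params.size() + " entries");
           }
       }
   }
   ```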

##########
File path: flink-ml-dist/src/main/flink-ml-bin/bin/flink-ml-benchmark.sh
##########
@@ -0,0 +1,61 @@
+#!/usr/bin/env bash
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+current_path=$(pwd)
+flink_ml_bin_path="$( cd -- "$(dirname "$0")" >/dev/null 2>&1 ; pwd -P )"
+flink_ml_root_path="$(dirname "$flink_ml_bin_path")"
+
+# Checks flink command.
+flink_cmd="flink"

Review comment:
       Since Flink ML does not provide this script, users would typically download the Flink binary distribution to get it, and that distribution usually lives in a different directory from Flink ML.
   
   Should we provide a way for users to specify the path to the Flink scripts used in this script? This could be done with an environment variable.

##########
File path: flink-ml-benchmark/src/main/java/org/apache/flink/ml/benchmark/BenchmarkUtils.java
##########
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.benchmark;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.ml.api.AlgoOperator;
+import org.apache.flink.ml.api.Estimator;
+import org.apache.flink.ml.api.Model;
+import org.apache.flink.ml.api.Stage;
+import org.apache.flink.ml.benchmark.generator.DataGenerator;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.param.WithParams;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
+
+/** Utility methods for benchmarks. */
+@SuppressWarnings("unchecked")
+public class BenchmarkUtils {
+    private static final String benchmarkNamePattern = "^[A-Za-z0-9][A-Za-z0-9_\\-]*$";
+
+    /**
+     * Loads benchmark paramMaps from the provided json map.
+     *
+     * @return A map whose key is the names of the loaded benchmarks, value is the parameters of the
+     *     benchmarks.
+     */
+    public static Map<String, Map<String, ?>> parseBenchmarkParams(Map<String, ?> jsonMap) {
+        Preconditions.checkArgument(
+                jsonMap.containsKey("_version") && jsonMap.get("_version").equals(1));
+
+        Map<String, Map<String, ?>> result = new HashMap<>();
+        for (String key : jsonMap.keySet()) {
+            if (!isValidBenchmarkName(key)) {
+                continue;
+            }
+            result.put(key, (Map<String, ?>) jsonMap.get(key));
+        }
+        return result;
+    }
+
+    /**
+     * Checks whether a string is a valid benchmark name.
+     *
+     * <p>A valid benchmark name should only contain English letters, numbers, hyphens (-) and
+     * underscores (_). The name should not start with a hyphen or underscore.
+     */
+    public static boolean isValidBenchmarkName(String name) {
+        return Pattern.matches(benchmarkNamePattern, name);
+    }
+
+    /**
+     * Instantiates a benchmark from its parameter map and executes the benchmark in the provided
+     * environment.
+     *
+     * @return Results of the executed benchmark.
+     */
+    public static BenchmarkResult runBenchmark(
+            String name, StreamTableEnvironment tEnv, Map<String, ?> benchmarkParamsMap)
+            throws Exception {
+        Stage<?> stage =
+                (Stage<?>) instantiateWithParams((Map<String, ?>) benchmarkParamsMap.get("stage"));
+
+        BenchmarkResult result;
+        if (benchmarkParamsMap.size() == 2) {
+            DataGenerator<?> inputsGenerator =
+                    (DataGenerator<?>)
+                            instantiateWithParams(
+                                    (Map<String, ?>) benchmarkParamsMap.get("inputs"));
+            result = runBenchmark(name, tEnv, stage, inputsGenerator);
+        } else if (benchmarkParamsMap.size() == 3 && stage instanceof Model) {
+            DataGenerator<?> inputsGenerator =
+                    (DataGenerator<?>)
+                            instantiateWithParams(
+                                    (Map<String, ?>) benchmarkParamsMap.get("inputs"));
+            DataGenerator<?> modelDataGenerator =
+                    (DataGenerator<?>)
+                            instantiateWithParams(
+                                    (Map<String, ?>) benchmarkParamsMap.get("modelData"));
+            result =
+                    runBenchmark(name, tEnv, (Model<?>) stage, modelDataGenerator, inputsGenerator);
+        } else {
+            throw new IllegalArgumentException(
+                    "Unsupported json map with keys " + benchmarkParamsMap.keySet());
+        }
+
+        return result;
+    }
+
+    /**
+     * Executes a benchmark from a stage and an inputsGenerator in the provided environment.
+     *
+     * @return Results of the executed benchmark.
+     */
+    public static BenchmarkResult runBenchmark(
+            String name,
+            StreamTableEnvironment tEnv,
+            Stage<?> stage,
+            DataGenerator<?> inputsGenerator)
+            throws Exception {
+        Table[] inputTables = inputsGenerator.getData(tEnv);
+
+        Table[] outputTables;
+        if (stage instanceof Estimator) {
+            outputTables = ((Estimator<?, ?>) stage).fit(inputTables).getModelData();
+        } else if (stage instanceof AlgoOperator) {
+            outputTables = ((AlgoOperator<?>) stage).transform(inputTables);
+        } else {
+            throw new IllegalArgumentException("Unsupported Stage class " + stage.getClass());
+        }
+
+        for (Table table : outputTables) {
+            tEnv.toDataStream(table).addSink(new DiscardingSink<>());
+        }
+
+        StreamExecutionEnvironment env = ((StreamTableEnvironmentImpl) tEnv).execEnv();
+        JobExecutionResult executionResult = env.execute();
+
+        BenchmarkResult result = new BenchmarkResult();
+        result.name = name;
+        result.executionTimeMillis = (double) executionResult.getNetRuntime(TimeUnit.MILLISECONDS);
+
+        return result;
+    }
+
+    /**
+     * Executes a benchmark from a model with its modelDataGenerator and inputsGenerator in the
+     * provided environment.
+     *
+     * @return Results of the executed benchmark.
+     */
+    public static BenchmarkResult runBenchmark(
+            String name,
+            StreamTableEnvironment tEnv,
+            Model<?> model,
+            DataGenerator<?> modelDataGenerator,
+            DataGenerator<?> inputsGenerator)
+            throws Exception {
+        model.setModelData(modelDataGenerator.getData(tEnv));
+
+        Table[] outputTables = model.transform(inputsGenerator.getData(tEnv));
+
+        for (Table table : outputTables) {
+            tEnv.toDataStream(table).addSink(new DiscardingSink<>());
+        }
+
+        StreamExecutionEnvironment env = ((StreamTableEnvironmentImpl) tEnv).execEnv();
+        JobExecutionResult executionResult = env.execute();
+
+        BenchmarkResult result = new BenchmarkResult();
+        result.name = name;
+        result.executionTimeMillis = (double) executionResult.getNetRuntime(TimeUnit.MILLISECONDS);
+
+        return result;
+    }
+
+    /**
+     * Instantiates a WithParams subclass from the provided json map.
+     *
+     * @param jsonMap a map containing className and paramMap.
+     * @return the instantiated WithParams subclass.
+     */
+    public static WithParams<?> instantiateWithParams(Map<String, ?> jsonMap) throws Exception {

Review comment:
       It appears that this method shares a lot of code with `ReadWriteUtils::loadStageParam(...)`. How about we add this method to ReadWriteUtils and update `loadStageParam(...)` to use `instantiateWithParams(...)`?
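
       To make the suggestion concrete, here is a minimal, self-contained sketch of the shape I have in mind. It does not use the real Flink ML classes: `Parameterized`, `DemoStage` and `InstantiateSketch` are hypothetical stand-ins (for `WithParams`, a concrete stage and `ReadWriteUtils` respectively), and `setParam(String, Object)` is a simplification of the real param handling. The only names taken from the PR are the `className` and `paramMap` keys and the two method names.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for ReadWriteUtils; illustrates the proposed refactoring only. */
public class InstantiateSketch {

    /** Hypothetical, simplified stand-in for WithParams (not the real interface). */
    public interface Parameterized {
        void setParam(String name, Object value);

        Map<String, Object> getParams();
    }

    /** Hypothetical stage with a public no-arg constructor, used only for the demo. */
    public static class DemoStage implements Parameterized {
        private final Map<String, Object> params = new HashMap<>();

        public DemoStage() {}

        @Override
        public void setParam(String name, Object value) {
            params.put(name, value);
        }

        @Override
        public Map<String, Object> getParams() {
            return params;
        }
    }

    /** Shared helper: creates an instance from a map containing className and paramMap. */
    @SuppressWarnings("unchecked")
    public static Parameterized instantiateWithParams(Map<String, ?> jsonMap) throws Exception {
        String className = (String) jsonMap.get("className");
        Map<String, ?> paramMap = (Map<String, ?>) jsonMap.get("paramMap");

        // Reflectively call the no-arg constructor of the named class.
        Class<?> clazz = Class.forName(className);
        Parameterized instance = (Parameterized) clazz.getDeclaredConstructor().newInstance();

        // Copy every configured parameter onto the new instance.
        if (paramMap != null) {
            for (Map.Entry<String, ?> entry : paramMap.entrySet()) {
                instance.setParam(entry.getKey(), entry.getValue());
            }
        }
        return instance;
    }

    /**
     * Stand-in for loadStageParam: the real method would first read the metadata from a path;
     * here the already-parsed metadata is passed in and the work is delegated to the helper.
     */
    public static Parameterized loadStageParam(Map<String, ?> metadata) throws Exception {
        return instantiateWithParams(metadata);
    }

    public static void main(String[] args) throws Exception {
        Map<String, Object> paramMap = new HashMap<>();
        paramMap.put("k", 3);

        Map<String, Object> jsonMap = new HashMap<>();
        jsonMap.put("className", DemoStage.class.getName());
        jsonMap.put("paramMap", paramMap);

        Parameterized stage = loadStageParam(jsonMap);
        System.out.println(stage.getClass().getSimpleName() + " " + stage.getParams());
    }
}
```

       With this shape, the benchmark code and the stage loading path would share one reflection-based code path, so any change to how parameters are parsed only needs to be made once.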

##########
File path: flink-ml-benchmark/README.md
##########
@@ -0,0 +1,261 @@
+# Flink ML Benchmark Guideline
+
+This document provides instructions about how to run benchmarks on Flink ML's
+stages.
+
+## Write Benchmark Programs
+
+### Add Maven Dependencies
+
+In order to write Flink ML's java benchmark programs, first make sure that the
+following dependencies have been added to your maven project's `pom.xml`.
+
+```xml
+<dependency>

Review comment:
       Are these the minimal dependencies that users need to explicitly put in the pom.xml? Could we simplify this by using flink-ml-uber?