Posted to commits@flink.apache.org by lz...@apache.org on 2022/08/25 04:34:16 UTC

[flink-table-store] branch master updated: [FLINK-27739] Update table store benchmark usage

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new 8815608e [FLINK-27739] Update table store benchmark usage
8815608e is described below

commit 8815608e0a695985aff1de7c68601407272256b0
Author: tsreaper <ts...@gmail.com>
AuthorDate: Thu Aug 25 12:32:56 2022 +0800

    [FLINK-27739] Update table store benchmark usage
    
    This closes #274
---
 flink-table-store-benchmark/README.md              |  18 +-
 .../flink/table/store/benchmark/Benchmark.java     |  87 +++++-----
 .../table/store/benchmark/BenchmarkOptions.java    |   6 -
 .../apache/flink/table/store/benchmark/Query.java  |  96 ++++++-----
 .../flink/table/store/benchmark/QueryRunner.java   |  84 ++++++----
 .../store/benchmark/metric/BenchmarkMetric.java    |  50 ++----
 .../store/benchmark/metric/FlinkRestClient.java    |  83 +++++++---
 .../store/benchmark/metric/JobBenchmarkMetric.java |  71 +++-----
 .../store/benchmark/metric/MetricReporter.java     |  65 ++++----
 .../store/benchmark/metric/bytes/BpsMetric.java    | 165 -------------------
 .../benchmark/metric/bytes/TotalBytesMetric.java   | 162 ------------------
 .../utils/BenchmarkGlobalConfiguration.java        |   9 +-
 .../src/main/resources/conf/benchmark.yaml         |   3 +-
 .../src/main/resources/queries/q1.sql              |   4 +-
 .../src/main/resources/queries/q2.sql              | 179 --------------------
 .../src/main/resources/queries/q3.sql              |  92 -----------
 .../src/main/resources/queries/q4.sql              | 181 ---------------------
 .../src/main/resources/queries/queries.yaml        |   9 +-
 18 files changed, 304 insertions(+), 1060 deletions(-)

diff --git a/flink-table-store-benchmark/README.md b/flink-table-store-benchmark/README.md
index d9dda83c..a9b9416d 100644
--- a/flink-table-store-benchmark/README.md
+++ b/flink-table-store-benchmark/README.md
@@ -9,12 +9,13 @@ This is the benchmark module for Flink Table Store. Inspired by [Nexmark](https:
   * Two worker nodes with 16 cores and 64GB RAM.
 * This benchmark runs on a standalone Flink cluster. Download Flink >= 1.15 from the [Apache Flink website](https://flink.apache.org/downloads.html#apache-flink-1150) and set up a standalone cluster. Flink's job manager must be on the master node of your EMR cluster. We recommend the following Flink configuration:
     ```yaml
-    jobmanager.memory.process.size: 8192m
+    jobmanager.memory.process.size: 4096m
     taskmanager.memory.process.size: 4096m
     taskmanager.numberOfTaskSlots: 1
     parallelism.default: 16
     execution.checkpointing.interval: 3min
     state.backend: rocksdb
+    state.backend.incremental: true
     ```
     With this Flink configuration (`parallelism.default` 16 with one slot per task manager), you'll need 16 task manager instances in total, 8 on each EMR worker.
 * This benchmark needs the `FLINK_HOME` environment variable. Set `FLINK_HOME` to your Flink directory.
@@ -29,33 +30,30 @@ This is the benchmark module for Flink Table Store. Inspired by [Nexmark](https:
 * Run `flink-table-store-benchmark/bin/setup_cluster.sh` on the master node. This activates the CPU metrics collectors on the worker nodes. Note that if you restart your Flink cluster, you must also restart the CPU metrics collectors. To stop them, run `flink-table-store-benchmark/bin/shutdown_cluster.sh` on the master node.
 
 ### Run Benchmark
-* Run `flink-table-store-benchmark/bin/run_benchmark.sh <query> <sink>` to run `<query>` for `<sink>`. Currently `<query>` can be `q1`, `q2` or `all`, and sink can only be `table_store`.
+* Run `flink-table-store-benchmark/bin/run_benchmark.sh <query> <sink>` to run `<query>` against `<sink>`. Currently `<query>` can be `q1` or `all`, and `<sink>` can only be `table_store`.
 * Each query writes until it has produced its configured number of rows (see `row-num` below) and then reads all records back from the sink to measure read throughput.
 
 ## Queries
 
 |#|Description|
 |---|---|
-|q1|Test insert and update random primary keys with small record size (100 bytes per record).|
-|q2|Test insert and update random primary keys with large record size (1500 bytes per record).|
-|q3|Test insert and update primary keys related with time with small record size (100 bytes per record).|
-|q4|Test insert and update primary keys related with time with large record size (1500 bytes per record).|
+|q1|Tests inserting and updating random primary keys with a normal record size (100 bytes per record). Mimics updating the UV (unique visitors) and PV (page views) of items on an e-commerce website.|
 
 ## Benchmark Results
 
 Results of each query consist of the following aspects:
-* Throughput (byte/s): Average number of bytes inserted into the sink per second.
-* Total Bytes: Total number of bytes written during the given time.
+* Throughput (rows/s): Average number of rows inserted into the sink per second.
+* Total Rows: Total number of rows written.
 * Cores: Average CPU cost.
 * Throughput/Core: Number of rows inserted into the sink per second per CPU core.
 * Avg Data Freshness: Average time elapsed from the starting point of the last successful checkpoint.
 * Max Data Freshness: Max time elapsed from the starting point of the last successful checkpoint.
-* Query Throughput (row/s): Number of rows read from the sink per second.
 
 ## How to Add New Queries
 1. Add your query to `flink-table-store-benchmark/queries` as a SQL script.
 2. Modify `flink-table-store-benchmark/queries/queries.yaml` to set the properties of this test. Supported properties are:
-  * `bounded`: If this test is bounded. If this is a bounded test then test results will be reported after the job is completed; otherwise results will be reported after `benchmark.metric.monitor.duration`.
+  * `sql`: An array of SQL scripts. The scripts run in the given order, and each stops only after it has produced its configured number of rows.
+  * `row-num`: Number of rows the source produces for each SQL script.
 
 Note that each query must contain a `-- __SINK_DDL_BEGIN__` and a `-- __SINK_DDL_END__` marker so that the sink DDL between them can be used for both the write and read tests. See existing queries for details; a sketch of how the markers are extracted follows below.
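
For illustration, here is a minimal, self-contained sketch of how those markers are matched. It reuses the exact regex from `Query.SINK_DDL_TEMPLATE_PATTERN` later in this commit; the sample SQL text and the substituted values are hypothetical:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class SinkDdlSketch {
    // Same pattern as Query.SINK_DDL_TEMPLATE_PATTERN below.
    private static final Pattern SINK_DDL_TEMPLATE_PATTERN =
            Pattern.compile("-- __SINK_DDL_BEGIN__([\\s\\S]*?)-- __SINK_DDL_END__");

    public static void main(String[] args) {
        // Hypothetical query text; real queries live under src/main/resources/queries.
        String sql =
                "-- __SINK_DDL_BEGIN__\n"
                        + "CREATE TABLE IF NOT EXISTS ${SINK_NAME} ( ... ) WITH ( ${DDL_TEMPLATE} );\n"
                        + "-- __SINK_DDL_END__\n"
                        + "INSERT INTO ${SINK_NAME} SELECT ...;\n";
        Matcher m = SINK_DDL_TEMPLATE_PATTERN.matcher(sql);
        if (m.find()) {
            // Group 1 is the DDL between the markers; Query.getSinkDdl substitutes
            // the placeholders with the concrete sink name and table properties.
            System.out.println(
                    m.group(1)
                            .replace("${SINK_NAME}", "test_sink")
                            .replace("${DDL_TEMPLATE}", "'bucket' = '16'"));
        }
    }
}
```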
 
diff --git a/flink-table-store-benchmark/src/main/java/org/apache/flink/table/store/benchmark/Benchmark.java b/flink-table-store-benchmark/src/main/java/org/apache/flink/table/store/benchmark/Benchmark.java
index aa105c7f..3634c5b1 100644
--- a/flink-table-store-benchmark/src/main/java/org/apache/flink/table/store/benchmark/Benchmark.java
+++ b/flink-table-store-benchmark/src/main/java/org/apache/flink/table/store/benchmark/Benchmark.java
@@ -21,9 +21,9 @@ package org.apache.flink.table.store.benchmark;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.store.benchmark.metric.FlinkRestClient;
 import org.apache.flink.table.store.benchmark.metric.JobBenchmarkMetric;
-import org.apache.flink.table.store.benchmark.metric.MetricReporter;
 import org.apache.flink.table.store.benchmark.metric.cpu.CpuMetricReceiver;
 import org.apache.flink.table.store.benchmark.utils.BenchmarkGlobalConfiguration;
+import org.apache.flink.table.store.benchmark.utils.BenchmarkUtils;
 
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.DefaultParser;
@@ -32,7 +32,6 @@ import org.apache.commons.cli.Options;
 
 import java.io.File;
 import java.nio.file.Path;
-import java.time.Duration;
 import java.util.Arrays;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -117,80 +116,88 @@ public class Benchmark {
         CpuMetricReceiver cpuMetricReceiver = new CpuMetricReceiver(reporterAddress, reporterPort);
         cpuMetricReceiver.runServer();
 
-        Duration monitorDelay = benchmarkConf.get(BenchmarkOptions.METRIC_MONITOR_DELAY);
-        Duration monitorInterval = benchmarkConf.get(BenchmarkOptions.METRIC_MONITOR_INTERVAL);
-        Duration monitorDuration = benchmarkConf.get(BenchmarkOptions.METRIC_MONITOR_DURATION);
-
         // start to run queries
-        LinkedHashMap<String, JobBenchmarkMetric> totalMetrics = new LinkedHashMap<>();
+        LinkedHashMap<String, QueryRunner.Result> totalResults = new LinkedHashMap<>();
         for (Query query : queries) {
             for (Sink sink : sinks) {
-                MetricReporter reporter =
-                        new MetricReporter(
+                QueryRunner runner =
+                        new QueryRunner(
+                                query,
+                                sink,
+                                flinkDist,
                                 flinkRestClient,
                                 cpuMetricReceiver,
-                                monitorDelay,
-                                monitorInterval,
-                                query.bounded() ? null : monitorDuration);
-                QueryRunner runner =
-                        new QueryRunner(query, sink, flinkDist, reporter, flinkRestClient);
-                JobBenchmarkMetric metric = runner.run();
-                totalMetrics.put(query.name() + " - " + sink.name(), metric);
+                                benchmarkConf);
+                QueryRunner.Result result = runner.run();
+                totalResults.put(query.name() + " - " + sink.name(), result);
             }
         }
 
         // print benchmark summary
-        printSummary(totalMetrics);
+        printSummary(totalResults);
 
         flinkRestClient.close();
         cpuMetricReceiver.close();
     }
 
-    public static void printSummary(LinkedHashMap<String, JobBenchmarkMetric> totalMetrics) {
-        if (totalMetrics.isEmpty()) {
+    public static void printSummary(LinkedHashMap<String, QueryRunner.Result> totalResults) {
+        if (totalResults.isEmpty()) {
             return;
         }
         System.err.println(
                 "-------------------------------- Benchmark Results --------------------------------");
         int itemMaxLength = 27;
         System.err.println();
-        printBPSSummary(itemMaxLength, totalMetrics);
+        printBPSSummary(itemMaxLength, totalResults);
         System.err.println();
     }
 
     private static void printBPSSummary(
-            int itemMaxLength, LinkedHashMap<String, JobBenchmarkMetric> totalMetrics) {
-        printLine('-', "+", itemMaxLength, "", "", "", "", "", "", "", "");
+            int itemMaxLength, LinkedHashMap<String, QueryRunner.Result> totalResults) {
+        String[] emptyItems = new String[7];
+        Arrays.fill(emptyItems, "");
+        printLine('-', "+", itemMaxLength, emptyItems);
         printLine(
                 ' ',
                 "|",
                 itemMaxLength,
                 " Benchmark Query",
-                " Throughput (byte/s)",
-                " Total Bytes",
+                " Throughput (rows/s)",
+                " Total Rows",
                 " Cores",
-                " Throughput/Cores",
+                " Throughput/Core",
                 " Avg Data Freshness",
-                " Max Data Freshness",
-                " Query Throughput (row/s)");
-        printLine('-', "+", itemMaxLength, "", "", "", "", "", "", "", "");
-
-        for (Map.Entry<String, JobBenchmarkMetric> entry : totalMetrics.entrySet()) {
-            JobBenchmarkMetric metric = entry.getValue();
+                " Max Data Freshness");
+        printLine('-', "+", itemMaxLength, emptyItems);
+
+        for (Map.Entry<String, QueryRunner.Result> entry : totalResults.entrySet()) {
+            QueryRunner.Result result = entry.getValue();
+            for (JobBenchmarkMetric metric : result.writeMetric) {
+                printLine(
+                        ' ',
+                        "|",
+                        itemMaxLength,
+                        entry.getKey() + " - " + metric.getName(),
+                        metric.getPrettyRps(),
+                        metric.getPrettyTotalRows(),
+                        metric.getPrettyCpu(),
+                        metric.getPrettyRpsPerCore(),
+                        metric.getAvgDataFreshnessString(),
+                        metric.getMaxDataFreshnessString());
+            }
             printLine(
                     ' ',
                     "|",
                     itemMaxLength,
-                    entry.getKey(),
-                    metric.getPrettyBps(),
-                    metric.getPrettyTotalBytes(),
-                    metric.getPrettyCpu(),
-                    metric.getPrettyBpsPerCore(),
-                    metric.getAvgDataFreshnessString(),
-                    metric.getMaxDataFreshnessString(),
-                    metric.getPrettyQueryRps());
+                    entry.getKey() + " - Scan",
+                    BenchmarkUtils.formatLongValue((long) result.scanRps),
+                    "",
+                    "",
+                    "",
+                    "",
+                    "");
         }
-        printLine('-', "+", itemMaxLength, "", "", "", "", "", "", "", "");
+        printLine('-', "+", itemMaxLength, emptyItems);
     }
 
     private static void printLine(
diff --git a/flink-table-store-benchmark/src/main/java/org/apache/flink/table/store/benchmark/BenchmarkOptions.java b/flink-table-store-benchmark/src/main/java/org/apache/flink/table/store/benchmark/BenchmarkOptions.java
index 3bf7b720..26ff9ab0 100644
--- a/flink-table-store-benchmark/src/main/java/org/apache/flink/table/store/benchmark/BenchmarkOptions.java
+++ b/flink-table-store-benchmark/src/main/java/org/apache/flink/table/store/benchmark/BenchmarkOptions.java
@@ -33,12 +33,6 @@ public class BenchmarkOptions {
                     .withDescription(
                             "When to monitor the metrics, default 1 minute after job is started.");
 
-    public static final ConfigOption<Duration> METRIC_MONITOR_DURATION =
-            ConfigOptions.key("benchmark.metric.monitor.duration")
-                    .durationType()
-                    .defaultValue(Duration.ofMinutes(30))
-                    .withDescription("How long to monitor the metrics, default 30 minutes.");
-
     public static final ConfigOption<Duration> METRIC_MONITOR_INTERVAL =
             ConfigOptions.key("benchmark.metric.monitor.interval")
                     .durationType()
diff --git a/flink-table-store-benchmark/src/main/java/org/apache/flink/table/store/benchmark/Query.java b/flink-table-store-benchmark/src/main/java/org/apache/flink/table/store/benchmark/Query.java
index 1a0a9a63..12b2cf17 100644
--- a/flink-table-store-benchmark/src/main/java/org/apache/flink/table/store/benchmark/Query.java
+++ b/flink-table-store-benchmark/src/main/java/org/apache/flink/table/store/benchmark/Query.java
@@ -26,7 +26,6 @@ import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -37,32 +36,32 @@ public class Query {
             Pattern.compile("-- __SINK_DDL_BEGIN__([\\s\\S]*?)-- __SINK_DDL_END__");
 
     private final String name;
-    private final String beforeSql;
-    private final String querySql;
-    private final boolean bounded;
+    private final List<String> writeSqlName;
+    private final List<String> writeSqlText;
+    private final List<Long> rowNum;
 
-    private Query(String name, String beforeSql, String querySql, boolean bounded) {
+    private Query(
+            String name, List<String> writeSqlName, List<String> writeSqlText, List<Long> rowNum) {
         this.name = name;
-        this.beforeSql = beforeSql;
-        this.querySql = querySql;
-        this.bounded = bounded;
+        this.writeSqlName = writeSqlName;
+        this.writeSqlText = writeSqlText;
+        this.rowNum = rowNum;
     }
 
     public String name() {
         return name;
     }
 
-    public boolean bounded() {
-        return bounded;
-    }
-
-    public Optional<String> getBeforeSql(Sink sink, String sinkPath) {
-        return Optional.ofNullable(beforeSql)
-                .map(sql -> getSql(sink.beforeSql() + "\n" + sql, sink, sinkPath));
-    }
-
-    public String getWriteBenchmarkSql(Sink sink, String sinkPath) {
-        return getSql(sink.beforeSql() + "\n" + querySql, sink, sinkPath);
+    public List<WriteSql> getWriteBenchmarkSql(Sink sink, String sinkPath) {
+        List<WriteSql> res = new ArrayList<>();
+        for (int i = 0; i < writeSqlName.size(); i++) {
+            res.add(
+                    new WriteSql(
+                            writeSqlName.get(i),
+                            getSql(sink.beforeSql() + "\n" + writeSqlText.get(i), sink, sinkPath),
+                            rowNum.get(i)));
+        }
+        return res;
     }
 
     public String getReadBenchmarkSql(Sink sink, String sinkPath) {
@@ -79,16 +78,18 @@ public class Query {
     }
 
     private String getSinkDdl(String tableName, String tableProperties) {
-        Matcher m = SINK_DDL_TEMPLATE_PATTERN.matcher(querySql);
-        if (!m.find()) {
-            throw new IllegalArgumentException(
-                    "Cannot find __SINK_DDL_BEGIN__ and __SINK_DDL_END__ in query "
-                            + name
-                            + ". This query is not valid.");
+        for (String s : writeSqlText) {
+            Matcher m = SINK_DDL_TEMPLATE_PATTERN.matcher(s);
+            if (m.find()) {
+                return m.group(1)
+                        .replace("${SINK_NAME}", tableName)
+                        .replace("${DDL_TEMPLATE}", tableProperties);
+            }
         }
-        return m.group(1)
-                .replace("${SINK_NAME}", tableName)
-                .replace("${DDL_TEMPLATE}", tableProperties);
+        throw new IllegalArgumentException(
+                "Cannot find __SINK_DDL_BEGIN__ and __SINK_DDL_END__ in query "
+                        + name
+                        + ". This query is not valid.");
     }
 
     private String getSql(String sqlTemplate, Sink sink, String sinkPath) {
@@ -109,20 +110,33 @@ public class Query {
         for (Map.Entry<?, ?> entry : yaml.entrySet()) {
             String name = (String) entry.getKey();
             Map<?, ?> queryMap = (Map<?, ?>) entry.getValue();
-            String beforeSqlFileName = (String) queryMap.get("before");
-            boolean bounded =
-                    queryMap.containsKey("bounded")
-                            && Boolean.parseBoolean(queryMap.get("bounded").toString());
-            result.add(
-                    new Query(
-                            name,
-                            beforeSqlFileName == null
-                                    ? null
-                                    : FileUtils.readFileUtf8(
-                                            queryLocation.resolve(beforeSqlFileName).toFile()),
-                            FileUtils.readFileUtf8(queryLocation.resolve(name + ".sql").toFile()),
-                            bounded));
+            List<String> writeSqlName = new ArrayList<>();
+            List<String> writeSqlText = new ArrayList<>();
+            for (Object filename : (List<?>) queryMap.get("sql")) {
+                writeSqlName.add(filename.toString());
+                writeSqlText.add(
+                        FileUtils.readFileUtf8(
+                                queryLocation.resolve(filename.toString()).toFile()));
+            }
+            List<Long> rowNum = new ArrayList<>();
+            for (Object rowNumStr : (List<?>) queryMap.get("row-num")) {
+                rowNum.add(Long.valueOf(rowNumStr.toString()));
+            }
+            result.add(new Query(name, writeSqlName, writeSqlText, rowNum));
         }
         return result;
     }
+
+    /** Sql for write benchmarks. */
+    public static class WriteSql {
+        public final String name;
+        public final String text;
+        public final long rowNum;
+
+        private WriteSql(String name, String text, long rowNum) {
+            this.name = name;
+            this.text = text;
+            this.rowNum = rowNum;
+        }
+    }
 }
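
To make the new `queries.yaml` contract concrete: `sql` and `row-num` are parallel lists that `Query.load` above zips into `WriteSql` entries. A minimal sketch of that pairing, with invented file names and row counts:

```java
import java.util.Arrays;
import java.util.List;

public class WriteSqlPairingSketch {
    public static void main(String[] args) {
        // Hypothetical queries.yaml entry for a query named q1:
        //   q1:
        //     sql:     [q1-write.sql, q1-update.sql]
        //     row-num: [100000000, 100000000]
        List<String> writeSqlName = Arrays.asList("q1-write.sql", "q1-update.sql");
        List<Long> rowNum = Arrays.asList(100_000_000L, 100_000_000L);

        // Query.getWriteBenchmarkSql pairs the i-th script with the i-th row count,
        // so each script runs until its source has produced that many rows.
        for (int i = 0; i < writeSqlName.size(); i++) {
            System.out.printf("run %s until %,d rows%n", writeSqlName.get(i), rowNum.get(i));
        }
    }
}
```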
diff --git a/flink-table-store-benchmark/src/main/java/org/apache/flink/table/store/benchmark/QueryRunner.java b/flink-table-store-benchmark/src/main/java/org/apache/flink/table/store/benchmark/QueryRunner.java
index 134d3491..7ae2605f 100644
--- a/flink-table-store-benchmark/src/main/java/org/apache/flink/table/store/benchmark/QueryRunner.java
+++ b/flink-table-store-benchmark/src/main/java/org/apache/flink/table/store/benchmark/QueryRunner.java
@@ -18,10 +18,12 @@
 
 package org.apache.flink.table.store.benchmark;
 
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.table.store.benchmark.metric.FlinkRestClient;
 import org.apache.flink.table.store.benchmark.metric.JobBenchmarkMetric;
 import org.apache.flink.table.store.benchmark.metric.MetricReporter;
+import org.apache.flink.table.store.benchmark.metric.cpu.CpuMetricReceiver;
 import org.apache.flink.table.store.benchmark.utils.AutoClosableProcess;
 import org.apache.flink.table.store.benchmark.utils.BenchmarkGlobalConfiguration;
 import org.apache.flink.table.store.benchmark.utils.BenchmarkUtils;
@@ -31,9 +33,9 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.nio.file.Path;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Optional;
 import java.util.function.Consumer;
 
 /** Runner of a single benchmark query. */
@@ -44,23 +46,26 @@ public class QueryRunner {
     private final Query query;
     private final Sink sink;
     private final Path flinkDist;
-    private final MetricReporter metricReporter;
     private final FlinkRestClient flinkRestClient;
+    private final CpuMetricReceiver cpuMetricReceiver;
+    private final Configuration benchmarkConf;
 
     public QueryRunner(
             Query query,
             Sink sink,
             Path flinkDist,
-            MetricReporter metricReporter,
-            FlinkRestClient flinkRestClient) {
+            FlinkRestClient flinkRestClient,
+            CpuMetricReceiver cpuMetricReceiver,
+            Configuration benchmarkConf) {
         this.query = query;
         this.sink = sink;
         this.flinkDist = flinkDist;
-        this.metricReporter = metricReporter;
         this.flinkRestClient = flinkRestClient;
+        this.cpuMetricReceiver = cpuMetricReceiver;
+        this.benchmarkConf = benchmarkConf;
     }
 
-    public JobBenchmarkMetric run() {
+    public Result run() {
         try {
             BenchmarkUtils.printAndLog(
                     LOG,
@@ -70,34 +75,39 @@ public class QueryRunner {
                             + " with sink "
                             + sink.name());
 
-            String sinkPath =
+            String sinkPathConfig =
                     BenchmarkGlobalConfiguration.loadConfiguration()
                             .getString(BenchmarkOptions.SINK_PATH);
-            if (sinkPath == null) {
+            if (sinkPathConfig == null) {
                 throw new IllegalArgumentException(
                         BenchmarkOptions.SINK_PATH.key() + " must be set");
             }
 
             // before submitting SQL, clean sink path
-            FileSystem fs = new org.apache.flink.core.fs.Path(sinkPath).getFileSystem();
-            fs.delete(new org.apache.flink.core.fs.Path(sinkPath), true);
-
-            // run initialization SQL if needed
-            Optional<String> beforeSql = query.getBeforeSql(sink, sinkPath);
-            if (beforeSql.isPresent()) {
-                BenchmarkUtils.printAndLog(LOG, "Initializing query " + query.name());
-                String beforeJobId = submitSQLJob(beforeSql.get());
-                flinkRestClient.waitUntilJobFinished(beforeJobId);
-            }
-
-            String insertJobId = submitSQLJob(query.getWriteBenchmarkSql(sink, sinkPath));
-            // blocking until collect enough metrics
-            JobBenchmarkMetric metrics = metricReporter.reportMetric(insertJobId);
-            // cancel job
-            BenchmarkUtils.printAndLog(
-                    LOG, "Stop job query " + query.name() + " with sink " + sink.name());
-            if (flinkRestClient.isJobRunning(insertJobId)) {
-                flinkRestClient.cancelJob(insertJobId);
+            FileSystem fs = new org.apache.flink.core.fs.Path(sinkPathConfig).getFileSystem();
+            fs.delete(new org.apache.flink.core.fs.Path(sinkPathConfig), true);
+
+            String sinkPath = sinkPathConfig + "/data";
+            String savepointPath = sinkPathConfig + "/savepoint";
+
+            Duration monitorDelay = benchmarkConf.get(BenchmarkOptions.METRIC_MONITOR_DELAY);
+            Duration monitorInterval = benchmarkConf.get(BenchmarkOptions.METRIC_MONITOR_INTERVAL);
+
+            List<JobBenchmarkMetric> writeMetric = new ArrayList<>();
+            for (Query.WriteSql sql : query.getWriteBenchmarkSql(sink, sinkPath)) {
+                BenchmarkUtils.printAndLog(LOG, "Running SQL " + sql.name);
+                String jobId = submitSQLJob(sql.text);
+                MetricReporter metricReporter =
+                        new MetricReporter(
+                                flinkRestClient,
+                                cpuMetricReceiver,
+                                monitorDelay,
+                                monitorInterval,
+                                sql.rowNum);
+                JobBenchmarkMetric metric = metricReporter.reportMetric(sql.name, jobId);
+                writeMetric.add(metric);
+                flinkRestClient.stopJobWithSavepoint(jobId, savepointPath);
+                flinkRestClient.waitUntilJobFinished(jobId);
             }
 
             String queryJobId = submitSQLJob(query.getReadBenchmarkSql(sink, sinkPath));
@@ -105,11 +115,14 @@ public class QueryRunner {
             double numRecordsRead =
                     flinkRestClient.getTotalNumRecords(
                             queryJobId, flinkRestClient.getSourceVertexId(queryJobId));
-            metrics.setQueryRps(numRecordsRead / (queryJobRunMillis / 1000.0));
+            double scanRps = numRecordsRead / (queryJobRunMillis / 1000.0);
             System.out.println(
-                    "Scan RPS for query " + query.name() + " is " + metrics.getPrettyQueryRps());
+                    "Scan RPS for query "
+                            + query.name()
+                            + " is "
+                            + BenchmarkUtils.formatLongValue((long) scanRps));
 
-            return metrics;
+            return new Result(writeMetric, scanRps);
         } catch (IOException e) {
             throw new RuntimeException(e);
         }
@@ -150,6 +163,17 @@ public class QueryRunner {
         return jobId;
     }
 
+    /** Result metric of the current query. */
+    public static class Result {
+        public final List<JobBenchmarkMetric> writeMetric;
+        public final double scanRps;
+
+        private Result(List<JobBenchmarkMetric> writeMetric, double scanRps) {
+            this.writeMetric = writeMetric;
+            this.scanRps = scanRps;
+        }
+    }
+
     private static class SqlClientStdoutProcessor implements Consumer<String> {
 
         private String jobId = null;
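
As a quick sanity check of the scan-throughput formula used in `run()` above (`scanRps = numRecordsRead / (queryJobRunMillis / 1000.0)`), with purely hypothetical numbers:

```java
public class ScanRpsSketch {
    public static void main(String[] args) {
        double numRecordsRead = 300_000_000;  // hypothetical rows read back from the sink
        long queryJobRunMillis = 600_000;     // hypothetical read job runtime: 10 minutes
        double scanRps = numRecordsRead / (queryJobRunMillis / 1000.0);
        System.out.printf("Scan RPS = %,.0f rows/s%n", scanRps); // 500,000 rows/s
    }
}
```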
diff --git a/flink-table-store-benchmark/src/main/java/org/apache/flink/table/store/benchmark/metric/BenchmarkMetric.java b/flink-table-store-benchmark/src/main/java/org/apache/flink/table/store/benchmark/metric/BenchmarkMetric.java
index 497d2525..85f4dbb9 100644
--- a/flink-table-store-benchmark/src/main/java/org/apache/flink/table/store/benchmark/metric/BenchmarkMetric.java
+++ b/flink-table-store-benchmark/src/main/java/org/apache/flink/table/store/benchmark/metric/BenchmarkMetric.java
@@ -22,40 +22,38 @@ import org.apache.flink.table.store.benchmark.utils.BenchmarkUtils;
 
 import javax.annotation.Nullable;
 
-import java.util.Objects;
-
 /**
  * Metric collected per {@link
  * org.apache.flink.table.store.benchmark.BenchmarkOptions#METRIC_MONITOR_INTERVAL} for a single
  * query.
  */
 public class BenchmarkMetric {
-    private final double bps;
-    private final long totalBytes;
+    private final double rps;
+    private final long totalRows;
     private final double cpu;
     @Nullable private final Long dataFreshness;
 
-    public BenchmarkMetric(double bps, long totalBytes, double cpu, @Nullable Long dataFreshness) {
-        this.bps = bps;
-        this.totalBytes = totalBytes;
+    public BenchmarkMetric(double rps, long totalRows, double cpu, @Nullable Long dataFreshness) {
+        this.rps = rps;
+        this.totalRows = totalRows;
         this.cpu = cpu;
         this.dataFreshness = dataFreshness;
     }
 
-    public double getBps() {
-        return bps;
+    public double getRps() {
+        return rps;
     }
 
-    public String getPrettyBps() {
-        return BenchmarkUtils.formatLongValue((long) bps);
+    public String getPrettyRps() {
+        return BenchmarkUtils.formatLongValue((long) rps);
     }
 
-    public long getTotalBytes() {
-        return totalBytes;
+    public long getTotalRows() {
+        return totalRows;
     }
 
-    public String getPrettyTotalBytes() {
-        return BenchmarkUtils.formatLongValue(totalBytes);
+    public String getPrettyTotalRows() {
+        return BenchmarkUtils.formatLongValue(totalRows);
     }
 
     public double getCpu() {
@@ -74,26 +72,4 @@ public class BenchmarkMetric {
     public String getDataFreshnessString() {
         return BenchmarkUtils.formatDataFreshness(dataFreshness);
     }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (o == null || getClass() != o.getClass()) {
-            return false;
-        }
-        BenchmarkMetric that = (BenchmarkMetric) o;
-        return Double.compare(that.bps, bps) == 0 && Double.compare(that.cpu, cpu) == 0;
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(bps, cpu);
-    }
-
-    @Override
-    public String toString() {
-        return "BenchmarkMetric{" + "bps=" + bps + ", cpu=" + cpu + '}';
-    }
 }
diff --git a/flink-table-store-benchmark/src/main/java/org/apache/flink/table/store/benchmark/metric/FlinkRestClient.java b/flink-table-store-benchmark/src/main/java/org/apache/flink/table/store/benchmark/metric/FlinkRestClient.java
index af387df4..73996324 100644
--- a/flink-table-store-benchmark/src/main/java/org/apache/flink/table/store/benchmark/metric/FlinkRestClient.java
+++ b/flink-table-store-benchmark/src/main/java/org/apache/flink/table/store/benchmark/metric/FlinkRestClient.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.table.store.benchmark.metric;
 
-import org.apache.flink.table.store.benchmark.metric.bytes.BpsMetric;
-import org.apache.flink.table.store.benchmark.metric.bytes.TotalBytesMetric;
 import org.apache.flink.table.store.benchmark.utils.BenchmarkUtils;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
@@ -32,7 +30,9 @@ import org.apache.http.HttpStatus;
 import org.apache.http.client.config.RequestConfig;
 import org.apache.http.client.methods.HttpGet;
 import org.apache.http.client.methods.HttpPatch;
+import org.apache.http.client.methods.HttpPost;
 import org.apache.http.client.methods.HttpRequestBase;
+import org.apache.http.entity.StringEntity;
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.impl.client.HttpClientBuilder;
 import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
@@ -43,6 +43,7 @@ import org.slf4j.LoggerFactory;
 import javax.annotation.Nullable;
 
 import java.io.IOException;
+import java.io.UnsupportedEncodingException;
 import java.net.URI;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -85,11 +86,17 @@ public class FlinkRestClient {
     }
 
     public void cancelJob(String jobId) {
-        LOG.info("Stopping Job: {}", jobId);
+        LOG.info("Cancelling Job: {}", jobId);
         String url = String.format("http://%s/jobs/%s?mode=cancel", jmEndpoint, jobId);
         patch(url);
     }
 
+    public void stopJobWithSavepoint(String jobId, String savepointPath) {
+        LOG.info("Stopping Job: {}", jobId);
+        String url = String.format("http://%s/jobs/%s/stop", jmEndpoint, jobId);
+        post(url, "{\"targetDirectory\": \"" + savepointPath + "\", \"drain\": false}");
+    }
+
     public boolean isJobRunning(String jobId) {
         String url = String.format("http://%s/jobs/%s", jmEndpoint, jobId);
         String response = executeAsString(url);
@@ -101,6 +108,21 @@ public class FlinkRestClient {
         }
     }
 
+    public void waitUntilNumberOfRows(String jobId, long numberOfRows) {
+        while (true) {
+            String sourceVertexId = getSourceVertexId(jobId);
+            double actualNumRecords = getTotalNumRecords(jobId, sourceVertexId);
+            if (actualNumRecords >= numberOfRows) {
+                return;
+            }
+            try {
+                Thread.sleep(1000);
+            } catch (InterruptedException e) {
+                // ignore
+            }
+        }
+    }
+
     public long waitUntilJobFinished(String jobId) {
         while (true) {
             String url = String.format("http://%s/jobs/%s", jmEndpoint, jobId);
@@ -139,24 +161,6 @@ public class FlinkRestClient {
         }
     }
 
-    public synchronized BpsMetric getBpsMetric(String jobId, String vertexId) {
-        String url =
-                String.format(
-                        "http://%s/jobs/%s/vertices/%s/subtasks/metrics?get=numBytesOutPerSecond",
-                        jmEndpoint, jobId, vertexId);
-        String response = executeAsString(url);
-        return BpsMetric.fromJson(response);
-    }
-
-    public synchronized TotalBytesMetric getTotalBytesMetric(String jobId, String vertexId) {
-        String url =
-                String.format(
-                        "http://%s/jobs/%s/vertices/%s/subtasks/metrics?get=numBytesOut",
-                        jmEndpoint, jobId, vertexId);
-        String response = executeAsString(url);
-        return TotalBytesMetric.fromJson(response);
-    }
-
     @Nullable
     public synchronized Long getDataFreshness(String jobId) {
         String url = String.format("http://%s/jobs/%s/checkpoints", jmEndpoint, jobId);
@@ -174,7 +178,15 @@ public class FlinkRestClient {
         }
     }
 
+    public synchronized double getNumRecordsPerSecond(String jobId, String vertexId) {
+        return getSourceMetric(jobId, vertexId, "numRecordsOutPerSecond");
+    }
+
     public synchronized double getTotalNumRecords(String jobId, String vertexId) {
+        return getSourceMetric(jobId, vertexId, "numRecordsOut");
+    }
+
+    public synchronized double getSourceMetric(String jobId, String vertexId, String metric) {
         String url =
                 String.format(
                         "http://%s/jobs/%s/vertices/%s/subtasks/metrics",
@@ -185,7 +197,7 @@ public class FlinkRestClient {
             ArrayNode arrayNode = (ArrayNode) BenchmarkUtils.JSON_MAPPER.readTree(response);
             for (JsonNode node : arrayNode) {
                 String metricName = node.get("id").asText();
-                if (metricName.startsWith("Source_") && metricName.endsWith(".numRecordsOut")) {
+                if (metricName.startsWith("Source_") && metricName.endsWith("." + metric)) {
                     numRecordsMetricName = metricName;
                     break;
                 }
@@ -195,8 +207,8 @@ public class FlinkRestClient {
         }
 
         if (numRecordsMetricName == null) {
-            throw new RuntimeException(
-                    "Can't find number of records metric name from the response:\n" + response);
+            LOG.warn("Can't find metric " + metric + " in the response: " + response);
+            return -1;
         }
 
         url =
@@ -211,6 +223,29 @@ public class FlinkRestClient {
         }
     }
 
+    private void post(String url, String json) {
+        HttpPost httpPost = new HttpPost();
+        httpPost.setURI(URI.create(url));
+        try {
+            httpPost.setEntity(new StringEntity(json));
+        } catch (UnsupportedEncodingException e) {
+            throw new RuntimeException(e);
+        }
+        HttpResponse response;
+        try {
+            httpPost.setHeader("Connection", "close");
+            response = httpClient.execute(httpPost);
+            int httpCode = response.getStatusLine().getStatusCode();
+            if (httpCode != HttpStatus.SC_ACCEPTED) {
+                String msg = String.format("HTTP request failed, status code is %d", httpCode);
+                throw new RuntimeException(msg);
+            }
+        } catch (Exception e) {
+            httpPost.abort();
+            throw new RuntimeException(e);
+        }
+    }
+
     private void patch(String url) {
         HttpPatch httpPatch = new HttpPatch();
         httpPatch.setURI(URI.create(url));
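
The data-freshness numbers that `getDataFreshness` feeds into the report follow the README definition: time elapsed since the trigger point of the last successful checkpoint. A hedged sketch of that computation (the checkpoint timestamp plumbing is an assumption, since the method body is not shown in full here):

```java
public class DataFreshnessSketch {
    /**
     * Freshness = now - trigger timestamp of the latest successful checkpoint,
     * or null if no checkpoint has completed yet.
     */
    static Long dataFreshness(long nowMillis, Long latestCompletedCheckpointTriggerMillis) {
        if (latestCompletedCheckpointTriggerMillis == null) {
            return null;
        }
        return nowMillis - latestCompletedCheckpointTriggerMillis;
    }

    public static void main(String[] args) {
        // Hypothetical: checkpoint triggered 3 minutes (180,000 ms) ago.
        System.out.println(dataFreshness(1_000_000L, 820_000L)); // prints 180000
    }
}
```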
diff --git a/flink-table-store-benchmark/src/main/java/org/apache/flink/table/store/benchmark/metric/JobBenchmarkMetric.java b/flink-table-store-benchmark/src/main/java/org/apache/flink/table/store/benchmark/metric/JobBenchmarkMetric.java
index 8ba0f7a2..31572b62 100644
--- a/flink-table-store-benchmark/src/main/java/org/apache/flink/table/store/benchmark/metric/JobBenchmarkMetric.java
+++ b/flink-table-store-benchmark/src/main/java/org/apache/flink/table/store/benchmark/metric/JobBenchmarkMetric.java
@@ -20,32 +20,41 @@ package org.apache.flink.table.store.benchmark.metric;
 
 import org.apache.flink.table.store.benchmark.utils.BenchmarkUtils;
 
-import java.util.Objects;
-
 /** The aggregated result of a single benchmark query. */
 public class JobBenchmarkMetric {
-    private final double bps;
-    private final long totalBytes;
+
+    private final String name;
+    private final double rps;
+    private final long totalRows;
     private final double cpu;
     private final Long avgDataFreshness;
     private final Long maxDataFreshness;
-    private double queryRps;
 
     public JobBenchmarkMetric(
-            double bps, long totalBytes, double cpu, Long avgDataFreshness, Long maxDataFreshness) {
-        this.bps = bps;
-        this.totalBytes = totalBytes;
+            String name,
+            double rps,
+            long totalRows,
+            double cpu,
+            Long avgDataFreshness,
+            Long maxDataFreshness) {
+        this.name = name;
+        this.rps = rps;
+        this.totalRows = totalRows;
         this.cpu = cpu;
         this.avgDataFreshness = avgDataFreshness;
         this.maxDataFreshness = maxDataFreshness;
     }
 
-    public String getPrettyBps() {
-        return BenchmarkUtils.formatLongValue((long) bps);
+    public String getName() {
+        return name;
+    }
+
+    public String getPrettyRps() {
+        return BenchmarkUtils.formatLongValue((long) rps);
     }
 
-    public String getPrettyTotalBytes() {
-        return BenchmarkUtils.formatLongValue(totalBytes);
+    public String getPrettyTotalRows() {
+        return BenchmarkUtils.formatLongValue(totalRows);
     }
 
     public String getPrettyCpu() {
@@ -56,12 +65,12 @@ public class JobBenchmarkMetric {
         return cpu;
     }
 
-    public String getPrettyBpsPerCore() {
-        return BenchmarkUtils.formatLongValue(getBpsPerCore());
+    public String getPrettyRpsPerCore() {
+        return BenchmarkUtils.formatLongValue(getRpsPerCore());
     }
 
-    public long getBpsPerCore() {
-        return (long) (bps / cpu);
+    public long getRpsPerCore() {
+        return (long) (rps / cpu);
     }
 
     public String getAvgDataFreshnessString() {
@@ -71,34 +80,4 @@ public class JobBenchmarkMetric {
     public String getMaxDataFreshnessString() {
         return BenchmarkUtils.formatDataFreshness(maxDataFreshness);
     }
-
-    public void setQueryRps(double queryRps) {
-        this.queryRps = queryRps;
-    }
-
-    public String getPrettyQueryRps() {
-        return BenchmarkUtils.formatLongValue((long) queryRps);
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (o == null || getClass() != o.getClass()) {
-            return false;
-        }
-        JobBenchmarkMetric that = (JobBenchmarkMetric) o;
-        return Double.compare(that.bps, bps) == 0 && Double.compare(that.cpu, cpu) == 0;
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(bps, cpu);
-    }
-
-    @Override
-    public String toString() {
-        return "BenchmarkMetric{" + "bps=" + bps + ", cpu=" + cpu + '}';
-    }
 }
diff --git a/flink-table-store-benchmark/src/main/java/org/apache/flink/table/store/benchmark/metric/MetricReporter.java b/flink-table-store-benchmark/src/main/java/org/apache/flink/table/store/benchmark/metric/MetricReporter.java
index f0088750..53222bfa 100644
--- a/flink-table-store-benchmark/src/main/java/org/apache/flink/table/store/benchmark/metric/MetricReporter.java
+++ b/flink-table-store-benchmark/src/main/java/org/apache/flink/table/store/benchmark/metric/MetricReporter.java
@@ -19,8 +19,6 @@
 package org.apache.flink.table.store.benchmark.metric;
 
 import org.apache.flink.api.common.time.Deadline;
-import org.apache.flink.table.store.benchmark.metric.bytes.BpsMetric;
-import org.apache.flink.table.store.benchmark.metric.bytes.TotalBytesMetric;
 import org.apache.flink.table.store.benchmark.metric.cpu.CpuMetricReceiver;
 
 import org.slf4j.Logger;
@@ -40,7 +38,8 @@ public class MetricReporter {
 
     private final Duration monitorDelay;
     private final Duration monitorInterval;
-    private final Duration monitorDuration;
+    private final Long monitorRowNumber;
+
     private final FlinkRestClient flinkRestClient;
     private final CpuMetricReceiver cpuMetricReceiver;
     private final List<BenchmarkMetric> metrics;
@@ -52,10 +51,11 @@ public class MetricReporter {
             CpuMetricReceiver cpuMetricReceiver,
             Duration monitorDelay,
             Duration monitorInterval,
-            Duration monitorDuration) {
+            long monitorRowNumber) {
         this.monitorDelay = monitorDelay;
         this.monitorInterval = monitorInterval;
-        this.monitorDuration = monitorDuration;
+        this.monitorRowNumber = monitorRowNumber;
+
         this.flinkRestClient = flinkRestClient;
         this.cpuMetricReceiver = cpuMetricReceiver;
         this.metrics = new ArrayList<>();
@@ -84,19 +84,13 @@ public class MetricReporter {
         }
     }
 
-    public JobBenchmarkMetric reportMetric(String jobId) {
+    public JobBenchmarkMetric reportMetric(String name, String jobId) {
         System.out.printf("Monitor metrics after %s seconds.%n", monitorDelay.getSeconds());
         waitFor(monitorDelay);
 
         submitMonitorThread(jobId);
-        if (monitorDuration != null) {
-            System.out.printf(
-                    "Start to monitor metrics for %s seconds.%n", monitorDuration.getSeconds());
-            waitFor(monitorDuration);
-        } else {
-            System.out.println("Start to monitor metrics until job ended.%n");
-            flinkRestClient.waitUntilJobFinished(jobId);
-        }
+        System.out.printf("Start to monitor metrics until %d rows are produced.%n", monitorRowNumber);
+        flinkRestClient.waitUntilNumberOfRows(jobId, monitorRowNumber);
 
         // cleanup the resource
         this.close();
@@ -104,8 +98,8 @@ public class MetricReporter {
         if (metrics.isEmpty()) {
             throw new RuntimeException("The metric reporter doesn't collect any metrics.");
         }
-        double sumBps = 0.0;
-        long totalBytes = 0;
+        double sumRps = 0.0;
+        long totalRows = 0;
         double sumCpu = 0.0;
 
         Long avgDataFreshness = 0L;
@@ -113,8 +107,8 @@ public class MetricReporter {
         int validDataFreshnessCount = 0;
 
         for (BenchmarkMetric metric : metrics) {
-            sumBps += metric.getBps();
-            totalBytes = metric.getTotalBytes();
+            sumRps += metric.getRps();
+            totalRows = metric.getTotalRows();
             sumCpu += metric.getCpu();
             if (metric.getDataFreshness() != null) {
                 avgDataFreshness += metric.getDataFreshness();
@@ -123,7 +117,7 @@ public class MetricReporter {
             }
         }
 
-        double avgBps = sumBps / metrics.size();
+        double avgRps = sumRps / metrics.size();
         double avgCpu = sumCpu / metrics.size();
         if (validDataFreshnessCount == 0) {
             avgDataFreshness = null;
@@ -133,17 +127,19 @@ public class MetricReporter {
         }
         JobBenchmarkMetric metric =
                 new JobBenchmarkMetric(
-                        avgBps, totalBytes, avgCpu, avgDataFreshness, maxDataFreshness);
+                        name, avgRps, totalRows, avgCpu, avgDataFreshness, maxDataFreshness);
 
         String message =
                 String.format(
-                        "Summary: Average Throughput = %s, "
-                                + "Total Bytes = %s, "
+                        "Summary: Name = %s, "
+                                + "Average Throughput = %s, "
+                                + "Total Rows = %s, "
                                 + "Cores = %s, "
                                 + "Avg Data Freshness = %s, "
                                 + "Max Data Freshness = %s",
-                        metric.getPrettyBps(),
-                        metric.getPrettyTotalBytes(),
+                        name,
+                        metric.getPrettyRps(),
+                        metric.getPrettyTotalRows(),
                         metric.getPrettyCpu(),
                         metric.getAvgDataFreshnessString(),
                         metric.getMaxDataFreshnessString());
@@ -169,21 +165,26 @@ public class MetricReporter {
         @Override
         public void run() {
             try {
-                BpsMetric bps = flinkRestClient.getBpsMetric(jobId, vertexId);
-                TotalBytesMetric totalBytes = flinkRestClient.getTotalBytesMetric(jobId, vertexId);
+                String sourceVertexId = flinkRestClient.getSourceVertexId(jobId);
+                double tps = flinkRestClient.getNumRecordsPerSecond(jobId, sourceVertexId);
+                double totalRows = flinkRestClient.getTotalNumRecords(jobId, sourceVertexId);
                 Long dataFreshness = flinkRestClient.getDataFreshness(jobId);
                 double cpu = cpuMetricReceiver.getTotalCpu();
                 int tms = cpuMetricReceiver.getNumberOfTM();
                 BenchmarkMetric metric =
-                        new BenchmarkMetric(bps.getSum(), totalBytes.getSum(), cpu, dataFreshness);
-                // it's thread-safe to update metrics
-                metrics.add(metric);
+                        new BenchmarkMetric(tps, (long) totalRows, cpu, dataFreshness);
+
+                if (tps >= 0 && totalRows >= 0) {
+                    // it's thread-safe to update metrics
+                    metrics.add(metric);
+                }
+
                 // logging
                 String message =
                         String.format(
-                                "Current Throughput = %s, Total Bytes = %s, Cores = %s (%s TMs), Data Freshness = %s",
-                                metric.getPrettyBps(),
-                                metric.getPrettyTotalBytes(),
+                                "Current Throughput = %s, Total Rows = %s, Cores = %s (%s TMs), Data Freshness = %s",
+                                metric.getPrettyRps(),
+                                metric.getPrettyTotalRows(),
                                 metric.getPrettyCpu(),
                                 tms,
                                 metric.getDataFreshnessString());
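
To make the aggregation in `reportMetric` above concrete: the per-interval rows/s samples are averaged, while Total Rows keeps only the latest reading, because Flink's `numRecordsOut` counter is cumulative. A small sketch with invented samples:

```java
import java.util.Arrays;
import java.util.List;

public class SummaryAggregationSketch {
    public static void main(String[] args) {
        // Hypothetical per-interval samples (rows/s) collected by the monitor thread.
        List<Double> rpsSamples = Arrays.asList(90_000.0, 110_000.0, 100_000.0);
        double avgRps = rpsSamples.stream().mapToDouble(Double::doubleValue).average().orElse(0);

        // numRecordsOut is cumulative, so the last sample already is the total row count.
        long totalRows = 180_000_000L; // hypothetical latest numRecordsOut reading

        System.out.printf("Average Throughput = %,.0f rows/s, Total Rows = %,d%n",
                avgRps, totalRows); // Average Throughput = 100,000 rows/s
    }
}
```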
diff --git a/flink-table-store-benchmark/src/main/java/org/apache/flink/table/store/benchmark/metric/bytes/BpsMetric.java b/flink-table-store-benchmark/src/main/java/org/apache/flink/table/store/benchmark/metric/bytes/BpsMetric.java
deleted file mode 100644
index e27c0097..00000000
--- a/flink-table-store-benchmark/src/main/java/org/apache/flink/table/store/benchmark/metric/bytes/BpsMetric.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.store.benchmark.metric.bytes;
-
-import org.apache.flink.table.store.benchmark.utils.BenchmarkUtils;
-
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
-
-import javax.annotation.Nullable;
-
-import java.util.Objects;
-
-import static java.util.Objects.requireNonNull;
-
-/**
- * Response type for BPS aggregated metrics. Contains the metric name and optionally the sum,
- * average, minimum and maximum.
- */
-public class BpsMetric {
-
-    private static final String FIELD_NAME_ID = "id";
-
-    private static final String FIELD_NAME_MIN = "min";
-
-    private static final String FIELD_NAME_MAX = "max";
-
-    private static final String FIELD_NAME_AVG = "avg";
-
-    private static final String FIELD_NAME_SUM = "sum";
-
-    @JsonProperty(value = FIELD_NAME_ID, required = true)
-    private final String id;
-
-    @JsonInclude(JsonInclude.Include.NON_NULL)
-    @JsonProperty(FIELD_NAME_MIN)
-    private final Double min;
-
-    @JsonInclude(JsonInclude.Include.NON_NULL)
-    @JsonProperty(FIELD_NAME_MAX)
-    private final Double max;
-
-    @JsonInclude(JsonInclude.Include.NON_NULL)
-    @JsonProperty(FIELD_NAME_AVG)
-    private final Double avg;
-
-    @JsonInclude(JsonInclude.Include.NON_NULL)
-    @JsonProperty(FIELD_NAME_SUM)
-    private final Double sum;
-
-    @JsonCreator
-    public BpsMetric(
-            final @JsonProperty(value = FIELD_NAME_ID, required = true) String id,
-            final @Nullable @JsonProperty(FIELD_NAME_MIN) Double min,
-            final @Nullable @JsonProperty(FIELD_NAME_MAX) Double max,
-            final @Nullable @JsonProperty(FIELD_NAME_AVG) Double avg,
-            final @Nullable @JsonProperty(FIELD_NAME_SUM) Double sum) {
-
-        this.id = requireNonNull(id, "id must not be null");
-        this.min = min;
-        this.max = max;
-        this.avg = avg;
-        this.sum = sum;
-    }
-
-    public BpsMetric(final @JsonProperty(value = FIELD_NAME_ID, required = true) String id) {
-        this(id, null, null, null, null);
-    }
-
-    @JsonIgnore
-    public String getId() {
-        return id;
-    }
-
-    @JsonIgnore
-    public Double getMin() {
-        return min;
-    }
-
-    @JsonIgnore
-    public Double getMax() {
-        return max;
-    }
-
-    @JsonIgnore
-    public Double getSum() {
-        return sum;
-    }
-
-    @JsonIgnore
-    public Double getAvg() {
-        return avg;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (o == null || getClass() != o.getClass()) {
-            return false;
-        }
-        BpsMetric that = (BpsMetric) o;
-        return Objects.equals(id, that.id)
-                && Objects.equals(min, that.min)
-                && Objects.equals(max, that.max)
-                && Objects.equals(avg, that.avg)
-                && Objects.equals(sum, that.sum);
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(id, min, max, avg, sum);
-    }
-
-    @Override
-    public String toString() {
-        return "BpsMetric{"
-                + "id='"
-                + id
-                + '\''
-                + ", mim='"
-                + min
-                + '\''
-                + ", max='"
-                + max
-                + '\''
-                + ", avg='"
-                + avg
-                + '\''
-                + ", sum='"
-                + sum
-                + '\''
-                + '}';
-    }
-
-    public static BpsMetric fromJson(String json) {
-        try {
-            JsonNode jsonNode = BenchmarkUtils.JSON_MAPPER.readTree(json);
-            return BenchmarkUtils.JSON_MAPPER.convertValue(jsonNode.get(0), BpsMetric.class);
-        } catch (JsonProcessingException e) {
-            throw new RuntimeException(e);
-        }
-    }
-}
diff --git a/flink-table-store-benchmark/src/main/java/org/apache/flink/table/store/benchmark/metric/bytes/TotalBytesMetric.java b/flink-table-store-benchmark/src/main/java/org/apache/flink/table/store/benchmark/metric/bytes/TotalBytesMetric.java
deleted file mode 100644
index 4212507d..00000000
--- a/flink-table-store-benchmark/src/main/java/org/apache/flink/table/store/benchmark/metric/bytes/TotalBytesMetric.java
+++ /dev/null
@@ -1,162 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.store.benchmark.metric.bytes;
-
-import org.apache.flink.table.store.benchmark.utils.BenchmarkUtils;
-
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
-
-import javax.annotation.Nullable;
-
-import java.util.Objects;
-
-import static java.util.Objects.requireNonNull;
-
-/** Json object of the "numBytesOut" metrics of Flink. */
-public class TotalBytesMetric {
-
-    private static final String FIELD_NAME_ID = "id";
-
-    private static final String FIELD_NAME_MIN = "min";
-
-    private static final String FIELD_NAME_MAX = "max";
-
-    private static final String FIELD_NAME_AVG = "avg";
-
-    private static final String FIELD_NAME_SUM = "sum";
-
-    @JsonProperty(value = FIELD_NAME_ID, required = true)
-    private final String id;
-
-    @JsonInclude(JsonInclude.Include.NON_NULL)
-    @JsonProperty(FIELD_NAME_MIN)
-    private final Long min;
-
-    @JsonInclude(JsonInclude.Include.NON_NULL)
-    @JsonProperty(FIELD_NAME_MAX)
-    private final Long max;
-
-    @JsonInclude(JsonInclude.Include.NON_NULL)
-    @JsonProperty(FIELD_NAME_AVG)
-    private final Long avg;
-
-    @JsonInclude(JsonInclude.Include.NON_NULL)
-    @JsonProperty(FIELD_NAME_SUM)
-    private final Long sum;
-
-    @JsonCreator
-    public TotalBytesMetric(
-            final @JsonProperty(value = FIELD_NAME_ID, required = true) String id,
-            final @Nullable @JsonProperty(FIELD_NAME_MIN) Long min,
-            final @Nullable @JsonProperty(FIELD_NAME_MAX) Long max,
-            final @Nullable @JsonProperty(FIELD_NAME_AVG) Long avg,
-            final @Nullable @JsonProperty(FIELD_NAME_SUM) Long sum) {
-
-        this.id = requireNonNull(id, "id must not be null");
-        this.min = min;
-        this.max = max;
-        this.avg = avg;
-        this.sum = sum;
-    }
-
-    public TotalBytesMetric(final @JsonProperty(value = FIELD_NAME_ID, required = true) String id) {
-        this(id, null, null, null, null);
-    }
-
-    @JsonIgnore
-    public String getId() {
-        return id;
-    }
-
-    @JsonIgnore
-    public Long getMin() {
-        return min;
-    }
-
-    @JsonIgnore
-    public Long getMax() {
-        return max;
-    }
-
-    @JsonIgnore
-    public Long getSum() {
-        return sum;
-    }
-
-    @JsonIgnore
-    public Long getAvg() {
-        return avg;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (o == null || getClass() != o.getClass()) {
-            return false;
-        }
-        TotalBytesMetric that = (TotalBytesMetric) o;
-        return Objects.equals(id, that.id)
-                && Objects.equals(min, that.min)
-                && Objects.equals(max, that.max)
-                && Objects.equals(avg, that.avg)
-                && Objects.equals(sum, that.sum);
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(id, min, max, avg, sum);
-    }
-
-    @Override
-    public String toString() {
-        return "TotalBytesMetric{"
-                + "id='"
-                + id
-                + '\''
-                + ", mim='"
-                + min
-                + '\''
-                + ", max='"
-                + max
-                + '\''
-                + ", avg='"
-                + avg
-                + '\''
-                + ", sum='"
-                + sum
-                + '\''
-                + '}';
-    }
-
-    public static TotalBytesMetric fromJson(String json) {
-        try {
-            JsonNode jsonNode = BenchmarkUtils.JSON_MAPPER.readTree(json);
-            return BenchmarkUtils.JSON_MAPPER.convertValue(jsonNode.get(0), TotalBytesMetric.class);
-        } catch (JsonProcessingException e) {
-            throw new RuntimeException(e);
-        }
-    }
-}
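Both deleted wrappers (BpsMetric above and TotalBytesMetric here) existed only to deserialize Flink's aggregated-metrics REST response, a JSON array of {id, min, max, avg, sum} objects; presumably FlinkRestClient now reads the one field it needs directly. A minimal sketch of that direct approach, assuming the same response shape -- the class and method names below are illustrative, not the actual FlinkRestClient code:

    import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
    import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;

    /** Illustrative one-method replacement for the deleted metric wrappers. */
    public final class AggregatedMetricReader {

        private static final ObjectMapper MAPPER = new ObjectMapper();

        /** Reads one aggregate (e.g. "sum") of the first metric in the JSON array. */
        public static long readAggregate(String json, String field) throws Exception {
            // Response shape: [{"id":"numRecordsOut","min":...,"max":...,"avg":...,"sum":...}]
            JsonNode first = MAPPER.readTree(json).get(0);
            return first == null || first.get(field) == null ? 0L : first.get(field).asLong();
        }
    }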
diff --git a/flink-table-store-benchmark/src/main/java/org/apache/flink/table/store/benchmark/utils/BenchmarkGlobalConfiguration.java b/flink-table-store-benchmark/src/main/java/org/apache/flink/table/store/benchmark/utils/BenchmarkGlobalConfiguration.java
index b2cad9c8..fc36f9d3 100644
--- a/flink-table-store-benchmark/src/main/java/org/apache/flink/table/store/benchmark/utils/BenchmarkGlobalConfiguration.java
+++ b/flink-table-store-benchmark/src/main/java/org/apache/flink/table/store/benchmark/utils/BenchmarkGlobalConfiguration.java
@@ -128,11 +128,12 @@ public final class BenchmarkGlobalConfiguration {
                             + ") does not exist.");
         }
 
-        Configuration configuration;
+        Configuration configuration = new Configuration();
         try (InputStream inputStream = new FileInputStream(yamlConfigFile)) {
-            configuration =
-                    Configuration.fromMap(
-                            BenchmarkUtils.YAML_MAPPER.readValue(inputStream, Map.class));
+            Map<?, ?> yaml = BenchmarkUtils.YAML_MAPPER.readValue(inputStream, Map.class);
+            for (Map.Entry<?, ?> entry : yaml.entrySet()) {
+                configuration.setString(entry.getKey().toString(), entry.getValue().toString());
+            }
         } catch (IOException e) {
             throw new RuntimeException("Error parsing YAML configuration.", e);
         }
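The rewritten loader dodges an unchecked cast: Configuration.fromMap expects a Map<String, String>, but Jackson's YAML mapper returns typed scalars, so a numeric entry parses to an Integer and would fail when read back as a String. Stringifying each key and value before calling setString avoids this. A small self-contained sketch of the failure mode, using the plain (unshaded) Jackson YAML mapper and illustrative values:

    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;

    import java.util.Map;

    public final class YamlTypingDemo {
        public static void main(String[] args) throws Exception {
            // "1" parses to an Integer while "10s" stays a String -- a mixed-type map.
            String yaml = "taskmanager.numberOfTaskSlots: 1\nbenchmark.metric.monitor.interval: 10s\n";
            Map<?, ?> parsed = new ObjectMapper(new YAMLFactory()).readValue(yaml, Map.class);
            for (Map.Entry<?, ?> e : parsed.entrySet()) {
                // Matches the new loader: coerce both sides to strings before use.
                System.out.println(e.getKey().toString() + " = " + e.getValue().toString());
            }
        }
    }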
diff --git a/flink-table-store-benchmark/src/main/resources/conf/benchmark.yaml b/flink-table-store-benchmark/src/main/resources/conf/benchmark.yaml
index 29771035..e42e89c1 100644
--- a/flink-table-store-benchmark/src/main/resources/conf/benchmark.yaml
+++ b/flink-table-store-benchmark/src/main/resources/conf/benchmark.yaml
@@ -16,8 +16,7 @@
 # limitations under the License.
 ################################################################################
 
-benchmark.metric.monitor.delay: 60s
-benchmark.metric.monitor.duration: 30min
+benchmark.metric.monitor.delay: 30s
 benchmark.metric.monitor.interval: 10s
 # benchmark.metric.reporter.host: host-name
 benchmark.sink.path: /path/to/sink # should point to a non-existing path, will be removed before each test
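benchmark.metric.monitor.duration disappears along with its BenchmarkOptions entry: each query now appears to run to a fixed row count (see queries.yaml below) rather than for a fixed monitoring window, and the warm-up delay before sampling drops from 60s to 30s. For reference, options like these are conventionally declared through Flink's ConfigOptions builder; a hedged sketch, with illustrative identifiers rather than the actual BenchmarkOptions code:

    import org.apache.flink.configuration.ConfigOption;
    import org.apache.flink.configuration.ConfigOptions;

    import java.time.Duration;

    public final class MonitorOptions {
        /** How long to wait after job submission before sampling metrics. */
        public static final ConfigOption<Duration> METRIC_MONITOR_DELAY =
                ConfigOptions.key("benchmark.metric.monitor.delay")
                        .durationType()
                        .defaultValue(Duration.ofSeconds(30));

        /** How often to poll the Flink REST API while the job runs. */
        public static final ConfigOption<Duration> METRIC_MONITOR_INTERVAL =
                ConfigOptions.key("benchmark.metric.monitor.interval")
                        .durationType()
                        .defaultValue(Duration.ofSeconds(10));
    }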
diff --git a/flink-table-store-benchmark/src/main/resources/queries/q1.sql b/flink-table-store-benchmark/src/main/resources/queries/q1.sql
index fa2b3a15..3c9ff963 100644
--- a/flink-table-store-benchmark/src/main/resources/queries/q1.sql
+++ b/flink-table-store-benchmark/src/main/resources/queries/q1.sql
@@ -15,7 +15,7 @@
 -- limitations under the License.
 
 -- Mimics the update of uv and pv of items in an e-commerce website.
--- Primary keys are totally random; Each record is about 100 bytes.
+-- Primary keys range from 0 to 10^8; each record is about 100 bytes.
 
 CREATE TABLE item_uv_pv_1d_source (
     `item_id` BIGINT,
@@ -32,7 +32,7 @@ CREATE TABLE item_uv_pv_1d_source (
     'connector' = 'datagen',
     'rows-per-second' = '999999999',
     'fields.item_id.min' = '0',
-    'fields.item_id.max' = '999999999',
+    'fields.item_id.max' = '99999999',
     'fields.item_click_uv_1d.min' = '0',
     'fields.item_click_uv_1d.max' = '999999999',
     'fields.item_click_pv_1d.min' = '0',
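Shrinking the item_id space by one decimal digit changes the nature of q1: with keys drawn uniformly from [0, 10^8) and the 500,000,000-row target that queries.yaml (below) sets for q1, each primary key is written about five times on average, so the query now exercises updates and compaction rather than mostly-unique inserts. A rough back-of-envelope check:

    public final class UpdatesPerKey {
        public static void main(String[] args) {
            long keySpace = 100_000_000L;  // fields.item_id.max = 99999999
            long totalRows = 500_000_000L; // row-num for q1 in queries.yaml
            // Uniform keys: expected writes per key = total rows / distinct keys.
            System.out.printf("expected writes per key: %.1f%n", (double) totalRows / keySpace);
        }
    }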
diff --git a/flink-table-store-benchmark/src/main/resources/queries/q2.sql b/flink-table-store-benchmark/src/main/resources/queries/q2.sql
deleted file mode 100644
index aa850df6..00000000
--- a/flink-table-store-benchmark/src/main/resources/queries/q2.sql
+++ /dev/null
@@ -1,179 +0,0 @@
--- Licensed to the Apache Software Foundation (ASF) under one
--- or more contributor license agreements.  See the NOTICE file
--- distributed with this work for additional information
--- regarding copyright ownership.  The ASF licenses this file
--- to you under the Apache License, Version 2.0 (the
--- "License"); you may not use this file except in compliance
--- with the License.  You may obtain a copy of the License at
---
---     http://www.apache.org/licenses/LICENSE-2.0
---
--- Unless required by applicable law or agreed to in writing, software
--- distributed under the License is distributed on an "AS IS" BASIS,
--- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
--- See the License for the specific language governing permissions and
--- limitations under the License.
-
--- Mimics the insert and update of orders in an e-commerce website.
--- Primary keys are totally random; each record is about 1.5K bytes.
-
-CREATE TABLE orders_source (
-    `order_id` BIGINT, -- primary key
-    `category_id` BIGINT,
-    `category_name` STRING,
-    `category_description` STRING,
-    `item_id` BIGINT,
-    `item_name` STRING,
-    `item_description` STRING,
-    `item_price` INT,
-    `item_currency` STRING,
-    `item_tags` STRING,
-    `item_amount` INT,
-    `item_updatetime` TIMESTAMP(3),
-    `buyer_id` BIGINT,
-    `buyer_name` STRING,
-    `buyer_avatar` STRING,
-    `buyer_age` INT,
-    `buyer_company` STRING,
-    `buyer_email` STRING,
-    `buyer_phone` BIGINT,
-    `buyer_country` STRING,
-    `buyer_city` STRING,
-    `buyer_street` STRING,
-    `buyer_zipcode` INT,
-    `buyer_comments` STRING,
-    `buyer_buytime` TIMESTAMP(3),
-    `seller_id` BIGINT,
-    `seller_name` STRING,
-    `seller_avatar` STRING,
-    `seller_age` INT,
-    `seller_company` STRING,
-    `seller_email` STRING,
-    `seller_phone` BIGINT,
-    `seller_country` STRING,
-    `seller_city` STRING,
-    `seller_street` STRING,
-    `seller_zipcode` INT,
-    `seller_comments` STRING,
-    `seller_selltime` TIMESTAMP(3),
-    `shipping_id` BIGINT,
-    `shipping_company` STRING,
-    `shipping_status` INT,
-    `shipping_comments` STRING
-) WITH (
-    'connector' = 'datagen',
-    'rows-per-second' = '999999999',
-    'fields.order_id.min' = '0',
-    'fields.order_id.max' = '999999999',
-    'fields.item_currency.length' = '3',
-    'fields.buyer_phone.min' = '10000000000',
-    'fields.buyer_phone.max' = '99999999999',
-    'fields.buyer_country.length' = '3',
-    'fields.seller_phone.min' = '10000000000',
-    'fields.seller_phone.max' = '99999999999',
-    'fields.seller_country.length' = '3',
-    'fields.shipping_status.min' = '0',
-    'fields.shipping_status.max' = '9'
-);
-
-CREATE VIEW orders AS
-SELECT
-    `order_id`,
-    `category_id`,
-    SUBSTR(`category_name`, 0, ABS(MOD(`category_id`, 64)) + 64) AS `category_name`,
-    SUBSTR(`category_description`, 0, ABS(MOD(`category_id`, 128)) + 128) AS `category_description`,
-    `item_id`,
-    SUBSTR(`item_name`, 0, ABS(MOD(`item_id`, 64)) + 64) AS `item_name`,
-    SUBSTR(`item_description`, 0, ABS(MOD(`item_id`, 128)) + 128) AS `item_description`,
-    `item_price`,
-    `item_currency`,
-    SUBSTR(`item_tags`, 0, ABS(MOD(`item_id`, 32)) + 32) AS `item_tags`,
-    `item_amount`,
-    `item_updatetime`,
-    `buyer_id`,
-    SUBSTR(`buyer_name`, 0, ABS(MOD(`buyer_id`, 16)) + 16) AS `buyer_name`,
-    SUBSTR(`buyer_avatar`, 0, ABS(MOD(`buyer_id`, 128)) + 128) AS `buyer_avatar`,
-    `buyer_age`,
-    SUBSTR(`buyer_company`, 0, ABS(MOD(`buyer_id`, 32)) + 32) AS `buyer_company`,
-    SUBSTR(`buyer_email`, 0, ABS(MOD(`buyer_id`, 16)) + 16) AS `buyer_email`,
-    `buyer_phone`,
-    `buyer_country`,
-    SUBSTR(`buyer_city`, 0, ABS(MOD(`buyer_id`, 8)) + 8) AS `buyer_city`,
-    SUBSTR(`buyer_street`, 0, ABS(MOD(`buyer_id`, 16)) + 32) AS `buyer_street`,
-    `buyer_zipcode`,
-    SUBSTR(`buyer_comments`, 0, ABS(MOD(`buyer_id`, 128)) + 128) AS `buyer_comments`,
-    `buyer_buytime`,
-    `seller_id`,
-    SUBSTR(`seller_name`, 0, ABS(MOD(`seller_id`, 16)) + 16) AS `seller_name`,
-    SUBSTR(`seller_avatar`, 0, ABS(MOD(`seller_id`, 128)) + 128) AS `seller_avatar`,
-    `seller_age`,
-    SUBSTR(`seller_company`, 0, ABS(MOD(`seller_id`, 32)) + 32) AS `seller_company`,
-    SUBSTR(`seller_email`, 0, ABS(MOD(`seller_id`, 16)) + 16) AS `seller_email`,
-    `seller_phone`,
-    `seller_country`,
-    SUBSTR(`seller_city`, 0, ABS(MOD(`seller_id`, 8)) + 8) AS `seller_city`,
-    SUBSTR(`seller_street`, 0, ABS(MOD(`seller_id`, 16)) + 32) AS `seller_street`,
-    `seller_zipcode`,
-    SUBSTR(`seller_comments`, 0, ABS(MOD(`seller_id`, 128)) + 128) AS `seller_comments`,
-    `seller_selltime`,
-    `shipping_id`,
-    SUBSTR(`shipping_company`, 0, ABS(MOD(`shipping_id`, 32)) + 32) AS `shipping_company`,
-    `shipping_status`,
-    SUBSTR(`shipping_comments`, 0, ABS(MOD(`shipping_id`, 128)) + 128) AS `shipping_comments`,
-    NOW() AS `ts`
-FROM orders_source;
-
--- __SINK_DDL_BEGIN__
-
-CREATE TABLE IF NOT EXISTS ${SINK_NAME} (
-    `order_id` BIGINT, -- primary key
-    `category_id` BIGINT,
-    `category_name` STRING,
-    `category_description` STRING,
-    `item_id` BIGINT,
-    `item_name` STRING,
-    `item_description` STRING,
-    `item_price` INT,
-    `item_currency` STRING,
-    `item_tags` STRING,
-    `item_amount` INT,
-    `item_updatetime` TIMESTAMP(3),
-    `buyer_id` BIGINT,
-    `buyer_name` STRING,
-    `buyer_avatar` STRING,
-    `buyer_age` INT,
-    `buyer_company` STRING,
-    `buyer_email` STRING,
-    `buyer_phone` BIGINT,
-    `buyer_country` STRING,
-    `buyer_city` STRING,
-    `buyer_street` STRING,
-    `buyer_zipcode` INT,
-    `buyer_comments` STRING,
-    `buyer_buytime` TIMESTAMP(3),
-    `seller_id` BIGINT,
-    `seller_name` STRING,
-    `seller_avatar` STRING,
-    `seller_age` INT,
-    `seller_company` STRING,
-    `seller_email` STRING,
-    `seller_phone` BIGINT,
-    `seller_country` STRING,
-    `seller_city` STRING,
-    `seller_street` STRING,
-    `seller_zipcode` INT,
-    `seller_comments` STRING,
-    `seller_selltime` TIMESTAMP(3),
-    `shipping_id` BIGINT,
-    `shipping_company` STRING,
-    `shipping_status` INT,
-    `shipping_comments` STRING,
-    `ts` TIMESTAMP(3),
-    PRIMARY KEY (`order_id`) NOT ENFORCED
-) WITH (
-    ${DDL_TEMPLATE}
-);
-
--- __SINK_DDL_END__
-
-INSERT INTO ${SINK_NAME} SELECT * FROM orders;
diff --git a/flink-table-store-benchmark/src/main/resources/queries/q3.sql b/flink-table-store-benchmark/src/main/resources/queries/q3.sql
deleted file mode 100644
index 5095e2cc..00000000
--- a/flink-table-store-benchmark/src/main/resources/queries/q3.sql
+++ /dev/null
@@ -1,92 +0,0 @@
--- Licensed to the Apache Software Foundation (ASF) under one
--- or more contributor license agreements.  See the NOTICE file
--- distributed with this work for additional information
--- regarding copyright ownership.  The ASF licenses this file
--- to you under the Apache License, Version 2.0 (the
--- "License"); you may not use this file except in compliance
--- with the License.  You may obtain a copy of the License at
---
---     http://www.apache.org/licenses/LICENSE-2.0
---
--- Unless required by applicable law or agreed to in writing, software
--- distributed under the License is distributed on an "AS IS" BASIS,
--- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
--- See the License for the specific language governing permissions and
--- limitations under the License.
-
--- Mimics the update of uv and pv by hour of items in an e-commerce website.
--- Primary keys are correlated with the current time; each record is about 100 bytes.
-
-CREATE TABLE item_uv_pv_hh_source (
-    `item_id` BIGINT,
-    `item_name` STRING,
-    `item_click_uv_hh` BIGINT,
-    `item_click_pv_hh` BIGINT,
-    `item_like_uv_hh` BIGINT,
-    `item_like_pv_hh` BIGINT,
-    `item_cart_uv_hh` BIGINT,
-    `item_cart_pv_hh` BIGINT,
-    `item_share_uv_hh` BIGINT,
-    `item_share_pv_hh` BIGINT
-) WITH (
-    'connector' = 'datagen',
-    'rows-per-second' = '999999999',
-    'fields.item_id.min' = '0',
-    'fields.item_id.max' = '9999999',
-    'fields.item_click_uv_hh.min' = '0',
-    'fields.item_click_uv_hh.max' = '999999999',
-    'fields.item_click_pv_hh.min' = '0',
-    'fields.item_click_pv_hh.max' = '999999999',
-    'fields.item_like_uv_hh.min' = '0',
-    'fields.item_like_uv_hh.max' = '999999999',
-    'fields.item_like_pv_hh.min' = '0',
-    'fields.item_like_pv_hh.max' = '999999999',
-    'fields.item_cart_uv_hh.min' = '0',
-    'fields.item_cart_uv_hh.max' = '999999999',
-    'fields.item_cart_pv_hh.min' = '0',
-    'fields.item_cart_pv_hh.max' = '999999999',
-    'fields.item_share_uv_hh.min' = '0',
-    'fields.item_share_uv_hh.max' = '999999999',
-    'fields.item_share_pv_hh.min' = '0',
-    'fields.item_share_pv_hh.max' = '999999999'
-);
-
-CREATE VIEW item_uv_pv_hh AS
-SELECT
-    CAST(MINUTE(NOW()) AS INT) AS `hr`,
-    `item_id`,
-    SUBSTR(`item_name`, 0, MOD(`item_id`, 32) + 64) AS `item_name`,
-    `item_click_uv_hh`,
-    `item_click_pv_hh`,
-    `item_like_uv_hh`,
-    `item_like_pv_hh`,
-    `item_cart_uv_hh`,
-    `item_cart_pv_hh`,
-    `item_share_uv_hh`,
-    `item_share_pv_hh`,
-    NOW() AS `ts`
-FROM item_uv_pv_hh_source;
-
--- __SINK_DDL_BEGIN__
-
-CREATE TABLE IF NOT EXISTS ${SINK_NAME} (
-    `hr` INT,
-    `item_id` BIGINT,
-    `item_name` STRING,
-    `item_click_uv_hh` BIGINT,
-    `item_click_pv_hh` BIGINT,
-    `item_like_uv_hh` BIGINT,
-    `item_like_pv_hh` BIGINT,
-    `item_cart_uv_hh` BIGINT,
-    `item_cart_pv_hh` BIGINT,
-    `item_share_uv_hh` BIGINT,
-    `item_share_pv_hh` BIGINT,
-    `ts` TIMESTAMP(3),
-    PRIMARY KEY (`hr`, `item_id`) NOT ENFORCED
-) WITH (
-    ${DDL_TEMPLATE}
-);
-
--- __SINK_DDL_END__
-
-INSERT INTO ${SINK_NAME} SELECT * FROM item_uv_pv_hh;
diff --git a/flink-table-store-benchmark/src/main/resources/queries/q4.sql b/flink-table-store-benchmark/src/main/resources/queries/q4.sql
deleted file mode 100644
index cd574acd..00000000
--- a/flink-table-store-benchmark/src/main/resources/queries/q4.sql
+++ /dev/null
@@ -1,181 +0,0 @@
--- Licensed to the Apache Software Foundation (ASF) under one
--- or more contributor license agreements.  See the NOTICE file
--- distributed with this work for additional information
--- regarding copyright ownership.  The ASF licenses this file
--- to you under the Apache License, Version 2.0 (the
--- "License"); you may not use this file except in compliance
--- with the License.  You may obtain a copy of the License at
---
---     http://www.apache.org/licenses/LICENSE-2.0
---
--- Unless required by applicable law or agreed to in writing, software
--- distributed under the License is distributed on an "AS IS" BASIS,
--- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
--- See the License for the specific language governing permissions and
--- limitations under the License.
-
--- Mimics the insert and update by hour of orders in an e-commerce website.
--- Primary keys are correlated with the current time; each record is about 1.5K bytes.
-
-CREATE TABLE orders_source (
-    `order_id` BIGINT,
-    `category_id` BIGINT,
-    `category_name` STRING,
-    `category_description` STRING,
-    `item_id` BIGINT,
-    `item_name` STRING,
-    `item_description` STRING,
-    `item_price` INT,
-    `item_currency` STRING,
-    `item_tags` STRING,
-    `item_amount` INT,
-    `item_updatetime` TIMESTAMP(3),
-    `buyer_id` BIGINT,
-    `buyer_name` STRING,
-    `buyer_avatar` STRING,
-    `buyer_age` INT,
-    `buyer_company` STRING,
-    `buyer_email` STRING,
-    `buyer_phone` BIGINT,
-    `buyer_country` STRING,
-    `buyer_city` STRING,
-    `buyer_street` STRING,
-    `buyer_zipcode` INT,
-    `buyer_comments` STRING,
-    `buyer_buytime` TIMESTAMP(3),
-    `seller_id` BIGINT,
-    `seller_name` STRING,
-    `seller_avatar` STRING,
-    `seller_age` INT,
-    `seller_company` STRING,
-    `seller_email` STRING,
-    `seller_phone` BIGINT,
-    `seller_country` STRING,
-    `seller_city` STRING,
-    `seller_street` STRING,
-    `seller_zipcode` INT,
-    `seller_comments` STRING,
-    `seller_selltime` TIMESTAMP(3),
-    `shipping_id` BIGINT,
-    `shipping_company` STRING,
-    `shipping_status` INT,
-    `shipping_comments` STRING
-) WITH (
-    'connector' = 'datagen',
-    'rows-per-second' = '999999999',
-    'fields.order_id.min' = '0',
-    'fields.order_id.max' = '9999999',
-    'fields.item_currency.length' = '3',
-    'fields.buyer_phone.min' = '10000000000',
-    'fields.buyer_phone.max' = '99999999999',
-    'fields.buyer_country.length' = '3',
-    'fields.seller_phone.min' = '10000000000',
-    'fields.seller_phone.max' = '99999999999',
-    'fields.seller_country.length' = '3',
-    'fields.shipping_status.min' = '0',
-    'fields.shipping_status.max' = '9'
-);
-
-CREATE VIEW orders AS
-SELECT
-    CAST(MINUTE(NOW()) AS INT) AS `hr`,
-    `order_id`,
-    `category_id`,
-    SUBSTR(`category_name`, 0, ABS(MOD(`category_id`, 64)) + 64) AS `category_name`,
-    SUBSTR(`category_description`, 0, ABS(MOD(`category_id`, 128)) + 128) AS `category_description`,
-    `item_id`,
-    SUBSTR(`item_name`, 0, ABS(MOD(`item_id`, 64)) + 64) AS `item_name`,
-    SUBSTR(`item_description`, 0, ABS(MOD(`item_id`, 128)) + 128) AS `item_description`,
-    `item_price`,
-    `item_currency`,
-    SUBSTR(`item_tags`, 0, ABS(MOD(`item_id`, 32)) + 32) AS `item_tags`,
-    `item_amount`,
-    `item_updatetime`,
-    `buyer_id`,
-    SUBSTR(`buyer_name`, 0, ABS(MOD(`buyer_id`, 16)) + 16) AS `buyer_name`,
-    SUBSTR(`buyer_avatar`, 0, ABS(MOD(`buyer_id`, 128)) + 128) AS `buyer_avatar`,
-    `buyer_age`,
-    SUBSTR(`buyer_company`, 0, ABS(MOD(`buyer_id`, 32)) + 32) AS `buyer_company`,
-    SUBSTR(`buyer_email`, 0, ABS(MOD(`buyer_id`, 16)) + 16) AS `buyer_email`,
-    `buyer_phone`,
-    `buyer_country`,
-    SUBSTR(`buyer_city`, 0, ABS(MOD(`buyer_id`, 8)) + 8) AS `buyer_city`,
-    SUBSTR(`buyer_street`, 0, ABS(MOD(`buyer_id`, 16)) + 32) AS `buyer_street`,
-    `buyer_zipcode`,
-    SUBSTR(`buyer_comments`, 0, ABS(MOD(`buyer_id`, 128)) + 128) AS `buyer_comments`,
-    `buyer_buytime`,
-    `seller_id`,
-    SUBSTR(`seller_name`, 0, ABS(MOD(`seller_id`, 16)) + 16) AS `seller_name`,
-    SUBSTR(`seller_avatar`, 0, ABS(MOD(`seller_id`, 128)) + 128) AS `seller_avatar`,
-    `seller_age`,
-    SUBSTR(`seller_company`, 0, ABS(MOD(`seller_id`, 32)) + 32) AS `seller_company`,
-    SUBSTR(`seller_email`, 0, ABS(MOD(`seller_id`, 16)) + 16) AS `seller_email`,
-    `seller_phone`,
-    `seller_country`,
-    SUBSTR(`seller_city`, 0, ABS(MOD(`seller_id`, 8)) + 8) AS `seller_city`,
-    SUBSTR(`seller_street`, 0, ABS(MOD(`seller_id`, 16)) + 32) AS `seller_street`,
-    `seller_zipcode`,
-    SUBSTR(`seller_comments`, 0, ABS(MOD(`seller_id`, 128)) + 128) AS `seller_comments`,
-    `seller_selltime`,
-    `shipping_id`,
-    SUBSTR(`shipping_company`, 0, ABS(MOD(`shipping_id`, 32)) + 32) AS `shipping_company`,
-    `shipping_status`,
-    SUBSTR(`shipping_comments`, 0, ABS(MOD(`shipping_id`, 128)) + 128) AS `shipping_comments`,
-    NOW() AS `ts`
-FROM orders_source;
-
--- __SINK_DDL_BEGIN__
-
-CREATE TABLE IF NOT EXISTS ${SINK_NAME} (
-    `hr` INT,
-    `order_id` BIGINT,
-    `category_id` BIGINT,
-    `category_name` STRING,
-    `category_description` STRING,
-    `item_id` BIGINT,
-    `item_name` STRING,
-    `item_description` STRING,
-    `item_price` INT,
-    `item_currency` STRING,
-    `item_tags` STRING,
-    `item_amount` INT,
-    `item_updatetime` TIMESTAMP(3),
-    `buyer_id` BIGINT,
-    `buyer_name` STRING,
-    `buyer_avatar` STRING,
-    `buyer_age` INT,
-    `buyer_company` STRING,
-    `buyer_email` STRING,
-    `buyer_phone` BIGINT,
-    `buyer_country` STRING,
-    `buyer_city` STRING,
-    `buyer_street` STRING,
-    `buyer_zipcode` INT,
-    `buyer_comments` STRING,
-    `buyer_buytime` TIMESTAMP(3),
-    `seller_id` BIGINT,
-    `seller_name` STRING,
-    `seller_avatar` STRING,
-    `seller_age` INT,
-    `seller_company` STRING,
-    `seller_email` STRING,
-    `seller_phone` BIGINT,
-    `seller_country` STRING,
-    `seller_city` STRING,
-    `seller_street` STRING,
-    `seller_zipcode` INT,
-    `seller_comments` STRING,
-    `seller_selltime` TIMESTAMP(3),
-    `shipping_id` BIGINT,
-    `shipping_company` STRING,
-    `shipping_status` INT,
-    `shipping_comments` STRING,
-    `ts` TIMESTAMP(3),
-    PRIMARY KEY (`order_id`) NOT ENFORCED
-) WITH (
-    ${DDL_TEMPLATE}
-);
-
--- __SINK_DDL_END__
-
-INSERT INTO ${SINK_NAME} SELECT * FROM orders;
diff --git a/flink-table-store-benchmark/src/main/resources/queries/queries.yaml b/flink-table-store-benchmark/src/main/resources/queries/queries.yaml
index c4774e05..a084c934 100644
--- a/flink-table-store-benchmark/src/main/resources/queries/queries.yaml
+++ b/flink-table-store-benchmark/src/main/resources/queries/queries.yaml
@@ -17,10 +17,5 @@
 ################################################################################
 
 q1:
-  bounded: false
-q2:
-  bounded: false
-q3:
-  bounded: false
-q4:
-  bounded: false
+  sql: [q1.sql]
+  row-num: [500000000]
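Each remaining query is now described by the SQL file(s) it runs plus a target row count per file, replacing the old bounded flag; presumably the runner stops the job and finalizes its metrics once the sink has received row-num records. A hedged Jackson sketch of deserializing this shape -- the Spec class below is illustrative, not the actual Query.java from this commit:

    import com.fasterxml.jackson.annotation.JsonProperty;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;

    import java.util.List;
    import java.util.Map;

    public final class QuerySpecs {

        /** One entry under a query name, e.g. "q1". */
        public static class Spec {
            @JsonProperty("sql")
            public List<String> sql;      // SQL files to run, e.g. [q1.sql]
            @JsonProperty("row-num")
            public List<Long> rowNum;     // stop after this many rows per file
        }

        public static Map<String, Spec> parse(String yaml) throws Exception {
            ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
            return mapper.readValue(
                    yaml,
                    mapper.getTypeFactory().constructMapType(Map.class, String.class, Spec.class));
        }
    }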