Posted to commits@kylin.apache.org by xx...@apache.org on 2021/01/17 15:17:46 UTC

[kylin] branch kylin-on-parquet-v2 updated (2ad58c7 -> dc5f295)

This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a change to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git.


    from 2ad58c7  KYLIN-4872 Fix NPE when there are more than one segment if cube planner is open
     new af3a1b5  KYLIN-4857 Refactor system cube for kylin4
     new ad7b472  add test case
     new 7431240  KYLIN-4496: Metric data missing
     new 3400338  KYLIN-4573 Add option to indicate whether to close file for every append for Hive Producer
     new 6f48ce8  KYLIN-4653 Make the capacity for the LinkedBlockingQueue of BlockingReservoir configurable
     new 5b3859c  KYLIN-4653 Make the capacity for the LinkedBlockingQueue of BlockingReservoir configurable
     new dc5f295  Fix test case and fix system-cube.sh script

The 7 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .github/workflows/maven.yml                        |   8 +-
 build/bin/system-cube.sh                           |  38 +-
 .../org/apache/kylin/common/KylinConfigBase.java   |  20 +-
 .../apache/kylin/metrics/QuerySparkMetrics.java    | 763 +++++++++++++++++++++
 .../kylin/metrics/lib/impl/BlockingReservoir.java  |  31 +-
 .../kylin/metrics/lib/impl/MetricsSystem.java      |   2 +
 ...pertyEnum.java => QuerySparkExecutionEnum.java} |  49 +-
 ...RPCPropertyEnum.java => QuerySparkJobEnum.java} |  41 +-
 ...CPropertyEnum.java => QuerySparkStageEnum.java} |  40 +-
 examples/test_case_data/localmeta/kylin.properties |   5 +-
 .../org/apache/spark/sql/SparderContext.scala      |   6 +
 .../spark/sql/metrics/SparderMetricsListener.scala | 144 ++++
 .../kylin/metrics/lib/impl/hive/HiveProducer.java  |  94 ++-
 .../kylin/rest/controller/CubeController.java      |   9 +-
 .../apache/kylin/rest/init/InitialTaskManager.java |   2 +
 .../kylin/rest/metrics/QueryMetricsFacade.java     | 166 +----
 .../apache/kylin/rest/response/SQLResponse.java    |  10 +
 .../org/apache/kylin/rest/service/CubeService.java |   6 +-
 .../kylin/rest/service/DashboardService.java       |  20 +-
 .../apache/kylin/rest/service/QueryService.java    |  24 +-
 .../kylin/rest/response/SQLResponseTest.java       |   9 +-
 server/src/main/resources/kylinMetrics.xml         |   9 +-
 .../kylin/rest/metrics/QueryMetricsTest.java       | 254 ++++++-
 .../tool/metrics/systemcube/CubeDescCreator.java   | 302 +++++---
 .../metrics/systemcube/CubeInstanceCreator.java    |  14 +-
 .../tool/metrics/systemcube/HiveTableCreator.java  | 136 ++--
 .../tool/metrics/systemcube/KylinTableCreator.java |  20 +-
 .../tool/metrics/systemcube/ModelCreator.java      | 138 ++--
 .../kylin/tool/metrics/systemcube/SCCreator.java   |  18 +-
 .../systemcube/streamingv2/KafkaTopicCreator.java  |   6 +-
 webapp/app/js/services/cubes.js                    |   4 +-
 31 files changed, 1865 insertions(+), 523 deletions(-)
 create mode 100644 core-metrics/src/main/java/org/apache/kylin/metrics/QuerySparkMetrics.java
 copy core-metrics/src/main/java/org/apache/kylin/metrics/property/{QueryPropertyEnum.java => QuerySparkExecutionEnum.java} (55%)
 copy core-metrics/src/main/java/org/apache/kylin/metrics/property/{QueryRPCPropertyEnum.java => QuerySparkJobEnum.java} (58%)
 copy core-metrics/src/main/java/org/apache/kylin/metrics/property/{QueryRPCPropertyEnum.java => QuerySparkStageEnum.java} (56%)
 create mode 100644 kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/metrics/SparderMetricsListener.scala


[kylin] 01/07: KYLIN-4857 Refactor system cube for kylin4

Posted by xx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit af3a1b55ae1bbfb157ce9ec63a73651865c4c94a
Author: yaqian.zhang <59...@qq.com>
AuthorDate: Tue Jan 5 18:49:19 2021 +0800

    KYLIN-4857 Refactor system cube for kylin4
---
 .../org/apache/kylin/common/KylinConfigBase.java   |  20 +-
 .../apache/kylin/metrics/QuerySparkMetrics.java    | 741 +++++++++++++++++++++
 .../metrics/property/QuerySparkExecutionEnum.java  |  80 +++
 .../kylin/metrics/property/QuerySparkJobEnum.java  |  66 ++
 .../metrics/property/QuerySparkStageEnum.java      |  71 ++
 .../org/apache/spark/sql/SparderContext.scala      |   6 +
 .../spark/sql/metrics/SparderMetricsListener.scala | 144 ++++
 .../kylin/rest/metrics/QueryMetricsFacade.java     | 164 +----
 .../org/apache/kylin/rest/service/CubeService.java |   6 +-
 .../kylin/rest/service/DashboardService.java       |  21 +-
 .../apache/kylin/rest/service/QueryService.java    |  16 +-
 .../kylin/rest/metrics/QueryMetricsTest.java       |   6 +-
 .../tool/metrics/systemcube/CubeDescCreator.java   | 156 +++--
 .../metrics/systemcube/CubeInstanceCreator.java    |  14 +-
 .../tool/metrics/systemcube/HiveTableCreator.java  | 136 ++--
 .../tool/metrics/systemcube/KylinTableCreator.java |  20 +-
 .../tool/metrics/systemcube/ModelCreator.java      | 138 ++--
 .../kylin/tool/metrics/systemcube/SCCreator.java   |  18 +-
 .../systemcube/streamingv2/KafkaTopicCreator.java  |   6 +-
 19 files changed, 1450 insertions(+), 379 deletions(-)

diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index dc91a7f..36950ec 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -2349,20 +2349,28 @@ public abstract class KylinConfigBase implements Serializable {
                 + getKylinMetricsSubjectSuffix();
     }
 
-    public String getKylinMetricsSubjectQuery() {
-        return getOptional("kylin.metrics.subject-query", "METRICS_QUERY") + "_" + getKylinMetricsSubjectSuffix();
+    public String getKylinMetricsSubjectQueryExecution() {
+        return getOptional("kylin.metrics.subject-query", "METRICS_QUERY_EXECUTION") + "_" + getKylinMetricsSubjectSuffix();
     }
 
-    public String getKylinMetricsSubjectQueryCube() {
-        return getOptional("kylin.metrics.subject-query-cube", "METRICS_QUERY_CUBE") + "_"
+    public String getKylinMetricsSubjectQuerySparkJob() {
+        return getOptional("kylin.metrics.subject-query-cube", "METRICS_QUERY_SPARK_JOB") + "_"
                 + getKylinMetricsSubjectSuffix();
     }
 
-    public String getKylinMetricsSubjectQueryRpcCall() {
-        return getOptional("kylin.metrics.subject-query-rpc", "METRICS_QUERY_RPC") + "_"
+    public String getKylinMetricsSubjectQuerySparkStage() {
+        return getOptional("kylin.metrics.subject-query-rpc", "METRICS_QUERY_SPARK_STAGE") + "_"
                 + getKylinMetricsSubjectSuffix();
     }
 
+    public int getKylinMetricsCacheExpireSeconds() {
+        return Integer.parseInt(this.getOptional("kylin.metrics.query-cache.expire-seconds", "600"));
+    }
+
+    public int getKylinMetricsCacheMaxEntries() {
+        return Integer.parseInt(this.getOptional("kylin.metrics.query-cache.max-entries", "10000"));
+    }
+
     public Map<String, String> getKylinMetricsConf() {
         return getPropertiesByPrefix("kylin.metrics.");
     }
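
The two getters added above expose new cache settings, defaulting to 600 seconds and 10000 entries. A minimal sketch (not part of the commit) of how a caller reads them, assuming a normal Kylin environment so that KylinConfig.getInstanceFromEnv() resolves:

    import org.apache.kylin.common.KylinConfig;

    public class MetricsCacheConfigSketch {
        public static void main(String[] args) {
            KylinConfig config = KylinConfig.getInstanceFromEnv();
            // kylin.metrics.query-cache.expire-seconds, default "600"
            int expireSeconds = config.getKylinMetricsCacheExpireSeconds();
            // kylin.metrics.query-cache.max-entries, default "10000"
            int maxEntries = config.getKylinMetricsCacheMaxEntries();
            System.out.println(expireSeconds + "s / " + maxEntries + " entries");
        }
    }

QuerySparkMetrics below uses exactly these two values to size and expire its Guava cache of per-query metrics.
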
diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/QuerySparkMetrics.java b/core-metrics/src/main/java/org/apache/kylin/metrics/QuerySparkMetrics.java
new file mode 100644
index 0000000..ed2430c
--- /dev/null
+++ b/core-metrics/src/main/java/org/apache/kylin/metrics/QuerySparkMetrics.java
@@ -0,0 +1,741 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.metrics;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.metrics.lib.impl.RecordEvent;
+import org.apache.kylin.metrics.lib.impl.TimedRecordEvent;
+import org.apache.kylin.metrics.property.QuerySparkExecutionEnum;
+import org.apache.kylin.metrics.property.QuerySparkJobEnum;
+import org.apache.kylin.metrics.property.QuerySparkStageEnum;
+import org.apache.kylin.shaded.com.google.common.cache.Cache;
+import org.apache.kylin.shaded.com.google.common.cache.CacheBuilder;
+import org.apache.kylin.shaded.com.google.common.cache.RemovalListener;
+import org.apache.kylin.shaded.com.google.common.cache.RemovalNotification;
+import org.apache.kylin.shaded.com.google.common.collect.Maps;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+public class QuerySparkMetrics {
+    private static final Logger logger = LoggerFactory.getLogger(QuerySparkMetrics.class);
+    private static final QuerySparkMetrics instance = new QuerySparkMetrics();
+    private static final int sparkMetricsNum = 10;
+    private org.apache.kylin.shaded.com.google.common.cache.Cache<String, QueryExecutionMetrics> queryExecutionMetricsMap;
+
+    private QuerySparkMetrics() {
+        queryExecutionMetricsMap = CacheBuilder.newBuilder()
+                .maximumSize(KylinConfig.getInstanceFromEnv().getKylinMetricsCacheMaxEntries())
+                .expireAfterWrite(KylinConfig.getInstanceFromEnv().getKylinMetricsCacheExpireSeconds(),
+                        TimeUnit.SECONDS)
+                .removalListener(new RemovalListener<String, QueryExecutionMetrics>() {
+                    @Override
+                    public void onRemoval(RemovalNotification<String, QueryExecutionMetrics> notification) {
+                        try {
+                            updateMetricsToReservoir(notification.getKey(), notification.getValue());
+                            logger.info("Query metrics {} is removed due to {}, update to metrics reservoir successful",
+                                    notification.getKey(), notification.getCause());
+                        } catch(Exception e) {
+                            logger.warn("Query metrics {} is removed due to {}, update to metrics reservoir failed",
+                                    notification.getKey(), notification.getCause());
+                        }
+                    }
+                }).build();
+
+        Executors.newSingleThreadScheduledExecutor().scheduleWithFixedDelay(new Runnable() {
+                                                                                @Override
+                                                                                public void run() {
+                                                                                    queryExecutionMetricsMap.cleanUp();
+                                                                                }
+                                                                            },
+                KylinConfig.getInstanceFromEnv().getKylinMetricsCacheExpireSeconds(),
+                KylinConfig.getInstanceFromEnv().getKylinMetricsCacheExpireSeconds(), TimeUnit.SECONDS);
+
+    }
+
+    public static QuerySparkMetrics getInstance() {
+        return instance;
+    }
+
+    public void onJobStart(String queryId, String sparderName, long executionId, long executionStartTime, int jobId,
+            long jobStartTime) {
+        QueryExecutionMetrics queryExecutionMetrics = queryExecutionMetricsMap.getIfPresent(queryId);
+        if (queryExecutionMetrics == null) {
+            queryExecutionMetrics = new QueryExecutionMetrics();
+            ConcurrentMap<Integer, SparkJobMetrics> sparkJobMetricsMap = Maps.newConcurrentMap();
+            queryExecutionMetrics.setQueryId(queryId);
+            queryExecutionMetrics.setSparderName(sparderName);
+            queryExecutionMetrics.setExecutionId(executionId);
+            queryExecutionMetrics.setStartTime(executionStartTime);
+            queryExecutionMetrics.setSparkJobMetricsMap(sparkJobMetricsMap);
+            queryExecutionMetricsMap.put(queryId, queryExecutionMetrics);
+        }
+        SparkJobMetrics sparkJobMetrics = new SparkJobMetrics();
+        sparkJobMetrics.setExecutionId(executionId);
+        sparkJobMetrics.setJobId(jobId);
+        sparkJobMetrics.setStartTime(jobStartTime);
+
+        ConcurrentMap<Integer, SparkStageMetrics> sparkStageMetricsMap = Maps.newConcurrentMap();
+        sparkJobMetrics.setSparkStageMetricsMap(sparkStageMetricsMap);
+
+        queryExecutionMetrics.getSparkJobMetricsMap().put(jobId, sparkJobMetrics);
+    }
+
+    public void onSparkStageStart(String queryId, int jobId, int stageId, String stageType, long submitTime) {
+        QueryExecutionMetrics queryExecutionMetrics = queryExecutionMetricsMap.getIfPresent(queryId);
+        if (queryExecutionMetrics != null && queryExecutionMetrics.getSparkJobMetricsMap().get(jobId) != null) {
+            SparkStageMetrics sparkStageMetrics = new SparkStageMetrics();
+            sparkStageMetrics.setStageId(stageId);
+            sparkStageMetrics.setStageType(stageType);
+            sparkStageMetrics.setSubmitTime(submitTime);
+            queryExecutionMetrics.getSparkJobMetricsMap().get(jobId).getSparkStageMetricsMap().put(stageId,
+                    sparkStageMetrics);
+        }
+    }
+
+    public void updateSparkStageMetrics(String queryId, int jobId, int stageId, boolean isSuccess,
+            SparkStageMetrics sparkStageMetricsEnd) {
+        QueryExecutionMetrics queryExecutionMetrics = queryExecutionMetricsMap.getIfPresent(queryId);
+        if (queryExecutionMetrics != null) {
+            SparkJobMetrics sparkJobMetrics = queryExecutionMetrics.getSparkJobMetricsMap().get(jobId);
+            if (sparkJobMetrics != null) {
+                SparkStageMetrics sparkStageMetrics = sparkJobMetrics.getSparkStageMetricsMap().get(stageId);
+                if (sparkStageMetrics != null) {
+                    sparkStageMetrics.setSuccess(isSuccess);
+                    sparkStageMetrics.setMetrics(sparkStageMetricsEnd.getResultSize(),
+                            sparkStageMetricsEnd.getExecutorDeserializeTime(),
+                            sparkStageMetricsEnd.getExecutorDeserializeCpuTime(),
+                            sparkStageMetricsEnd.getExecutorRunTime(), sparkStageMetricsEnd.getExecutorCpuTime(),
+                            sparkStageMetricsEnd.getJvmGCTime(), sparkStageMetricsEnd.getResultSerializationTime(),
+                            sparkStageMetricsEnd.getMemoryBytesSpilled(), sparkStageMetricsEnd.getDiskBytesSpilled(),
+                            sparkStageMetricsEnd.getPeakExecutionMemory());
+                }
+            }
+        }
+    }
+
+    public void updateSparkJobMetrics(String queryId, int jobId, long jobEndTime, boolean isSuccess) {
+        QueryExecutionMetrics queryExecutionMetrics = queryExecutionMetricsMap.getIfPresent(queryId);
+        if (queryExecutionMetrics != null && queryExecutionMetrics.getSparkJobMetricsMap().get(jobId) != null) {
+            SparkJobMetrics sparkJobMetrics = queryExecutionMetrics.getSparkJobMetricsMap().get(jobId);
+            sparkJobMetrics.setEndTime(jobEndTime);
+            sparkJobMetrics.setSuccess(isSuccess);
+        }
+    }
+
+    public void updateExecutionMetrics(String queryId, long executionEndTime) {
+        QueryExecutionMetrics queryExecutionMetrics = queryExecutionMetricsMap.getIfPresent(queryId);
+        if (queryExecutionMetrics != null) {
+            queryExecutionMetrics.setEndTime(executionEndTime);
+        }
+    }
+
+    public Cache<String, QueryExecutionMetrics> getQueryExecutionMetricsMap() {
+        return queryExecutionMetricsMap;
+    }
+
+    public QueryExecutionMetrics getQueryExecutionMetrics(String queryId) {
+        return queryExecutionMetricsMap.getIfPresent(queryId);
+    }
+
+    public void setQueryRealization(String queryId, String realizationName, int realizationType, String cuboidIds) {
+        QueryExecutionMetrics queryExecutionMetrics = queryExecutionMetricsMap.getIfPresent(queryId);
+        if (queryExecutionMetrics != null) {
+            queryExecutionMetrics.setRealization(realizationName);
+            queryExecutionMetrics.setRealizationType(realizationType);
+            queryExecutionMetrics.setCuboidIds(cuboidIds);
+        }
+    }
+
+    /**
+     * report query related metrics
+     */
+    public void updateMetricsToReservoir(String queryId, QueryExecutionMetrics queryExecutionMetrics) {
+        if (!KylinConfig.getInstanceFromEnv().isKylinMetricsReporterForQueryEnabled()) {
+            return;
+        }
+        if (queryExecutionMetrics != null) {
+            RecordEvent queryExecutionMetricsEvent = new TimedRecordEvent(
+                    KylinConfig.getInstanceFromEnv().getKylinMetricsSubjectQueryExecution());
+
+            setQueryWrapper(queryExecutionMetricsEvent, queryExecutionMetrics.getUser(),
+                    queryExecutionMetrics.getSqlIdCode(), queryExecutionMetrics.getQueryType(),
+                    queryId, queryExecutionMetrics.getProject(),
+                    queryExecutionMetrics.getException());
+
+            setSparkExecutionWrapper(queryExecutionMetricsEvent, queryExecutionMetrics.getSparderName(),
+                    queryExecutionMetrics.getExecutionId(), queryExecutionMetrics.getRealization(),
+                    queryExecutionMetrics.getRealizationType(), queryExecutionMetrics.getCuboidIds(),
+                    queryExecutionMetrics.getStartTime(), queryExecutionMetrics.getEndTime());
+
+            setQueryMetrics(queryExecutionMetricsEvent, queryExecutionMetrics.getSqlDuration(),
+                    queryExecutionMetrics.getTotalScanCount(), queryExecutionMetrics.getTotalScanBytes(),
+                    queryExecutionMetrics.getResultCount());
+
+            long[] queryExecutionMetricsList = new long[sparkMetricsNum];
+            for (Map.Entry<Integer, QuerySparkMetrics.SparkJobMetrics> sparkJobMetricsEntry : queryExecutionMetrics
+                    .getSparkJobMetricsMap().entrySet()) {
+                RecordEvent sparkJobMetricsEvent = new TimedRecordEvent(
+                        KylinConfig.getInstanceFromEnv().getKylinMetricsSubjectQuerySparkJob());
+
+                setSparkJobWrapper(sparkJobMetricsEvent, queryExecutionMetrics.getProject(),
+                        queryId, queryExecutionMetrics.getExecutionId(),
+                        sparkJobMetricsEntry.getValue().getJobId(), sparkJobMetricsEntry.getValue().getStartTime(),
+                        sparkJobMetricsEntry.getValue().getEndTime(), sparkJobMetricsEntry.getValue().isSuccess());
+
+                long[] sparkJobMetricsList = new long[sparkMetricsNum];
+                for (Map.Entry<Integer, QuerySparkMetrics.SparkStageMetrics> sparkStageMetricsEntry : sparkJobMetricsEntry
+                        .getValue().getSparkStageMetricsMap().entrySet()) {
+                    RecordEvent sparkStageMetricsEvent = new TimedRecordEvent(
+                            KylinConfig.getInstanceFromEnv().getKylinMetricsSubjectQuerySparkStage());
+                    QuerySparkMetrics.SparkStageMetrics sparkStageMetrics = sparkStageMetricsEntry.getValue();
+                    setStageWrapper(sparkStageMetricsEvent, queryExecutionMetrics.getProject(), null,
+                            queryId, queryExecutionMetrics.getExecutionId(),
+                            sparkJobMetricsEntry.getValue().getJobId(), sparkStageMetrics.getStageId(),
+                            sparkStageMetrics.getSubmitTime(), sparkStageMetrics.isSuccess());
+                    setStageMetrics(sparkStageMetricsEvent, sparkStageMetrics.getResultSize(),
+                            sparkStageMetrics.getExecutorDeserializeTime(),
+                            sparkStageMetrics.getExecutorDeserializeCpuTime(), sparkStageMetrics.getExecutorRunTime(),
+                            sparkStageMetrics.getExecutorCpuTime(), sparkStageMetrics.getJvmGCTime(),
+                            sparkStageMetrics.getResultSerializationTime(), sparkStageMetrics.getMemoryBytesSpilled(),
+                            sparkStageMetrics.getDiskBytesSpilled(), sparkStageMetrics.getPeakExecutionMemory());
+                    //Update spark stage level metrics
+                    MetricsManager.getInstance().update(sparkStageMetricsEvent);
+
+                    sparkJobMetricsList[0] += sparkStageMetrics.getResultSize();
+                    sparkJobMetricsList[1] += sparkStageMetrics.getExecutorDeserializeTime();
+                    sparkJobMetricsList[2] += sparkStageMetrics.getExecutorDeserializeCpuTime();
+                    sparkJobMetricsList[3] += sparkStageMetrics.getExecutorRunTime();
+                    sparkJobMetricsList[4] += sparkStageMetrics.getExecutorCpuTime();
+                    sparkJobMetricsList[5] += sparkStageMetrics.getJvmGCTime();
+                    sparkJobMetricsList[6] += sparkStageMetrics.getResultSerializationTime();
+                    sparkJobMetricsList[7] += sparkStageMetrics.getMemoryBytesSpilled();
+                    sparkJobMetricsList[8] += sparkStageMetrics.getDiskBytesSpilled();
+                    sparkJobMetricsList[9] += sparkStageMetrics.getPeakExecutionMemory();
+                }
+                setSparkJobMetrics(sparkJobMetricsEvent, sparkJobMetricsList[0], sparkJobMetricsList[1],
+                        sparkJobMetricsList[2], sparkJobMetricsList[3], sparkJobMetricsList[4], sparkJobMetricsList[5],
+                        sparkJobMetricsList[6], sparkJobMetricsList[7], sparkJobMetricsList[8], sparkJobMetricsList[9]);
+                //Update spark job level metrics
+                MetricsManager.getInstance().update(sparkJobMetricsEvent);
+
+                for (int i = 0; i < sparkMetricsNum; i++) {
+                    queryExecutionMetricsList[i] += sparkJobMetricsList[i];
+                }
+            }
+            setSparkExecutionMetrics(queryExecutionMetricsEvent, queryExecutionMetrics.getExecutionDuration(),
+                    queryExecutionMetricsList[0], queryExecutionMetricsList[1], queryExecutionMetricsList[2],
+                    queryExecutionMetricsList[3], queryExecutionMetricsList[4], queryExecutionMetricsList[5],
+                    queryExecutionMetricsList[6], queryExecutionMetricsList[7], queryExecutionMetricsList[8],
+                    queryExecutionMetricsList[9]);
+            //Update execution level metrics
+            MetricsManager.getInstance().update(queryExecutionMetricsEvent);
+        }
+    }
+
+    private static void setQueryWrapper(RecordEvent metricsEvent, String user, long sqlIdCode, String queryType,
+            String queryId, String project, String exception) {
+        metricsEvent.put(QuerySparkExecutionEnum.USER.toString(), user);
+        metricsEvent.put(QuerySparkExecutionEnum.ID_CODE.toString(), sqlIdCode);
+        metricsEvent.put(QuerySparkExecutionEnum.TYPE.toString(), queryType);
+        metricsEvent.put(QuerySparkExecutionEnum.QUERY_ID.toString(), queryId);
+        metricsEvent.put(QuerySparkExecutionEnum.PROJECT.toString(), project);
+        metricsEvent.put(QuerySparkExecutionEnum.EXCEPTION.toString(), exception);
+    }
+
+    private static void setSparkExecutionWrapper(RecordEvent metricsEvent, String sparderName, long executionId,
+            String realizationName, int realizationType, String cuboidIds, long startTime, long endTime) {
+        metricsEvent.put(QuerySparkExecutionEnum.SPARDER_NAME.toString(), sparderName);
+        metricsEvent.put(QuerySparkExecutionEnum.EXECUTION_ID.toString(), executionId);
+        metricsEvent.put(QuerySparkExecutionEnum.REALIZATION.toString(), realizationName);
+        metricsEvent.put(QuerySparkExecutionEnum.REALIZATION_TYPE.toString(), realizationType);
+        metricsEvent.put(QuerySparkExecutionEnum.CUBOID_IDS.toString(), cuboidIds);
+        metricsEvent.put(QuerySparkExecutionEnum.START_TIME.toString(), startTime);
+        metricsEvent.put(QuerySparkExecutionEnum.END_TIME.toString(), endTime);
+    }
+
+    private static void setQueryMetrics(RecordEvent metricsEvent, long sqlDuration, long totalScanCount,
+            long totalScanBytes, long resultCount) {
+        metricsEvent.put(QuerySparkExecutionEnum.TIME_COST.toString(), sqlDuration);
+        metricsEvent.put(QuerySparkExecutionEnum.TOTAL_SCAN_COUNT.toString(), totalScanCount);
+        metricsEvent.put(QuerySparkExecutionEnum.TOTAL_SCAN_BYTES.toString(), totalScanBytes);
+        metricsEvent.put(QuerySparkExecutionEnum.RESULT_COUNT.toString(), resultCount);
+    }
+
+    private static void setSparkExecutionMetrics(RecordEvent metricsEvent, long executionDuration, long resultSize,
+            long executorDeserializeTime, long executorDeserializeCpuTime, long executorRunTime, long executorCpuTime,
+            long jvmGCTime, long resultSerializationTime, long memoryBytesSpilled, long diskBytesSpilled,
+            long peakExecutionMemory) {
+        metricsEvent.put(QuerySparkExecutionEnum.EXECUTION_DURATION.toString(), executionDuration);
+
+        metricsEvent.put(QuerySparkExecutionEnum.RESULT_SIZE.toString(), resultSize);
+        metricsEvent.put(QuerySparkExecutionEnum.EXECUTOR_DESERIALIZE_TIME.toString(), executorDeserializeTime);
+        metricsEvent.put(QuerySparkExecutionEnum.EXECUTOR_DESERIALIZE_CPU_TIME.toString(), executorDeserializeCpuTime);
+        metricsEvent.put(QuerySparkExecutionEnum.EXECUTOR_RUN_TIME.toString(), executorRunTime);
+        metricsEvent.put(QuerySparkExecutionEnum.EXECUTOR_CPU_TIME.toString(), executorCpuTime);
+        metricsEvent.put(QuerySparkExecutionEnum.JVM_GC_TIME.toString(), jvmGCTime);
+        metricsEvent.put(QuerySparkExecutionEnum.RESULT_SERIALIZATION_TIME.toString(), resultSerializationTime);
+        metricsEvent.put(QuerySparkExecutionEnum.MEMORY_BYTE_SPILLED.toString(), memoryBytesSpilled);
+        metricsEvent.put(QuerySparkExecutionEnum.DISK_BYTES_SPILLED.toString(), diskBytesSpilled);
+        metricsEvent.put(QuerySparkExecutionEnum.PEAK_EXECUTION_MEMORY.toString(), peakExecutionMemory);
+    }
+
+    private static void setSparkJobMetrics(RecordEvent metricsEvent, long resultSize, long executorDeserializeTime,
+            long executorDeserializeCpuTime, long executorRunTime, long executorCpuTime, long jvmGCTime,
+            long resultSerializationTime, long memoryBytesSpilled, long diskBytesSpilled, long peakExecutionMemory) {
+        metricsEvent.put(QuerySparkJobEnum.RESULT_SIZE.toString(), resultSize);
+        metricsEvent.put(QuerySparkJobEnum.EXECUTOR_DESERIALIZE_TIME.toString(), executorDeserializeTime);
+        metricsEvent.put(QuerySparkJobEnum.EXECUTOR_DESERIALIZE_CPU_TIME.toString(), executorDeserializeCpuTime);
+        metricsEvent.put(QuerySparkJobEnum.EXECUTOR_RUN_TIME.toString(), executorRunTime);
+        metricsEvent.put(QuerySparkJobEnum.EXECUTOR_CPU_TIME.toString(), executorCpuTime);
+        metricsEvent.put(QuerySparkJobEnum.JVM_GC_TIME.toString(), jvmGCTime);
+        metricsEvent.put(QuerySparkJobEnum.RESULT_SERIALIZATION_TIME.toString(), resultSerializationTime);
+        metricsEvent.put(QuerySparkJobEnum.MEMORY_BYTE_SPILLED.toString(), memoryBytesSpilled);
+        metricsEvent.put(QuerySparkJobEnum.DISK_BYTES_SPILLED.toString(), diskBytesSpilled);
+        metricsEvent.put(QuerySparkJobEnum.PEAK_EXECUTION_MEMORY.toString(), peakExecutionMemory);
+    }
+
+    private static void setStageMetrics(RecordEvent metricsEvent, long resultSize, long executorDeserializeTime,
+            long executorDeserializeCpuTime, long executorRunTime, long executorCpuTime, long jvmGCTime,
+            long resultSerializationTime, long memoryBytesSpilled, long diskBytesSpilled, long peakExecutionMemory) {
+        metricsEvent.put(QuerySparkStageEnum.RESULT_SIZE.toString(), resultSize);
+        metricsEvent.put(QuerySparkStageEnum.EXECUTOR_DESERIALIZE_TIME.toString(), executorDeserializeTime);
+        metricsEvent.put(QuerySparkStageEnum.EXECUTOR_DESERIALIZE_CPU_TIME.toString(), executorDeserializeCpuTime);
+        metricsEvent.put(QuerySparkStageEnum.EXECUTOR_RUN_TIME.toString(), executorRunTime);
+        metricsEvent.put(QuerySparkStageEnum.EXECUTOR_CPU_TIME.toString(), executorCpuTime);
+        metricsEvent.put(QuerySparkStageEnum.JVM_GC_TIME.toString(), jvmGCTime);
+        metricsEvent.put(QuerySparkStageEnum.RESULT_SERIALIZATION_TIME.toString(), resultSerializationTime);
+        metricsEvent.put(QuerySparkStageEnum.MEMORY_BYTE_SPILLED.toString(), memoryBytesSpilled);
+        metricsEvent.put(QuerySparkStageEnum.DISK_BYTES_SPILLED.toString(), diskBytesSpilled);
+        metricsEvent.put(QuerySparkStageEnum.PEAK_EXECUTION_MEMORY.toString(), peakExecutionMemory);
+    }
+
+    private static void setStageWrapper(RecordEvent metricsEvent, String projectName, String realizationName,
+            String queryId, long executionId, int jobId, int stageId, long submitTime, boolean isSuccess) {
+        metricsEvent.put(QuerySparkStageEnum.PROJECT.toString(), projectName);
+        metricsEvent.put(QuerySparkStageEnum.REALIZATION.toString(), realizationName);
+        metricsEvent.put(QuerySparkStageEnum.QUERY_ID.toString(), queryId);
+        metricsEvent.put(QuerySparkStageEnum.EXECUTION_ID.toString(), executionId);
+        metricsEvent.put(QuerySparkStageEnum.JOB_ID.toString(), jobId);
+        metricsEvent.put(QuerySparkStageEnum.STAGE_ID.toString(), stageId);
+        metricsEvent.put(QuerySparkStageEnum.SUBMIT_TIME.toString(), submitTime);
+        metricsEvent.put(QuerySparkStageEnum.IF_SUCCESS.toString(), isSuccess);
+    }
+
+    private static void setSparkJobWrapper(RecordEvent metricsEvent, String projectName, String queryId,
+            long executionId, int jobId, long startTime, long endTime, boolean isSuccess) {
+        metricsEvent.put(QuerySparkJobEnum.PROJECT.toString(), projectName);
+        metricsEvent.put(QuerySparkJobEnum.QUERY_ID.toString(), queryId);
+        metricsEvent.put(QuerySparkJobEnum.EXECUTION_ID.toString(), executionId);
+        metricsEvent.put(QuerySparkJobEnum.JOB_ID.toString(), jobId);
+        metricsEvent.put(QuerySparkJobEnum.START_TIME.toString(), startTime);
+        metricsEvent.put(QuerySparkJobEnum.END_TIME.toString(), endTime);
+        metricsEvent.put(QuerySparkJobEnum.IF_SUCCESS.toString(), isSuccess);
+    }
+
+    public static class QueryExecutionMetrics implements Serializable {
+        private long sqlIdCode;
+        private String user;
+        private String queryType;
+        private String project;
+        private String exception;
+        private long executionId;
+        private String sparderName;
+        private long executionDuration;
+        private String queryId;
+        private String realization;
+        private int realizationType;
+        private String cuboidIds;
+        private long startTime;
+        private long endTime;
+        private ConcurrentMap<Integer, SparkJobMetrics> sparkJobMetricsMap;
+
+        private long sqlDuration;
+        private long totalScanCount;
+        private long totalScanBytes;
+        private int resultCount;
+
+        public String getUser() {
+            return user;
+        }
+
+        public void setUser(String user) {
+            this.user = user;
+        }
+
+        public int getResultCount() {
+            return resultCount;
+        }
+
+        public long getSqlDuration() {
+            return sqlDuration;
+        }
+
+        public long getTotalScanBytes() {
+            return totalScanBytes;
+        }
+
+        public long getTotalScanCount() {
+            return totalScanCount;
+        }
+
+        public void setResultCount(int resultCount) {
+            this.resultCount = resultCount;
+        }
+
+        public void setSqlDuration(long sqlDuration) {
+            this.sqlDuration = sqlDuration;
+        }
+
+        public void setTotalScanBytes(long totalScanBytes) {
+            this.totalScanBytes = totalScanBytes;
+        }
+
+        public void setTotalScanCount(long totalScanCount) {
+            this.totalScanCount = totalScanCount;
+        }
+
+        public String getException() {
+            return exception;
+        }
+
+        public void setException(String exception) {
+            this.exception = exception;
+        }
+
+        public void setProject(String project) {
+            this.project = project;
+        }
+
+        public String getProject() {
+            return project;
+        }
+
+        public String getQueryType() {
+            return queryType;
+        }
+
+        public long getSqlIdCode() {
+            return sqlIdCode;
+        }
+
+        public void setQueryType(String queryType) {
+            this.queryType = queryType;
+        }
+
+        public void setSqlIdCode(long sqlIdCode) {
+            this.sqlIdCode = sqlIdCode;
+        }
+
+        public long getEndTime() {
+            return endTime;
+        }
+
+        public long getStartTime() {
+            return startTime;
+        }
+
+        public void setEndTime(long endTime) {
+            this.endTime = endTime;
+        }
+
+        public void setStartTime(long startTime) {
+            this.startTime = startTime;
+        }
+
+        public void setQueryId(String queryId) {
+            this.queryId = queryId;
+        }
+
+        public String getQueryId() {
+            return queryId;
+        }
+
+        public long getExecutionDuration() {
+            return executionDuration;
+        }
+
+        public void setExecutionDuration(long executionDuration) {
+            this.executionDuration = executionDuration;
+        }
+
+        public ConcurrentMap<Integer, SparkJobMetrics> getSparkJobMetricsMap() {
+            return sparkJobMetricsMap;
+        }
+
+        public long getExecutionId() {
+            return executionId;
+        }
+
+        public String getSparderName() {
+            return sparderName;
+        }
+
+        public void setExecutionId(long executionId) {
+            this.executionId = executionId;
+        }
+
+        public void setSparderName(String sparderName) {
+            this.sparderName = sparderName;
+        }
+
+        public String getCuboidIds() {
+            return cuboidIds;
+        }
+
+        public void setCuboidIds(String cuboidIds) {
+            this.cuboidIds = cuboidIds;
+        }
+
+        public String getRealization() {
+            return realization;
+        }
+
+        public int getRealizationType() {
+            return realizationType;
+        }
+
+        public void setRealization(String realization) {
+            this.realization = realization;
+        }
+
+        public void setRealizationType(int realizationType) {
+            this.realizationType = realizationType;
+        }
+
+        public void setSparkJobMetricsMap(ConcurrentMap<Integer, SparkJobMetrics> sparkJobMetricsMap) {
+            this.sparkJobMetricsMap = sparkJobMetricsMap;
+        }
+    }
+
+    public static class SparkJobMetrics implements Serializable {
+        private long executionId;
+        private int jobId;
+        private long startTime;
+        private long endTime;
+        private boolean isSuccess;
+        private ConcurrentMap<Integer, SparkStageMetrics> sparkStageMetricsMap;
+
+        public void setStartTime(long startTime) {
+            this.startTime = startTime;
+        }
+
+        public void setEndTime(long endTime) {
+            this.endTime = endTime;
+        }
+
+        public long getStartTime() {
+            return startTime;
+        }
+
+        public long getEndTime() {
+            return endTime;
+        }
+
+        public void setExecutionId(long executionId) {
+            this.executionId = executionId;
+        }
+
+        public long getExecutionId() {
+            return executionId;
+        }
+
+        public void setSparkStageMetricsMap(ConcurrentMap<Integer, SparkStageMetrics> sparkStageMetricsMap) {
+            this.sparkStageMetricsMap = sparkStageMetricsMap;
+        }
+
+        public void setJobId(int jobId) {
+            this.jobId = jobId;
+        }
+
+        public void setSuccess(boolean success) {
+            isSuccess = success;
+        }
+
+        public boolean isSuccess() {
+            return isSuccess;
+        }
+
+        public ConcurrentMap<Integer, SparkStageMetrics> getSparkStageMetricsMap() {
+            return sparkStageMetricsMap;
+        }
+
+        public int getJobId() {
+            return jobId;
+        }
+    }
+
+    public static class SparkStageMetrics implements Serializable {
+        private int stageId;
+        private String stageType;
+        private long submitTime;
+        private long endTime;
+        private boolean isSuccess;
+        private long resultSize;
+        private long executorDeserializeTime;
+        private long executorDeserializeCpuTime;
+        private long executorRunTime;
+        private long executorCpuTime;
+        private long jvmGCTime;
+        private long resultSerializationTime;
+        private long memoryBytesSpilled;
+        private long diskBytesSpilled;
+        private long peakExecutionMemory;
+
+        public void setMetrics(long resultSize, long executorDeserializeTime, long executorDeserializeCpuTime,
+                long executorRunTime, long executorCpuTime, long jvmGCTime, long resultSerializationTime,
+                long memoryBytesSpilled, long diskBytesSpilled, long peakExecutionMemory) {
+            this.resultSize = resultSize;
+            this.executorDeserializeTime = executorDeserializeTime;
+            this.executorDeserializeCpuTime = executorDeserializeCpuTime;
+            this.executorRunTime = executorRunTime;
+            this.executorCpuTime = executorCpuTime;
+            this.jvmGCTime = jvmGCTime;
+            this.resultSerializationTime = resultSerializationTime;
+            this.memoryBytesSpilled = memoryBytesSpilled;
+            this.diskBytesSpilled = diskBytesSpilled;
+            this.peakExecutionMemory = peakExecutionMemory;
+        }
+
+        public long getEndTime() {
+            return endTime;
+        }
+
+        public long getSubmitTime() {
+            return submitTime;
+        }
+
+        public void setEndTime(long endTime) {
+            this.endTime = endTime;
+        }
+
+        public void setSubmitTime(long submitTime) {
+            this.submitTime = submitTime;
+        }
+
+        public boolean isSuccess() {
+            return isSuccess;
+        }
+
+        public void setSuccess(boolean success) {
+            isSuccess = success;
+        }
+
+        public void setStageType(String stageType) {
+            this.stageType = stageType;
+        }
+
+        public void setStageId(int stageId) {
+            this.stageId = stageId;
+        }
+
+        public void setResultSize(long resultSize) {
+            this.resultSize = resultSize;
+        }
+
+        public void setResultSerializationTime(long resultSerializationTime) {
+            this.resultSerializationTime = resultSerializationTime;
+        }
+
+        public void setPeakExecutionMemory(long peakExecutionMemory) {
+            this.peakExecutionMemory = peakExecutionMemory;
+        }
+
+        public void setMemoryBytesSpilled(long memoryBytesSpilled) {
+            this.memoryBytesSpilled = memoryBytesSpilled;
+        }
+
+        public void setJvmGCTime(long jvmGCTime) {
+            this.jvmGCTime = jvmGCTime;
+        }
+
+        public void setExecutorRunTime(long executorRunTime) {
+            this.executorRunTime = executorRunTime;
+        }
+
+        public void setExecutorDeserializeTime(long executorDeserializeTime) {
+            this.executorDeserializeTime = executorDeserializeTime;
+        }
+
+        public void setExecutorDeserializeCpuTime(long executorDeserializeCpuTime) {
+            this.executorDeserializeCpuTime = executorDeserializeCpuTime;
+        }
+
+        public void setExecutorCpuTime(long executorCpuTime) {
+            this.executorCpuTime = executorCpuTime;
+        }
+
+        public void setDiskBytesSpilled(long diskBytesSpilled) {
+            this.diskBytesSpilled = diskBytesSpilled;
+        }
+
+        public String getStageType() {
+            return stageType;
+        }
+
+        public long getResultSize() {
+            return resultSize;
+        }
+
+        public long getResultSerializationTime() {
+            return resultSerializationTime;
+        }
+
+        public long getPeakExecutionMemory() {
+            return peakExecutionMemory;
+        }
+
+        public long getMemoryBytesSpilled() {
+            return memoryBytesSpilled;
+        }
+
+        public long getJvmGCTime() {
+            return jvmGCTime;
+        }
+
+        public long getExecutorRunTime() {
+            return executorRunTime;
+        }
+
+        public long getExecutorDeserializeTime() {
+            return executorDeserializeTime;
+        }
+
+        public long getExecutorDeserializeCpuTime() {
+            return executorDeserializeCpuTime;
+        }
+
+        public long getExecutorCpuTime() {
+            return executorCpuTime;
+        }
+
+        public long getDiskBytesSpilled() {
+            return diskBytesSpilled;
+        }
+
+        public int getStageId() {
+            return stageId;
+        }
+    }
+}
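
QuerySparkMetrics is a process-wide singleton keyed by query id: listener callbacks fill the Guava cache, and removal (by size limit, expiry, or the scheduled cleanUp) pushes the accumulated metrics to the reservoir through updateMetricsToReservoir. A rough usage sketch of that lifecycle, with made-up ids and metric values, assuming a Kylin environment so the singleton can read its configuration:

    import org.apache.kylin.metrics.QuerySparkMetrics;

    public class QuerySparkMetricsUsageSketch {
        public static void main(String[] args) {
            QuerySparkMetrics metrics = QuerySparkMetrics.getInstance();

            String queryId = "q-0001";   // hypothetical Kylin query id
            long executionId = 1L;       // hypothetical Spark SQL execution id
            long now = System.currentTimeMillis();

            // First job of the query starts; this also creates the cache entry for queryId.
            metrics.onJobStart(queryId, "sparder-app", executionId, now, 0, now);

            // Optionally attach realization info (all values here are placeholders).
            metrics.setQueryRealization(queryId, "some_cube", 0, "1,2");

            // One stage of job 0 is submitted and later finishes with its task metrics.
            metrics.onSparkStageStart(queryId, 0, 10, "ShuffleMapStage", now);
            QuerySparkMetrics.SparkStageMetrics stageEnd = new QuerySparkMetrics.SparkStageMetrics();
            stageEnd.setMetrics(1024L, 5L, 4L, 200L, 180L, 3L, 1L, 0L, 0L, 4096L);
            metrics.updateSparkStageMetrics(queryId, 0, 10, true, stageEnd);

            // Job and then the whole execution complete; later eviction flushes to the reservoir.
            metrics.updateSparkJobMetrics(queryId, 0, System.currentTimeMillis(), true);
            metrics.updateExecutionMetrics(queryId, System.currentTimeMillis());
        }
    }

In the commit itself these calls are made by SparderMetricsListener (further below) rather than by hand.
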
diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/property/QuerySparkExecutionEnum.java b/core-metrics/src/main/java/org/apache/kylin/metrics/property/QuerySparkExecutionEnum.java
new file mode 100644
index 0000000..b390dd0
--- /dev/null
+++ b/core-metrics/src/main/java/org/apache/kylin/metrics/property/QuerySparkExecutionEnum.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.metrics.property;
+
+import org.apache.kylin.shaded.com.google.common.base.Strings;
+
+public enum QuerySparkExecutionEnum {
+    ID_CODE("QUERY_HASH_CODE"),
+    SQL("QUERY_SQL"),
+    PROJECT("PROJECT"),
+    TYPE("QUERY_TYPE"),
+    REALIZATION("REALIZATION"),
+    REALIZATION_TYPE("REALIZATION_TYPE"),
+    CUBOID_IDS("CUBOID_IDS"),
+    QUERY_ID("QUERY_ID"),
+    EXECUTION_ID("EXECUTION_ID"),
+    USER("KUSER"),
+    SPARDER_NAME("SPARDER_NAME"),
+    EXCEPTION("EXCEPTION"),
+    START_TIME("START_TIME"),
+    END_TIME("END_TIME"),
+
+
+    TIME_COST("QUERY_TIME_COST"),
+    TOTAL_SCAN_COUNT("TOTAL_SCAN_COUNT"),
+    TOTAL_SCAN_BYTES("TOTAL_SCAN_BYTES"),
+    RESULT_COUNT("RESULT_COUNT"),
+
+    EXECUTION_DURATION("EXECUTION_DURATION"),
+    RESULT_SIZE("RESULT_SIZE"),
+    EXECUTOR_DESERIALIZE_TIME("EXECUTOR_DESERIALIZE_TIME"),
+    EXECUTOR_DESERIALIZE_CPU_TIME("EXECUTOR_DESERIALIZE_CPU_TIME"),
+    EXECUTOR_RUN_TIME("EXECUTOR_RUN_TIME"),
+    EXECUTOR_CPU_TIME("EXECUTOR_CPU_TIME"),
+    JVM_GC_TIME("JVM_GC_TIME"),
+    RESULT_SERIALIZATION_TIME("RESULT_SERIALIZATION_TIME"),
+    MEMORY_BYTE_SPILLED("MEMORY_BYTE_SPILLED"),
+    DISK_BYTES_SPILLED("DISK_BYTES_SPILLED"),
+    PEAK_EXECUTION_MEMORY("PEAK_EXECUTION_MEMORY");
+
+    private final String propertyName;
+
+    QuerySparkExecutionEnum(String name) {
+        this.propertyName = name;
+    }
+
+    public static QuerySparkExecutionEnum getByName(String name) {
+        if (Strings.isNullOrEmpty(name)) {
+            throw new IllegalArgumentException("Name should not be empty");
+        }
+        for (QuerySparkExecutionEnum property : QuerySparkExecutionEnum.values()) {
+            if (property.propertyName.equalsIgnoreCase(name)) {
+                return property;
+            }
+        }
+
+        return null;
+    }
+
+    @Override
+    public String toString() {
+        return propertyName;
+    }
+}
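
This enum (like the job and stage enums that follow) only maps symbolic metric fields to column names: toString() yields the column name written into the record events, and getByName() is a case-insensitive reverse lookup that returns null for an unknown name. A small illustration, not part of the commit:

    import org.apache.kylin.metrics.property.QuerySparkExecutionEnum;

    public class PropertyEnumLookupSketch {
        public static void main(String[] args) {
            // toString() returns the underlying column name, e.g. "QUERY_TIME_COST".
            System.out.println(QuerySparkExecutionEnum.TIME_COST);

            // getByName is case-insensitive and returns null when nothing matches.
            QuerySparkExecutionEnum byName = QuerySparkExecutionEnum.getByName("query_time_cost");
            System.out.println(byName == QuerySparkExecutionEnum.TIME_COST); // true
            System.out.println(QuerySparkExecutionEnum.getByName("NO_SUCH_COLUMN")); // null
        }
    }
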
diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/property/QuerySparkJobEnum.java b/core-metrics/src/main/java/org/apache/kylin/metrics/property/QuerySparkJobEnum.java
new file mode 100644
index 0000000..0f041c2
--- /dev/null
+++ b/core-metrics/src/main/java/org/apache/kylin/metrics/property/QuerySparkJobEnum.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.metrics.property;
+
+import org.apache.kylin.shaded.com.google.common.base.Strings;
+
+public enum QuerySparkJobEnum {
+    PROJECT("PROJECT"),
+    QUERY_ID("QUERY_ID"),
+    EXECUTION_ID("EXECUTION_ID"),
+    JOB_ID("JOB_ID"),
+    START_TIME("START_TIME"),
+    END_TIME("END_TIME"),
+    IF_SUCCESS("IF_SUCCESS"),
+
+    RESULT_SIZE("RESULT_SIZE"),
+    EXECUTOR_DESERIALIZE_TIME("EXECUTOR_DESERIALIZE_TIME"),
+    EXECUTOR_DESERIALIZE_CPU_TIME("EXECUTOR_DESERIALIZE_CPU_TIME"),
+    EXECUTOR_RUN_TIME("EXECUTOR_RUN_TIME"),
+    EXECUTOR_CPU_TIME("EXECUTOR_CPU_TIME"),
+    JVM_GC_TIME("JVM_GC_TIME"),
+    RESULT_SERIALIZATION_TIME("RESULT_SERIALIZATION_TIME"),
+    MEMORY_BYTE_SPILLED("MEMORY_BYTE_SPILLED"),
+    DISK_BYTES_SPILLED("DISK_BYTES_SPILLED"),
+    PEAK_EXECUTION_MEMORY("PEAK_EXECUTION_MEMORY");
+
+    private final String propertyName;
+
+    QuerySparkJobEnum(String name) {
+        this.propertyName = name;
+    }
+
+    public static QuerySparkJobEnum getByName(String name) {
+        if (Strings.isNullOrEmpty(name)) {
+            throw new IllegalArgumentException("Name should not be empty");
+        }
+        for (QuerySparkJobEnum property : QuerySparkJobEnum.values()) {
+            if (property.propertyName.equalsIgnoreCase(name)) {
+                return property;
+            }
+        }
+
+        return null;
+    }
+
+    @Override
+    public String toString() {
+        return propertyName;
+    }
+}
diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/property/QuerySparkStageEnum.java b/core-metrics/src/main/java/org/apache/kylin/metrics/property/QuerySparkStageEnum.java
new file mode 100644
index 0000000..dcb0f42
--- /dev/null
+++ b/core-metrics/src/main/java/org/apache/kylin/metrics/property/QuerySparkStageEnum.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.metrics.property;
+
+import org.apache.kylin.shaded.com.google.common.base.Strings;
+
+/**
+ * Definition of Metrics dimension and measure for Spark stage
+ */
+public enum QuerySparkStageEnum {
+    PROJECT("PROJECT"),
+    QUERY_ID("QUERY_ID"),
+    EXECUTION_ID("EXECUTION_ID"),
+    JOB_ID("JOB_ID"),
+    STAGE_ID("STAGE_ID"),
+    SUBMIT_TIME("SUBMIT_TIME"),
+    REALIZATION("REALIZATION"),
+    CUBOID_ID("CUBOID_NAME"),
+    IF_SUCCESS("IF_SUCCESS"),
+
+    RESULT_SIZE("RESULT_SIZE"),
+    EXECUTOR_DESERIALIZE_TIME("EXECUTOR_DESERIALIZE_TIME"),
+    EXECUTOR_DESERIALIZE_CPU_TIME("EXECUTOR_DESERIALIZE_CPU_TIME"),
+    EXECUTOR_RUN_TIME("EXECUTOR_RUN_TIME"),
+    EXECUTOR_CPU_TIME("EXECUTOR_CPU_TIME"),
+    JVM_GC_TIME("JVM_GC_TIME"),
+    RESULT_SERIALIZATION_TIME("RESULT_SERIALIZATION_TIME"),
+    MEMORY_BYTE_SPILLED("MEMORY_BYTE_SPILLED"),
+    DISK_BYTES_SPILLED("DISK_BYTES_SPILLED"),
+    PEAK_EXECUTION_MEMORY("PEAK_EXECUTION_MEMORY");
+
+    private final String propertyName;
+
+    QuerySparkStageEnum(String name) {
+        this.propertyName = name;
+    }
+
+    public static QuerySparkStageEnum getByName(String name) {
+        if (Strings.isNullOrEmpty(name)) {
+            throw new IllegalArgumentException("Name should not be empty");
+        }
+        for (QuerySparkStageEnum property : QuerySparkStageEnum.values()) {
+            if (property.propertyName.equalsIgnoreCase(name)) {
+                return property;
+            }
+        }
+
+        return null;
+    }
+
+    @Override
+    public String toString() {
+        return propertyName;
+    }
+}
diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparderContext.scala b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparderContext.scala
index 9e89a62..0d25dba 100644
--- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparderContext.scala
+++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparderContext.scala
@@ -37,6 +37,7 @@ import org.apache.kylin.query.monitor.SparderContextCanary
 import org.apache.kylin.spark.classloader.ClassLoaderUtils
 import org.apache.spark.{SparkConf, SparkContext, SparkEnv}
 import org.apache.spark.sql.execution.datasource.KylinSourceStrategy
+import org.apache.spark.sql.metrics.SparderMetricsListener
 import org.apache.spark.utils.YarnInfoFetcherUtils
 
 // scalastyle:off
@@ -150,6 +151,11 @@ object SparderContext extends Logging {
                     .enableHiveSupport()
                     .getOrCreateKylinSession()
               }
+              if (kylinConf.isKylinMetricsReporterForQueryEnabled) {
+                val appStatusListener = new SparderMetricsListener()
+                sparkSession.sparkContext.addSparkListener(appStatusListener)
+                logInfo("Query metrics reporter is enabled, sparder metrics listener is added.")
+              }
               spark = sparkSession
               val appid = sparkSession.sparkContext.applicationId
               // write application id to file 'sparkappid'
diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/metrics/SparderMetricsListener.scala b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/metrics/SparderMetricsListener.scala
new file mode 100644
index 0000000..6237235
--- /dev/null
+++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/metrics/SparderMetricsListener.scala
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.metrics
+
+import org.apache.kylin.metrics.QuerySparkMetrics
+import org.apache.spark.internal.Logging
+import org.apache.spark.scheduler._
+import org.apache.spark.sql.execution.SQLExecution
+import org.apache.spark.sql.execution.ui.{SparkListenerSQLExecutionEnd, SparkListenerSQLExecutionStart}
+
+class SparderMetricsListener() extends SparkListener with Logging {
+
+  var stageJobMap: Map[Int, Int] = Map()
+  var jobExecutionMap: Map[Int, QueryInformation] = Map()
+  var executionInformationMap: Map[Long, ExecutionInformation] = Map()
+
+  val queryExecutionMetrics = QuerySparkMetrics.getInstance()
+
+  override def onJobStart(event: SparkListenerJobStart): Unit = {
+    val executionIdString = event.properties.getProperty(SQLExecution.EXECUTION_ID_KEY)
+    val sparderName = event.properties.getProperty("spark.app.name")
+    val kylinQueryId = event.properties.getProperty("kylin.query.id")
+
+    if (executionIdString == null || kylinQueryId == null) {
+      logInfo(s"The job ${event.jobId} is not a query job.")
+      return
+    }
+
+    val executionId = executionIdString.toLong
+
+    if (executionInformationMap.apply(executionId).sparderName == null) {
+      val executionInformation = new ExecutionInformation(kylinQueryId,
+        executionInformationMap.apply(executionId).executionStartTime, sparderName)
+      executionInformationMap += (executionId -> executionInformation)
+    }
+
+    jobExecutionMap += (event.jobId -> new QueryInformation(kylinQueryId, executionId))
+
+    val stages = event.stageInfos.iterator
+    while (stages.hasNext) {
+      val stage: StageInfo = stages.next()
+      stageJobMap += (stage.stageId -> event.jobId)
+    }
+
+    queryExecutionMetrics.onJobStart(kylinQueryId, sparderName, executionId,
+      executionInformationMap.apply(executionId).executionStartTime, event.jobId, event.time)
+  }
+
+  override def onJobEnd(event: SparkListenerJobEnd): Unit = {
+    if (jobExecutionMap.contains(event.jobId)) {
+      val isSuccess = event.jobResult match {
+        case JobSucceeded => true
+        case _ => false
+      }
+      queryExecutionMetrics.updateSparkJobMetrics(jobExecutionMap.apply(event.jobId).queryId, event.jobId, event.time,
+        isSuccess)
+      logInfo(s"The job ${event.jobId} has completed and the relevant metrics are updated to the cache")
+      jobExecutionMap -= event.jobId
+    }
+  }
+
+  override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = {
+    val queryId = event.properties.getProperty("kylin.query.id")
+    val stageId = event.stageInfo.stageId
+
+    if (stageJobMap.contains(stageId)) {
+      val submitTime = event.stageInfo.submissionTime match {
+        case Some(x) => x
+        case None => -1
+      }
+      queryExecutionMetrics.onSparkStageStart(queryId, stageJobMap.apply(stageId), stageId, event.stageInfo.name, submitTime)
+    }
+  }
+
+  override def onStageCompleted(event: SparkListenerStageCompleted): Unit = {
+    val stageInfo = event.stageInfo
+    if (stageJobMap.contains(stageInfo.stageId) && jobExecutionMap.contains(stageJobMap.apply(stageInfo.stageId))) {
+      val isSuccess = stageInfo.getStatusString match {
+        case "succeeded" => true
+        case _ => false
+      }
+      val stageMetrics = stageInfo.taskMetrics
+      val sparkStageMetrics = new QuerySparkMetrics.SparkStageMetrics
+      sparkStageMetrics.setMetrics(stageMetrics.resultSize, stageMetrics.executorDeserializeTime,
+        stageMetrics.executorDeserializeCpuTime, stageMetrics.executorRunTime, stageMetrics.executorCpuTime,
+        stageMetrics.jvmGCTime, stageMetrics.resultSerializationTime,
+        stageMetrics.memoryBytesSpilled, stageMetrics.diskBytesSpilled, stageMetrics.peakExecutionMemory)
+      queryExecutionMetrics.updateSparkStageMetrics(jobExecutionMap.apply(stageJobMap.apply(stageInfo.stageId)).queryId,
+        stageJobMap.apply(stageInfo.stageId), stageInfo.stageId, isSuccess, sparkStageMetrics)
+      stageJobMap -= stageInfo.stageId
+
+      logInfo(s"The stage ${event.stageInfo.stageId} has completed and the relevant metrics are updated to the cache")
+    }
+  }
+
+  override def onOtherEvent(event: SparkListenerEvent): Unit = {
+    event match {
+      case e: SparkListenerSQLExecutionStart => onQueryExecutionStart(e)
+      case e: SparkListenerSQLExecutionEnd => onQueryExecutionEnd(e)
+      case _ => // Ignore
+    }
+  }
+
+  private def onQueryExecutionStart(event: SparkListenerSQLExecutionStart): Unit = {
+    executionInformationMap += (event.executionId -> new ExecutionInformation(null, event.time, null))
+  }
+
+  private def onQueryExecutionEnd(event: SparkListenerSQLExecutionEnd): Unit = {
+    val executionInformation = executionInformationMap.apply(event.executionId)
+    queryExecutionMetrics.updateExecutionMetrics(executionInformation.queryId, event.time)
+    executionInformationMap -= event.executionId
+    logInfo(s"QueryExecution ${event.executionId} is completed at ${event.time} " +
+      s"and the relevant metrics are updated to the cache")
+  }
+}
+
+// ============================
+
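+// ExecutionInformation is filled in two steps: the start time at SQL execution start, then the
+// query id and sparder name when the first job of that execution starts. QueryInformation simply
+// links a Spark job back to its Kylin query and execution.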
+class ExecutionInformation(
+                            var queryId: String,
+                            var executionStartTime: Long,
+                            var sparderName: String
+                          )
+
+class QueryInformation(
+                        val queryId: String,
+                        val executionId: Long
+                      )
diff --git a/server-base/src/main/java/org/apache/kylin/rest/metrics/QueryMetricsFacade.java b/server-base/src/main/java/org/apache/kylin/rest/metrics/QueryMetricsFacade.java
index f51bf08..7a8f2d7 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/metrics/QueryMetricsFacade.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/metrics/QueryMetricsFacade.java
@@ -20,7 +20,6 @@ package org.apache.kylin.rest.metrics;
 
 import java.nio.charset.Charset;
 import java.util.Locale;
-import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
 import javax.annotation.concurrent.ThreadSafe;
@@ -28,23 +27,15 @@ import javax.annotation.concurrent.ThreadSafe;
 import org.apache.hadoop.metrics2.MetricsException;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.QueryContext;
-import org.apache.kylin.common.QueryContextFacade;
-import org.apache.kylin.metrics.MetricsManager;
-import org.apache.kylin.metrics.lib.impl.RecordEvent;
-import org.apache.kylin.metrics.lib.impl.TimedRecordEvent;
-import org.apache.kylin.metrics.property.QueryCubePropertyEnum;
-import org.apache.kylin.metrics.property.QueryPropertyEnum;
-import org.apache.kylin.metrics.property.QueryRPCPropertyEnum;
-import org.apache.kylin.query.enumerator.OLAPQuery;
+import org.apache.kylin.metrics.QuerySparkMetrics;
 import org.apache.kylin.rest.request.SQLRequest;
 import org.apache.kylin.rest.response.SQLResponse;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.springframework.security.core.context.SecurityContextHolder;
 
 import org.apache.kylin.shaded.com.google.common.hash.HashFunction;
 import org.apache.kylin.shaded.com.google.common.hash.Hashing;
+import org.springframework.security.core.context.SecurityContextHolder;
 
 /**
  * The entrance of metrics features.
@@ -70,9 +61,9 @@ public class QueryMetricsFacade {
         return hashFunc.hashString(sql, Charset.forName("UTF-8")).asLong();
     }
 
-    public static void updateMetrics(SQLRequest sqlRequest, SQLResponse sqlResponse) {
+    public static void updateMetrics(String queryId, SQLRequest sqlRequest, SQLResponse sqlResponse) {
         updateMetricsToLocal(sqlRequest, sqlResponse);
-        updateMetricsToReservoir(sqlRequest, sqlResponse);
+        updateMetricsToCache(queryId, sqlRequest, sqlResponse);
     }
 
     private static void updateMetricsToLocal(SQLRequest sqlRequest, SQLResponse sqlResponse) {
@@ -89,71 +80,27 @@ public class QueryMetricsFacade {
         update(getQueryMetrics(cubeMetricName), sqlResponse);
     }
 
-    /**
-     * report query related metrics
-     */
-    private static void updateMetricsToReservoir(SQLRequest sqlRequest, SQLResponse sqlResponse) {
-        if (!KylinConfig.getInstanceFromEnv().isKylinMetricsReporterForQueryEnabled()) {
-            return;
-        }
+    private static void updateMetricsToCache(String queryId, SQLRequest sqlRequest, SQLResponse sqlResponse) {
         String user = SecurityContextHolder.getContext().getAuthentication().getName();
         if (user == null) {
             user = "unknown";
         }
-        for (QueryContext.RPCStatistics entry : QueryContextFacade.current().getRpcStatisticsList()) {
-            RecordEvent rpcMetricsEvent = new TimedRecordEvent(
-                    KylinConfig.getInstanceFromEnv().getKylinMetricsSubjectQueryRpcCall());
-            setRPCWrapper(rpcMetricsEvent, //
-                    norm(sqlRequest.getProject()), entry.getRealizationName(), entry.getRpcServer(),
-                    entry.getException());
-            setRPCStats(rpcMetricsEvent, //
-                    entry.getCallTimeMs(), entry.getSkippedRows(), entry.getScannedRows(), entry.getReturnedRows(),
-                    entry.getAggregatedRows());
-            //For update rpc level related metrics
-            MetricsManager.getInstance().update(rpcMetricsEvent);
-        }
-        for (QueryContext.CubeSegmentStatisticsResult contextEntry : sqlResponse.getCubeSegmentStatisticsList()) {
-            RecordEvent queryMetricsEvent = new TimedRecordEvent(
-                    KylinConfig.getInstanceFromEnv().getKylinMetricsSubjectQuery());
-            setQueryWrapper(queryMetricsEvent, //
-                    user, sqlRequest.getSql(), sqlResponse.isStorageCacheUsed() ? "CACHE" : contextEntry.getQueryType(),
-                    norm(sqlRequest.getProject()), contextEntry.getRealization(), contextEntry.getRealizationType(),
-                    sqlResponse.getThrowable());
-
-            long totalStorageReturnCount = 0L;
-            if (contextEntry.getQueryType().equalsIgnoreCase(OLAPQuery.EnumeratorTypeEnum.OLAP.name())) {
-                for (Map<String, QueryContext.CubeSegmentStatistics> cubeEntry : contextEntry.getCubeSegmentStatisticsMap()
-                        .values()) {
-                    for (QueryContext.CubeSegmentStatistics segmentEntry : cubeEntry.values()) {
-                        RecordEvent cubeSegmentMetricsEvent = new TimedRecordEvent(
-                                KylinConfig.getInstanceFromEnv().getKylinMetricsSubjectQueryCube());
 
-                        setCubeWrapper(cubeSegmentMetricsEvent, //
-                                norm(sqlRequest.getProject()), segmentEntry.getCubeName(), segmentEntry.getSegmentName(),
-                                segmentEntry.getSourceCuboidId(), segmentEntry.getTargetCuboidId(),
-                                segmentEntry.getFilterMask());
-
-                        setCubeStats(cubeSegmentMetricsEvent, //
-                                segmentEntry.getCallCount(), segmentEntry.getCallTimeSum(), segmentEntry.getCallTimeMax(),
-                                segmentEntry.getStorageSkippedRows(), segmentEntry.getStorageScannedRows(),
-                                segmentEntry.getStorageReturnedRows(), segmentEntry.getStorageAggregatedRows(),
-                                segmentEntry.isIfSuccess(), 1.0 / cubeEntry.size());
-
-                        totalStorageReturnCount += segmentEntry.getStorageReturnedRows();
-                        //For update cube segment level related query metrics
-                        MetricsManager.getInstance().update(cubeSegmentMetricsEvent);
-                    }
-                }
-            } else {
-                if (!sqlResponse.getIsException()) {
-                    totalStorageReturnCount = sqlResponse.getResults().size();
-                }
-            }
-            setQueryStats(queryMetricsEvent, //
-                    sqlResponse.getDuration(), sqlResponse.getResults() == null ? 0 : sqlResponse.getResults().size(),
-                    totalStorageReturnCount);
-            //For update query level metrics
-            MetricsManager.getInstance().update(queryMetricsEvent);
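+        // Spark-side metrics are collected asynchronously by SparderMetricsListener; here we only
+        // enrich the cached entry for this query id, if still present, with request/response data.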
+        QuerySparkMetrics.QueryExecutionMetrics queryExecutionMetrics = QuerySparkMetrics.getInstance()
+                .getQueryExecutionMetricsMap().getIfPresent(queryId);
+        if (queryExecutionMetrics != null) {
+            queryExecutionMetrics.setUser(user);
+            queryExecutionMetrics.setSqlIdCode(getSqlHashCode(sqlRequest.getSql()));
+            queryExecutionMetrics.setProject(norm(sqlRequest.getProject()));
+            queryExecutionMetrics.setQueryType(sqlResponse.isStorageCacheUsed() ? "CACHE" : "PARQUET");
+
+            queryExecutionMetrics.setSqlDuration(sqlResponse.getDuration());
+            queryExecutionMetrics.setTotalScanCount(sqlResponse.getTotalScanCount());
+            queryExecutionMetrics.setTotalScanBytes(sqlResponse.getTotalScanBytes());
+            queryExecutionMetrics.setResultCount(sqlResponse.getResults() == null ? 0 : sqlResponse.getResults().size());
+
+            queryExecutionMetrics.setException(sqlResponse.getThrowable() == null ? "NULL" :
+                    sqlResponse.getThrowable().getClass().getName());
         }
     }
 
@@ -161,77 +108,6 @@ public class QueryMetricsFacade {
         return project.toUpperCase(Locale.ROOT);
     }
 
-    private static void setRPCWrapper(RecordEvent metricsEvent, String projectName, String realizationName,
-            String rpcServer, Throwable throwable) {
-        metricsEvent.put(QueryRPCPropertyEnum.PROJECT.toString(), projectName);
-        metricsEvent.put(QueryRPCPropertyEnum.REALIZATION.toString(), realizationName);
-        metricsEvent.put(QueryRPCPropertyEnum.RPC_SERVER.toString(), rpcServer);
-        metricsEvent.put(QueryRPCPropertyEnum.EXCEPTION.toString(),
-                throwable == null ? "NULL" : throwable.getClass().getName());
-    }
-
-    private static void setRPCStats(RecordEvent metricsEvent, long callTimeMs, long skipCount, long scanCount,
-            long returnCount, long aggrCount) {
-        metricsEvent.put(QueryRPCPropertyEnum.CALL_TIME.toString(), callTimeMs);
-        metricsEvent.put(QueryRPCPropertyEnum.SKIP_COUNT.toString(), skipCount); //Number of skips on region servers based on region meta or fuzzy filter
-        metricsEvent.put(QueryRPCPropertyEnum.SCAN_COUNT.toString(), scanCount); //Count scanned by region server
-        metricsEvent.put(QueryRPCPropertyEnum.RETURN_COUNT.toString(), returnCount);//Count returned by region server
-        metricsEvent.put(QueryRPCPropertyEnum.AGGR_FILTER_COUNT.toString(), scanCount - returnCount); //Count filtered & aggregated by coprocessor
-        metricsEvent.put(QueryRPCPropertyEnum.AGGR_COUNT.toString(), aggrCount); //Count aggregated by coprocessor
-    }
-
-    private static void setCubeWrapper(RecordEvent metricsEvent, String projectName, String cubeName,
-            String segmentName, long sourceCuboidId, long targetCuboidId, long filterMask) {
-        metricsEvent.put(QueryCubePropertyEnum.PROJECT.toString(), projectName);
-        metricsEvent.put(QueryCubePropertyEnum.CUBE.toString(), cubeName);
-        metricsEvent.put(QueryCubePropertyEnum.SEGMENT.toString(), segmentName);
-        metricsEvent.put(QueryCubePropertyEnum.CUBOID_SOURCE.toString(), sourceCuboidId);
-        metricsEvent.put(QueryCubePropertyEnum.CUBOID_TARGET.toString(), targetCuboidId);
-        metricsEvent.put(QueryCubePropertyEnum.IF_MATCH.toString(), sourceCuboidId == targetCuboidId);
-        metricsEvent.put(QueryCubePropertyEnum.FILTER_MASK.toString(), filterMask);
-    }
-
-    private static void setCubeStats(RecordEvent metricsEvent, long callCount, long callTimeSum, long callTimeMax,
-            long skipCount, long scanCount, long returnCount, long aggrCount, boolean ifSuccess, double weightPerHit) {
-        metricsEvent.put(QueryCubePropertyEnum.CALL_COUNT.toString(), callCount);
-        metricsEvent.put(QueryCubePropertyEnum.TIME_SUM.toString(), callTimeSum);
-        metricsEvent.put(QueryCubePropertyEnum.TIME_MAX.toString(), callTimeMax);
-        metricsEvent.put(QueryCubePropertyEnum.SKIP_COUNT.toString(), skipCount);
-        metricsEvent.put(QueryCubePropertyEnum.SCAN_COUNT.toString(), scanCount);
-        metricsEvent.put(QueryCubePropertyEnum.RETURN_COUNT.toString(), returnCount);
-        metricsEvent.put(QueryCubePropertyEnum.AGGR_FILTER_COUNT.toString(), scanCount - returnCount);
-        metricsEvent.put(QueryCubePropertyEnum.AGGR_COUNT.toString(), aggrCount);
-        metricsEvent.put(QueryCubePropertyEnum.IF_SUCCESS.toString(), ifSuccess);
-        metricsEvent.put(QueryCubePropertyEnum.WEIGHT_PER_HIT.toString(), weightPerHit);
-    }
-
-    private static void setQueryWrapper(RecordEvent metricsEvent, String user, String sql, String queryType,
-            String projectName, String realizationName, int realizationType, Throwable throwable) {
-        metricsEvent.put(QueryPropertyEnum.USER.toString(), user);
-        metricsEvent.put(QueryPropertyEnum.ID_CODE.toString(), getSqlHashCode(sql));
-        metricsEvent.put(QueryPropertyEnum.SQL.toString(), sql);
-        metricsEvent.put(QueryPropertyEnum.TYPE.toString(), queryType);
-        metricsEvent.put(QueryPropertyEnum.PROJECT.toString(), projectName);
-        metricsEvent.put(QueryPropertyEnum.REALIZATION.toString(), realizationName);
-        metricsEvent.put(QueryPropertyEnum.REALIZATION_TYPE.toString(), realizationType);
-        metricsEvent.put(QueryPropertyEnum.EXCEPTION.toString(),
-                throwable == null ? "NULL" : throwable.getClass().getName());
-    }
-
-    private static void setQueryStats(RecordEvent metricsEvent, long callTimeMs, long returnCountByCalcite,
-            long returnCountByStorage) {
-        metricsEvent.put(QueryPropertyEnum.TIME_COST.toString(), callTimeMs);
-        metricsEvent.put(QueryPropertyEnum.CALCITE_RETURN_COUNT.toString(), returnCountByCalcite);
-        metricsEvent.put(QueryPropertyEnum.STORAGE_RETURN_COUNT.toString(), returnCountByStorage);
-        long countAggrAndFilter = returnCountByStorage - returnCountByCalcite;
-        if (countAggrAndFilter < 0) {
-            countAggrAndFilter = 0;
-            logger.warn(returnCountByStorage + " rows returned by storage less than " + returnCountByCalcite
-                    + " rows returned by calcite");
-        }
-        metricsEvent.put(QueryPropertyEnum.AGGR_FILTER_COUNT.toString(), countAggrAndFilter);
-    }
-
     private static void update(QueryMetrics queryMetrics, SQLResponse sqlResponse) {
         try {
             incrQueryCount(queryMetrics, sqlResponse);
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
index 63cb4d8..6b60ab8 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
@@ -1042,7 +1042,7 @@ public class CubeService extends BasicService implements InitializingBean {
         String cuboidColumn = isCuboidSource ? QueryCubePropertyEnum.CUBOID_SOURCE.toString()
                 : QueryCubePropertyEnum.CUBOID_TARGET.toString();
         String hitMeasure = QueryCubePropertyEnum.WEIGHT_PER_HIT.toString();
-        String table = getMetricsManager().getSystemTableFromSubject(getConfig().getKylinMetricsSubjectQueryCube());
+        String table = getMetricsManager().getSystemTableFromSubject(getConfig().getKylinMetricsSubjectQuerySparkJob());
         String sql = "select " + cuboidColumn + ", sum(" + hitMeasure + ")" //
                 + " from " + table//
                 + " where " + QueryCubePropertyEnum.CUBE.toString() + " = ?" //
@@ -1057,7 +1057,7 @@ public class CubeService extends BasicService implements InitializingBean {
         String cuboidTgt = QueryCubePropertyEnum.CUBOID_TARGET.toString();
         String aggCount = QueryCubePropertyEnum.AGGR_COUNT.toString();
         String returnCount = QueryCubePropertyEnum.RETURN_COUNT.toString();
-        String table = getMetricsManager().getSystemTableFromSubject(getConfig().getKylinMetricsSubjectQueryCube());
+        String table = getMetricsManager().getSystemTableFromSubject(getConfig().getKylinMetricsSubjectQuerySparkJob());
         String sql = "select " + cuboidSource + ", " + cuboidTgt + ", avg(" + aggCount + "), avg(" + returnCount + ")"//
                 + " from " + table //
                 + " where " + QueryCubePropertyEnum.CUBE.toString() + " = ?" //
@@ -1070,7 +1070,7 @@ public class CubeService extends BasicService implements InitializingBean {
     public Map<Long, Long> getCuboidQueryMatchCount(String cubeName) {
         String cuboidSource = QueryCubePropertyEnum.CUBOID_SOURCE.toString();
         String hitMeasure = QueryCubePropertyEnum.WEIGHT_PER_HIT.toString();
-        String table = getMetricsManager().getSystemTableFromSubject(getConfig().getKylinMetricsSubjectQueryCube());
+        String table = getMetricsManager().getSystemTableFromSubject(getConfig().getKylinMetricsSubjectQuerySparkJob());
         String sql = "select " + cuboidSource + ", sum(" + hitMeasure + ")" //
                 + " from " + table //
                 + " where " + QueryCubePropertyEnum.CUBE.toString() + " = ?" //
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/DashboardService.java b/server-base/src/main/java/org/apache/kylin/rest/service/DashboardService.java
index f622b89..768876f 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/DashboardService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/DashboardService.java
@@ -31,7 +31,8 @@ import org.apache.kylin.metadata.realization.RealizationType;
 import org.apache.kylin.metrics.MetricsManager;
 import org.apache.kylin.metrics.lib.impl.TimePropertyEnum;
 import org.apache.kylin.metrics.property.JobPropertyEnum;
-import org.apache.kylin.metrics.property.QueryPropertyEnum;
+import org.apache.kylin.metrics.property.QuerySparkExecutionEnum;
 import org.apache.kylin.rest.constant.Constant;
 import org.apache.kylin.rest.exception.BadRequestException;
 import org.apache.kylin.rest.request.PrepareSqlRequest;
@@ -119,7 +120,7 @@ public class DashboardService extends BasicService {
         Map<String, String> filterMap = getBaseFilterMap(CategoryEnum.QUERY, projectName, startTime, endTime);
         filterMap.putAll(getCubeFilterMap(CategoryEnum.QUERY, cubeName));
         return createPrepareSqlRequest(null, metrics,
-                getMetricsManager().getSystemTableFromSubject(getConfig().getKylinMetricsSubjectQuery()), filterMap);
+                getMetricsManager().getSystemTableFromSubject(getConfig().getKylinMetricsSubjectQueryExecution()), filterMap);
     };
 
     public PrepareSqlRequest getJobMetricsSQLRequest(String startTime, String endTime, String projectName,
@@ -143,7 +144,7 @@ public class DashboardService extends BasicService {
             if (categoryEnum == CategoryEnum.QUERY) {
                 dimensionSQL = new String[] { QueryDimensionEnum.valueOf(dimension).toSQL() };
                 metricSQL = new String[] { QueryMetricEnum.valueOf(metric).toSQL() };
-                table = getMetricsManager().getSystemTableFromSubject(getConfig().getKylinMetricsSubjectQuery());
+                table = getMetricsManager().getSystemTableFromSubject(getConfig().getKylinMetricsSubjectQueryExecution());
             } else if (categoryEnum == CategoryEnum.JOB) {
                 dimensionSQL = new String[] { JobDimensionEnum.valueOf(dimension).toSQL() };
                 metricSQL = new String[] { JobMetricEnum.valueOf(metric).toSQL() };
@@ -217,10 +218,10 @@ public class DashboardService extends BasicService {
         HashMap<String, String> filterMap = new HashMap<>();
 
         if (category == CategoryEnum.QUERY) {
-            filterMap.put(QueryPropertyEnum.EXCEPTION.toString() + " = ?", "NULL");
+            filterMap.put(QuerySparkExecutionEnum.EXCEPTION.toString() + " = ?", "NULL");
 
             if (!Strings.isNullOrEmpty(cubeName)) {
-                filterMap.put(QueryPropertyEnum.REALIZATION + " = ?", cubeName);
+                filterMap.put(QuerySparkExecutionEnum.REALIZATION + " = ?", cubeName);
             }
         } else if (category == CategoryEnum.JOB && !Strings.isNullOrEmpty(cubeName)) {
             HybridInstance hybridInstance = getHybridManager().getHybridInstance(cubeName);
@@ -299,8 +300,8 @@ public class DashboardService extends BasicService {
     }
 
     private enum QueryDimensionEnum {
-        PROJECT(QueryPropertyEnum.PROJECT.toString()), //
-        CUBE(QueryPropertyEnum.REALIZATION.toString()), //
+        PROJECT(QuerySparkExecutionEnum.PROJECT.toString()), //
+        CUBE(QuerySparkExecutionEnum.REALIZATION.toString()), //
         DAY(TimePropertyEnum.DAY_DATE.toString()), //
         WEEK(TimePropertyEnum.WEEK_BEGIN_DATE.toString()), //
         MONTH(TimePropertyEnum.MONTH.toString());
@@ -336,9 +337,9 @@ public class DashboardService extends BasicService {
 
     private enum QueryMetricEnum {
         QUERY_COUNT("count(*)"), //
-        AVG_QUERY_LATENCY("avg(" + QueryPropertyEnum.TIME_COST.toString() + ")"), //
-        MAX_QUERY_LATENCY("max(" + QueryPropertyEnum.TIME_COST.toString() + ")"), //
-        MIN_QUERY_LATENCY("min(" + QueryPropertyEnum.TIME_COST.toString() + ")");
+        AVG_QUERY_LATENCY("avg(" + QuerySparkExecutionEnum.TIME_COST.toString() + ")"), //
+        MAX_QUERY_LATENCY("max(" + QuerySparkExecutionEnum.TIME_COST.toString() + ")"), //
+        MIN_QUERY_LATENCY("min(" + QuerySparkExecutionEnum.TIME_COST.toString() + ")");
 
         private final String sql;
 
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
index 855174b..f020d6b 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
@@ -68,6 +68,7 @@ import org.apache.kylin.cache.cachemanager.MemcachedCacheManager;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.QueryContext;
 import org.apache.kylin.common.QueryContextFacade;
+import org.apache.kylin.metrics.QuerySparkMetrics;
 import org.apache.kylin.common.debug.BackdoorToggles;
 import org.apache.kylin.common.exceptions.ResourceLimitExceededException;
 import org.apache.kylin.common.persistence.ResourceStore;
@@ -120,6 +121,7 @@ import org.apache.kylin.rest.util.SQLResponseSignatureUtil;
 import org.apache.kylin.rest.util.TableauInterceptor;
 import org.apache.kylin.storage.hybrid.HybridInstance;
 import org.apache.kylin.storage.hybrid.HybridManager;
+import org.apache.spark.sql.SparderContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -468,9 +470,14 @@ public class QueryService extends BasicService {
             }
 
             sqlResponse.setDuration(queryContext.getAccumulatedMillis());
+            if (QuerySparkMetrics.getInstance().getQueryExecutionMetrics(queryContext.getQueryId()) != null) {
+                String sqlTraceUrl = SparderContext.appMasterTrackURL() + "/SQL/execution/?id=" +
+                        QuerySparkMetrics.getInstance().getQueryExecutionMetrics(queryContext.getQueryId()).getExecutionId();
+                sqlResponse.setTraceUrl(sqlTraceUrl);
+            }
             logQuery(queryContext.getQueryId(), sqlRequest, sqlResponse);
             try {
-                recordMetric(sqlRequest, sqlResponse);
+                recordMetric(queryContext.getQueryId(), sqlRequest, sqlResponse);
             } catch (Throwable th) {
                 logger.warn("Write metric error.", th);
             }
@@ -585,8 +592,8 @@ public class QueryService extends BasicService {
                 checkCondition(!BackdoorToggles.getDisableCache(), "query cache disabled in BackdoorToggles");
     }
 
-    protected void recordMetric(SQLRequest sqlRequest, SQLResponse sqlResponse) throws UnknownHostException {
-        QueryMetricsFacade.updateMetrics(sqlRequest, sqlResponse);
+    protected void recordMetric(String queryId, SQLRequest sqlRequest, SQLResponse sqlResponse) throws UnknownHostException {
+        QueryMetricsFacade.updateMetrics(queryId, sqlRequest, sqlResponse);
         QueryMetrics2Facade.updateMetrics(sqlRequest, sqlResponse);
     }
 
@@ -1200,7 +1207,8 @@ public class QueryService extends BasicService {
 
                     realizations.add(realizationName);
                 }
-                queryContext.setContextRealization(ctx.id, realizationName, realizationType);
+                QuerySparkMetrics.getInstance().setQueryRealization(queryContext.getQueryId(), realizationName,
+                        realizationType, cuboidIdsSb.toString());
             }
 
 
diff --git a/server/src/test/java/org/apache/kylin/rest/metrics/QueryMetricsTest.java b/server/src/test/java/org/apache/kylin/rest/metrics/QueryMetricsTest.java
index 10e5d8b..fbacc78 100644
--- a/server/src/test/java/org/apache/kylin/rest/metrics/QueryMetricsTest.java
+++ b/server/src/test/java/org/apache/kylin/rest/metrics/QueryMetricsTest.java
@@ -71,7 +71,7 @@ public class QueryMetricsTest extends ServiceTestBase {
         sqlResponse.setResults(results);
         sqlResponse.setStorageCacheUsed(true);
 
-        QueryMetricsFacade.updateMetrics(sqlRequest, sqlResponse);
+        QueryMetricsFacade.updateMetrics("", sqlRequest, sqlResponse);
 
         Thread.sleep(2000);
 
@@ -100,7 +100,7 @@ public class QueryMetricsTest extends ServiceTestBase {
         sqlResponse2.setCube("test_cube");
         sqlResponse2.setIsException(true);
 
-        QueryMetricsFacade.updateMetrics(sqlRequest, sqlResponse2);
+        QueryMetricsFacade.updateMetrics("", sqlRequest, sqlResponse2);
 
         Thread.sleep(2000);
 
@@ -146,7 +146,7 @@ public class QueryMetricsTest extends ServiceTestBase {
 
         sqlResponse.setCubeSegmentStatisticsList(context.getCubeSegmentStatisticsResultList());
 
-        QueryMetricsFacade.updateMetrics(sqlRequest, sqlResponse);
+        QueryMetricsFacade.updateMetrics("", sqlRequest, sqlResponse);
 
         Thread.sleep(2000);
 
diff --git a/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/CubeDescCreator.java b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/CubeDescCreator.java
index 729dabb..2d82e02 100644
--- a/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/CubeDescCreator.java
+++ b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/CubeDescCreator.java
@@ -43,10 +43,9 @@ import org.apache.kylin.metadata.model.ParameterDesc;
 import org.apache.kylin.metrics.lib.impl.RecordEvent;
 import org.apache.kylin.metrics.lib.impl.TimePropertyEnum;
 import org.apache.kylin.metrics.property.JobPropertyEnum;
-import org.apache.kylin.metrics.property.QueryCubePropertyEnum;
-import org.apache.kylin.metrics.property.QueryPropertyEnum;
-import org.apache.kylin.metrics.property.QueryRPCPropertyEnum;
-
+import org.apache.kylin.metrics.property.QuerySparkExecutionEnum;
+import org.apache.kylin.metrics.property.QuerySparkJobEnum;
+import org.apache.kylin.metrics.property.QuerySparkStageEnum;
 import org.apache.kylin.shaded.com.google.common.collect.Lists;
 import org.apache.kylin.shaded.com.google.common.collect.Maps;
 import org.apache.kylin.shaded.com.google.common.collect.Sets;
@@ -54,11 +53,11 @@ import org.apache.kylin.tool.metrics.systemcube.def.MetricsSinkDesc;
 
 public class CubeDescCreator {
 
-    public static CubeDesc generateKylinCubeDescForMetricsQuery(KylinConfig config, MetricsSinkDesc sinkDesc) {
-        String tableName = sinkDesc.getTableNameForMetrics(config.getKylinMetricsSubjectQuery());
+    public static CubeDesc generateKylinCubeDescForMetricsQueryExecution(KylinConfig config, MetricsSinkDesc sinkDesc) {
+        String tableName = sinkDesc.getTableNameForMetrics(config.getKylinMetricsSubjectQueryExecution());
 
         //Set for dimensions
-        List<String> dimensions = ModelCreator.getDimensionsForMetricsQuery();
+        List<String> dimensions = ModelCreator.getDimensionsForMetricsQueryExecution();
         dimensions.remove(TimePropertyEnum.DAY_TIME.toString());
         dimensions.remove(RecordEvent.RecordReserveKeyEnum.TIME.toString());
 
@@ -68,39 +67,49 @@ public class CubeDescCreator {
         }
 
         //Set for measures
-        List<String> measures = ModelCreator.getMeasuresForMetricsQuery();
-        measures.remove(QueryPropertyEnum.ID_CODE.toString());
+        List<String> measures = ModelCreator.getMeasuresForMetricsQueryExecution();
+        measures.remove(QuerySparkExecutionEnum.ID_CODE.toString());
         List<MeasureDesc> measureDescList = Lists.newArrayListWithExpectedSize(measures.size() * 2 + 1 + 1);
 
-        List<Pair<String, String>> measureTypeList = HiveTableCreator.getHiveColumnsForMetricsQuery();
+        List<Pair<String, String>> measureTypeList = HiveTableCreator.getHiveColumnsForMetricsQueryExecution();
         Map<String, String> measureTypeMap = Maps.newHashMapWithExpectedSize(measureTypeList.size());
         for (Pair<String, String> entry : measureTypeList) {
             measureTypeMap.put(entry.getFirst(), entry.getSecond());
         }
         measureDescList.add(getMeasureCount());
-        measureDescList.add(getMeasureMin(QueryPropertyEnum.TIME_COST.toString(),
-                measureTypeMap.get(QueryPropertyEnum.TIME_COST.toString())));
+        measureDescList.add(getMeasureMin(QuerySparkExecutionEnum.TIME_COST.toString(),
+                measureTypeMap.get(QuerySparkExecutionEnum.TIME_COST.toString())));
         for (String measure : measures) {
             measureDescList.add(getMeasureSum(measure, measureTypeMap.get(measure)));
             measureDescList.add(getMeasureMax(measure, measureTypeMap.get(measure)));
         }
-        measureDescList.add(getMeasureHLL(QueryPropertyEnum.ID_CODE.toString()));
-        measureDescList.add(getMeasurePercentile(QueryPropertyEnum.TIME_COST.toString()));
+        measureDescList.add(getMeasureHLL(QuerySparkExecutionEnum.ID_CODE.toString()));
+        measureDescList.add(getMeasurePercentile(QuerySparkExecutionEnum.TIME_COST.toString()));
 
         //Set for row key
         RowKeyColDesc[] rowKeyColDescs = new RowKeyColDesc[dimensionDescList.size()];
         int idx = getTimeRowKeyColDesc(tableName, rowKeyColDescs);
-        rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QueryPropertyEnum.USER.toString(), idx + 1);
+        rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QuerySparkExecutionEnum.USER.toString(), idx + 1);
+        idx++;
+        rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QuerySparkExecutionEnum.PROJECT.toString(), idx + 1);
+        idx++;
+        rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QuerySparkExecutionEnum.REALIZATION.toString(), idx + 1);
+        idx++;
+        rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QuerySparkExecutionEnum.REALIZATION_TYPE.toString(), idx + 1);
         idx++;
-        rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QueryPropertyEnum.PROJECT.toString(), idx + 1);
+        rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QuerySparkExecutionEnum.CUBOID_IDS.toString(), idx + 1);
         idx++;
-        rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QueryPropertyEnum.REALIZATION.toString(), idx + 1);
+        rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QuerySparkExecutionEnum.EXCEPTION.toString(), idx + 1);
         idx++;
-        rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QueryPropertyEnum.REALIZATION_TYPE.toString(), idx + 1);
+        rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QuerySparkExecutionEnum.TYPE.toString(), idx + 1);
         idx++;
-        rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QueryPropertyEnum.EXCEPTION.toString(), idx + 1);
+        rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QuerySparkExecutionEnum.SPARDER_NAME.toString(), idx + 1);
         idx++;
-        rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QueryPropertyEnum.TYPE.toString(), idx + 1);
+        rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QuerySparkExecutionEnum.QUERY_ID.toString(), idx + 1);
+        idx++;
+        rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QuerySparkExecutionEnum.START_TIME.toString(), idx + 1);
+        idx++;
+        rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QuerySparkExecutionEnum.END_TIME.toString(), idx + 1);
         idx++;
         rowKeyColDescs[idx] = getRowKeyColDesc(tableName, RecordEvent.RecordReserveKeyEnum.HOST.toString(), idx + 1);
         idx++;
@@ -109,11 +118,18 @@ public class CubeDescCreator {
         rowKeyDesc.setRowkeyColumns(rowKeyColDescs);
 
         //Set for aggregation group
-        String[][] hierarchy_dims = new String[2][];
+        String[][] hierarchy_dims = new String[4][];
         hierarchy_dims[0] = getTimeHierarchy();
-        hierarchy_dims[1] = new String[2];
-        hierarchy_dims[1][0] = QueryPropertyEnum.REALIZATION_TYPE.toString();
-        hierarchy_dims[1][1] = QueryPropertyEnum.REALIZATION.toString();
+        hierarchy_dims[1] = new String[3];
+        hierarchy_dims[1][0] = QuerySparkExecutionEnum.REALIZATION_TYPE.toString();
+        hierarchy_dims[1][1] = QuerySparkExecutionEnum.REALIZATION.toString();
+        hierarchy_dims[1][2] = QuerySparkExecutionEnum.CUBOID_IDS.toString();
+        hierarchy_dims[2] = new String[2];
+        hierarchy_dims[2][0] = QuerySparkExecutionEnum.START_TIME.toString();
+        hierarchy_dims[2][1] = QuerySparkExecutionEnum.END_TIME.toString();
+        hierarchy_dims[3] = new String[2];
+        hierarchy_dims[3][0] = QuerySparkExecutionEnum.SPARDER_NAME.toString();
+        hierarchy_dims[3][1] = RecordEvent.RecordReserveKeyEnum.HOST.toString();
         for (int i = 0; i < hierarchy_dims.length; i++) {
             hierarchy_dims[i] = refineColumnWithTable(tableName, hierarchy_dims[i]);
         }
@@ -135,15 +151,15 @@ public class CubeDescCreator {
                 rowKeyDesc, aggGroup, hBaseMapping, sinkDesc.getCubeDescOverrideProperties());
     }
 
-    public static CubeDesc generateKylinCubeDescForMetricsQueryCube(KylinConfig config, MetricsSinkDesc sinkDesc) {
-        String tableName = sinkDesc.getTableNameForMetrics(config.getKylinMetricsSubjectQueryCube());
+    public static CubeDesc generateKylinCubeDescForMetricsQuerySparkJob(KylinConfig config, MetricsSinkDesc sinkDesc) {
+        String tableName = sinkDesc.getTableNameForMetrics(config.getKylinMetricsSubjectQuerySparkJob());
 
         //Set for dimensions
-        List<String> dimensions = ModelCreator.getDimensionsForMetricsQueryCube();
+        List<String> dimensions = ModelCreator.getDimensionsForMetricsQuerySparkJob();
         dimensions.remove(TimePropertyEnum.DAY_TIME.toString());
         dimensions.remove(RecordEvent.RecordReserveKeyEnum.TIME.toString());
         dimensions.remove(RecordEvent.RecordReserveKeyEnum.HOST.toString());
-        dimensions.remove(QueryCubePropertyEnum.PROJECT.toString());
+        dimensions.remove(QuerySparkJobEnum.PROJECT.toString());
 
         List<DimensionDesc> dimensionDescList = Lists.newArrayListWithExpectedSize(dimensions.size());
         for (String dimensionName : dimensions) {
@@ -151,10 +167,10 @@ public class CubeDescCreator {
         }
 
         //Set for measures
-        List<String> measures = ModelCreator.getMeasuresForMetricsQueryCube();
+        List<String> measures = ModelCreator.getMeasuresForMetricsQuerySparkJob();
         List<MeasureDesc> measureDescList = Lists.newArrayListWithExpectedSize(measures.size() * 2);
 
-        List<Pair<String, String>> measureTypeList = HiveTableCreator.getHiveColumnsForMetricsQueryCube();
+        List<Pair<String, String>> measureTypeList = HiveTableCreator.getHiveColumnsForMetricsQuerySparkJob();
         Map<String, String> measureTypeMap = Maps.newHashMapWithExpectedSize(measureTypeList.size());
         for (Pair<String, String> entry : measureTypeList) {
             measureTypeMap.put(entry.getFirst(), entry.getSecond());
@@ -162,53 +178,43 @@ public class CubeDescCreator {
         measureDescList.add(getMeasureCount());
         for (String measure : measures) {
             measureDescList.add(getMeasureSum(measure, measureTypeMap.get(measure)));
-            if (!measure.equals(QueryCubePropertyEnum.WEIGHT_PER_HIT.toString())) {
-                measureDescList.add(getMeasureMax(measure, measureTypeMap.get(measure)));
-            }
+            measureDescList.add(getMeasureMax(measure, measureTypeMap.get(measure)));
         }
 
         //Set for row key
         RowKeyColDesc[] rowKeyColDescs = new RowKeyColDesc[dimensionDescList.size()];
         int idx = getTimeRowKeyColDesc(tableName, rowKeyColDescs);
-        rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QueryCubePropertyEnum.CUBE.toString(), idx + 1);
+        rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QuerySparkJobEnum.JOB_ID.toString(), idx + 1);
         idx++;
-        rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QueryCubePropertyEnum.SEGMENT.toString(), idx + 1);
+        rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QuerySparkJobEnum.EXECUTION_ID.toString(), idx + 1);
         idx++;
-        rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QueryCubePropertyEnum.CUBOID_SOURCE.toString(), idx + 1);
+        rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QuerySparkJobEnum.QUERY_ID.toString(), idx + 1);
         idx++;
-        rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QueryCubePropertyEnum.CUBOID_TARGET.toString(), idx + 1);
+        rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QuerySparkJobEnum.START_TIME.toString(), idx + 1);
         idx++;
-        rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QueryCubePropertyEnum.FILTER_MASK.toString(), idx + 1);
+        rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QuerySparkJobEnum.END_TIME.toString(), idx + 1);
         idx++;
-        rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QueryCubePropertyEnum.IF_MATCH.toString(), idx + 1);
-        idx++;
-        rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QueryCubePropertyEnum.IF_SUCCESS.toString(), idx + 1);
+        rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QuerySparkJobEnum.IF_SUCCESS.toString(), idx + 1);
         idx++;
 
         RowKeyDesc rowKeyDesc = new RowKeyDesc();
         rowKeyDesc.setRowkeyColumns(rowKeyColDescs);
 
-        //Set for aggregation group
-        String[] mandatory_dims = new String[] { QueryCubePropertyEnum.CUBE.toString() };
-        mandatory_dims = refineColumnWithTable(tableName, mandatory_dims);
-
-        String[][] hierarchy_dims = new String[1][];
+        String[][] hierarchy_dims = new String[2][];
         hierarchy_dims[0] = getTimeHierarchy();
+        hierarchy_dims[1] = new String[3];
+        hierarchy_dims[1][0] = QuerySparkJobEnum.QUERY_ID.toString();
+        hierarchy_dims[1][1] = QuerySparkJobEnum.EXECUTION_ID.toString();
+        hierarchy_dims[1][2] = QuerySparkJobEnum.JOB_ID.toString();
+
         for (int i = 0; i < hierarchy_dims.length; i++) {
             hierarchy_dims[i] = refineColumnWithTable(tableName, hierarchy_dims[i]);
         }
 
-        String[][] joint_dims = new String[1][];
-        joint_dims[0] = new String[] { QueryCubePropertyEnum.CUBOID_SOURCE.toString(),
-                QueryCubePropertyEnum.CUBOID_TARGET.toString() };
-        for (int i = 0; i < joint_dims.length; i++) {
-            joint_dims[i] = refineColumnWithTable(tableName, joint_dims[i]);
-        }
-
         SelectRule selectRule = new SelectRule();
-        selectRule.mandatoryDims = mandatory_dims;
+        selectRule.mandatoryDims = new String[0];
         selectRule.hierarchyDims = hierarchy_dims;
-        selectRule.jointDims = joint_dims;
+        selectRule.jointDims = new String[0][0];
 
         AggregationGroup aggGroup = new AggregationGroup();
         aggGroup.setIncludes(refineColumnWithTable(tableName, dimensions));
@@ -222,11 +228,11 @@ public class CubeDescCreator {
                 rowKeyDesc, aggGroup, hBaseMapping, sinkDesc.getCubeDescOverrideProperties());
     }
 
-    public static CubeDesc generateKylinCubeDescForMetricsQueryRPC(KylinConfig config, MetricsSinkDesc sinkDesc) {
-        String tableName = sinkDesc.getTableNameForMetrics(config.getKylinMetricsSubjectQueryRpcCall());
+    public static CubeDesc generateKylinCubeDescForMetricsQuerySparkStage(KylinConfig config, MetricsSinkDesc sinkDesc) {
+        String tableName = sinkDesc.getTableNameForMetrics(config.getKylinMetricsSubjectQuerySparkStage());
 
         //Set for dimensions
-        List<String> dimensions = ModelCreator.getDimensionsForMetricsQueryRPC();
+        List<String> dimensions = ModelCreator.getDimensionsForMetricsQuerySparkStage();
         dimensions.remove(TimePropertyEnum.DAY_TIME.toString());
         dimensions.remove(RecordEvent.RecordReserveKeyEnum.TIME.toString());
 
@@ -236,10 +242,10 @@ public class CubeDescCreator {
         }
 
         //Set for measures
-        List<String> measures = ModelCreator.getMeasuresForMetricsQueryRPC();
+        List<String> measures = ModelCreator.getMeasuresForMetricsQuerySparkStage();
         List<MeasureDesc> measureDescList = Lists.newArrayListWithExpectedSize(measures.size() * 2 + 1 + 1);
 
-        List<Pair<String, String>> measureTypeList = HiveTableCreator.getHiveColumnsForMetricsQueryRPC();
+        List<Pair<String, String>> measureTypeList = HiveTableCreator.getHiveColumnsForMetricsQuerySparkStage();
         Map<String, String> measureTypeMap = Maps.newHashMapWithExpectedSize(measureTypeList.size());
         for (Pair<String, String> entry : measureTypeList) {
             measureTypeMap.put(entry.getFirst(), entry.getSecond());
@@ -249,28 +255,42 @@ public class CubeDescCreator {
             measureDescList.add(getMeasureSum(measure, measureTypeMap.get(measure)));
             measureDescList.add(getMeasureMax(measure, measureTypeMap.get(measure)));
         }
-        measureDescList.add(getMeasurePercentile(QueryRPCPropertyEnum.CALL_TIME.toString()));
 
         //Set for row key
         RowKeyColDesc[] rowKeyColDescs = new RowKeyColDesc[dimensionDescList.size()];
         int idx = getTimeRowKeyColDesc(tableName, rowKeyColDescs);
-        rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QueryRPCPropertyEnum.PROJECT.toString(), idx + 1);
+        rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QuerySparkStageEnum.PROJECT.toString(), idx + 1);
         idx++;
-        rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QueryRPCPropertyEnum.REALIZATION.toString(), idx + 1);
+        rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QuerySparkStageEnum.REALIZATION.toString(), idx + 1);
         idx++;
-        rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QueryRPCPropertyEnum.RPC_SERVER.toString(), idx + 1);
+        rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QuerySparkStageEnum.CUBOID_ID.toString(), idx + 1);
         idx++;
-        rowKeyColDescs[idx] = getRowKeyColDesc(tableName, RecordEvent.RecordReserveKeyEnum.HOST.toString(), idx + 1);
+        rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QuerySparkStageEnum.QUERY_ID.toString(), idx + 1);
+        idx++;
+        rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QuerySparkStageEnum.EXECUTION_ID.toString(), idx + 1);
         idx++;
-        rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QueryRPCPropertyEnum.EXCEPTION.toString(), idx + 1);
+        rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QuerySparkStageEnum.JOB_ID.toString(), idx + 1);
+        idx++;
+        rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QuerySparkStageEnum.STAGE_ID.toString(), idx + 1);
+        idx++;
+        rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QuerySparkStageEnum.IF_SUCCESS.toString(), idx + 1);
+        idx++;
+        rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QuerySparkStageEnum.SUBMIT_TIME.toString(), idx + 1);
+        idx++;
+        rowKeyColDescs[idx] = getRowKeyColDesc(tableName, RecordEvent.RecordReserveKeyEnum.HOST.toString(), idx + 1);
         idx++;
 
         RowKeyDesc rowKeyDesc = new RowKeyDesc();
         rowKeyDesc.setRowkeyColumns(rowKeyColDescs);
 
         //Set for aggregation group
-        String[][] hierarchy_dims = new String[1][];
+        String[][] hierarchy_dims = new String[2][];
         hierarchy_dims[0] = getTimeHierarchy();
+        hierarchy_dims[1] = new String[4];
+        hierarchy_dims[1][0] = QuerySparkStageEnum.QUERY_ID.toString();
+        hierarchy_dims[1][1] = QuerySparkStageEnum.EXECUTION_ID.toString();
+        hierarchy_dims[1][2] = QuerySparkStageEnum.JOB_ID.toString();
+        hierarchy_dims[1][3] = QuerySparkStageEnum.STAGE_ID.toString();
         for (int i = 0; i < hierarchy_dims.length; i++) {
             hierarchy_dims[i] = refineColumnWithTable(tableName, hierarchy_dims[i]);
         }
@@ -447,7 +467,7 @@ public class CubeDescCreator {
         desc.setDimensions(dimensionDescList);
         desc.setMeasures(measureDescList);
         desc.setRowkey(rowKeyDesc);
-        desc.setHbaseMapping(hBaseMapping);
+        //desc.setHbaseMapping(hBaseMapping);
         desc.setNotifyList(Lists.<String> newArrayList());
         desc.setStatusNeedNotify(Lists.newArrayList(JobStatusEnum.ERROR.toString()));
         desc.setAutoMergeTimeRanges(new long[] { 86400000L, 604800000L, 2419200000L });
diff --git a/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/CubeInstanceCreator.java b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/CubeInstanceCreator.java
index 7d70bc2..e96da4d 100644
--- a/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/CubeInstanceCreator.java
+++ b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/CubeInstanceCreator.java
@@ -34,7 +34,7 @@ public class CubeInstanceCreator {
     public static void main(String[] args) throws Exception {
         KylinConfig config = KylinConfig.getInstanceFromEnv();
 
-        CubeInstance cubeInstance = generateKylinCubeInstanceForMetricsQuery("ADMIN", config, new MetricsSinkDesc());
+        CubeInstance cubeInstance = generateKylinCubeInstanceForMetricsQueryExecution("ADMIN", config, new MetricsSinkDesc());
         ByteArrayOutputStream buf = new ByteArrayOutputStream();
         DataOutputStream dout = new DataOutputStream(buf);
         CubeManager.CUBE_SERIALIZER.serialize(cubeInstance, dout);
@@ -43,21 +43,21 @@ public class CubeInstanceCreator {
         System.out.println(buf.toString("UTF-8"));
     }
 
-    public static CubeInstance generateKylinCubeInstanceForMetricsQuery(String owner, KylinConfig config,
+    public static CubeInstance generateKylinCubeInstanceForMetricsQueryExecution(String owner, KylinConfig config,
             MetricsSinkDesc sinkDesc) {
-        return generateKylinCubeInstance(owner, sinkDesc.getTableNameForMetrics(config.getKylinMetricsSubjectQuery()));
+        return generateKylinCubeInstance(owner, sinkDesc.getTableNameForMetrics(config.getKylinMetricsSubjectQueryExecution()));
     }
 
-    public static CubeInstance generateKylinCubeInstanceForMetricsQueryCube(String owner, KylinConfig config,
+    public static CubeInstance generateKylinCubeInstanceForMetricsQuerySparkJob(String owner, KylinConfig config,
             MetricsSinkDesc sinkDesc) {
         return generateKylinCubeInstance(owner,
-                sinkDesc.getTableNameForMetrics(config.getKylinMetricsSubjectQueryCube()));
+                sinkDesc.getTableNameForMetrics(config.getKylinMetricsSubjectQuerySparkJob()));
     }
 
-    public static CubeInstance generateKylinCubeInstanceForMetricsQueryRPC(String owner, KylinConfig config,
+    public static CubeInstance generateKylinCubeInstanceForMetricsQuerySparkStage(String owner, KylinConfig config,
             MetricsSinkDesc sinkDesc) {
         return generateKylinCubeInstance(owner,
-                sinkDesc.getTableNameForMetrics(config.getKylinMetricsSubjectQueryRpcCall()));
+                sinkDesc.getTableNameForMetrics(config.getKylinMetricsSubjectQuerySparkStage()));
     }
 
     public static CubeInstance generateKylinCubeInstanceForMetricsJob(String owner, KylinConfig config,
diff --git a/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/HiveTableCreator.java b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/HiveTableCreator.java
index 35d9efb..73e493e 100644
--- a/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/HiveTableCreator.java
+++ b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/HiveTableCreator.java
@@ -29,10 +29,9 @@ import org.apache.kylin.metrics.lib.impl.TimePropertyEnum;
 import org.apache.kylin.metrics.lib.impl.hive.HiveProducerRecord;
 import org.apache.kylin.metrics.lib.impl.hive.HiveReservoirReporter;
 import org.apache.kylin.metrics.property.JobPropertyEnum;
-import org.apache.kylin.metrics.property.QueryCubePropertyEnum;
-import org.apache.kylin.metrics.property.QueryPropertyEnum;
-import org.apache.kylin.metrics.property.QueryRPCPropertyEnum;
-
+import org.apache.kylin.metrics.property.QuerySparkExecutionEnum;
+import org.apache.kylin.metrics.property.QuerySparkJobEnum;
+import org.apache.kylin.metrics.property.QuerySparkStageEnum;
 import org.apache.kylin.shaded.com.google.common.base.Strings;
 import org.apache.kylin.shaded.com.google.common.collect.Lists;
 
@@ -97,18 +96,18 @@ public class HiveTableCreator {
     }
 
     public static String generateHiveTableSQLForMetricsQuery(KylinConfig config) {
-        String tableName = HiveReservoirReporter.getTableFromSubject(config.getKylinMetricsSubjectQuery());
-        return generateHiveTableSQL(tableName, getHiveColumnsForMetricsQuery(), getPartitionKVsForHiveTable());
+        String tableName = HiveReservoirReporter.getTableFromSubject(config.getKylinMetricsSubjectQueryExecution());
+        return generateHiveTableSQL(tableName, getHiveColumnsForMetricsQueryExecution(), getPartitionKVsForHiveTable());
     }
 
     public static String generateHiveTableSQLForMetricsQueryCUBE(KylinConfig config) {
-        String tableName = HiveReservoirReporter.getTableFromSubject(config.getKylinMetricsSubjectQueryCube());
-        return generateHiveTableSQL(tableName, getHiveColumnsForMetricsQueryCube(), getPartitionKVsForHiveTable());
+        String tableName = HiveReservoirReporter.getTableFromSubject(config.getKylinMetricsSubjectQuerySparkJob());
+        return generateHiveTableSQL(tableName, getHiveColumnsForMetricsQuerySparkJob(), getPartitionKVsForHiveTable());
     }
 
     public static String generateHiveTableSQLForMetricsQueryRPC(KylinConfig config) {
-        String tableName = HiveReservoirReporter.getTableFromSubject(config.getKylinMetricsSubjectQueryRpcCall());
-        return generateHiveTableSQL(tableName, getHiveColumnsForMetricsQueryRPC(), getPartitionKVsForHiveTable());
+        String tableName = HiveReservoirReporter.getTableFromSubject(config.getKylinMetricsSubjectQuerySparkStage());
+        return generateHiveTableSQL(tableName, getHiveColumnsForMetricsQuerySparkStage(), getPartitionKVsForHiveTable());
     }
 
     public static String generateHiveTableSQLForMetricsJob(KylinConfig config) {
@@ -121,67 +120,94 @@ public class HiveTableCreator {
         return generateHiveTableSQL(tableName, getHiveColumnsForMetricsJobException(), getPartitionKVsForHiveTable());
     }
 
-    public static List<Pair<String, String>> getHiveColumnsForMetricsQuery() {
+    public static List<Pair<String, String>> getHiveColumnsForMetricsQueryExecution() {
         List<Pair<String, String>> columns = Lists.newLinkedList();
-        columns.add(new Pair<>(QueryPropertyEnum.ID_CODE.toString(), HiveTypeEnum.HBIGINT.toString()));
+        columns.add(new Pair<>(QuerySparkExecutionEnum.ID_CODE.toString(), HiveTypeEnum.HBIGINT.toString()));
+        columns.add(new Pair<>(QuerySparkExecutionEnum.QUERY_ID.toString(), HiveTypeEnum.HSTRING.toString()));
+        columns.add(new Pair<>(QuerySparkExecutionEnum.EXECUTION_ID.toString(), HiveTypeEnum.HBIGINT.toString()));
         columns.add(new Pair<>(RecordEvent.RecordReserveKeyEnum.HOST.toString(), HiveTypeEnum.HSTRING.toString()));
-        columns.add(new Pair<>(QueryPropertyEnum.USER.toString(), HiveTypeEnum.HSTRING.toString()));
-        columns.add(new Pair<>(QueryPropertyEnum.PROJECT.toString(), HiveTypeEnum.HSTRING.toString()));
-        columns.add(new Pair<>(QueryPropertyEnum.REALIZATION.toString(), HiveTypeEnum.HSTRING.toString()));
-        columns.add(new Pair<>(QueryPropertyEnum.REALIZATION_TYPE.toString(), HiveTypeEnum.HINT.toString()));
-        columns.add(new Pair<>(QueryPropertyEnum.TYPE.toString(), HiveTypeEnum.HSTRING.toString()));
-
-        columns.add(new Pair<>(QueryPropertyEnum.EXCEPTION.toString(), HiveTypeEnum.HSTRING.toString()));
-        columns.add(new Pair<>(QueryPropertyEnum.TIME_COST.toString(), HiveTypeEnum.HBIGINT.toString()));
-        columns.add(new Pair<>(QueryPropertyEnum.CALCITE_RETURN_COUNT.toString(), HiveTypeEnum.HBIGINT.toString()));
-        columns.add(new Pair<>(QueryPropertyEnum.STORAGE_RETURN_COUNT.toString(), HiveTypeEnum.HBIGINT.toString()));
-        columns.add(new Pair<>(QueryPropertyEnum.AGGR_FILTER_COUNT.toString(), HiveTypeEnum.HBIGINT.toString()));
+        columns.add(new Pair<>(QuerySparkExecutionEnum.USER.toString(), HiveTypeEnum.HSTRING.toString()));
+        columns.add(new Pair<>(QuerySparkExecutionEnum.SPARDER_NAME.toString(), HiveTypeEnum.HSTRING.toString()));
+        columns.add(new Pair<>(QuerySparkExecutionEnum.PROJECT.toString(), HiveTypeEnum.HSTRING.toString()));
+        columns.add(new Pair<>(QuerySparkExecutionEnum.REALIZATION.toString(), HiveTypeEnum.HSTRING.toString()));
+        columns.add(new Pair<>(QuerySparkExecutionEnum.REALIZATION_TYPE.toString(), HiveTypeEnum.HINT.toString()));
+        columns.add(new Pair<>(QuerySparkExecutionEnum.CUBOID_IDS.toString(), HiveTypeEnum.HSTRING.toString()));
+
+        columns.add(new Pair<>(QuerySparkExecutionEnum.TYPE.toString(), HiveTypeEnum.HSTRING.toString()));
+        columns.add(new Pair<>(QuerySparkExecutionEnum.START_TIME.toString(), HiveTypeEnum.HBIGINT.toString()));
+        columns.add(new Pair<>(QuerySparkExecutionEnum.END_TIME.toString(), HiveTypeEnum.HBIGINT.toString()));
+
+        columns.add(new Pair<>(QuerySparkExecutionEnum.EXCEPTION.toString(), HiveTypeEnum.HSTRING.toString()));
+        columns.add(new Pair<>(QuerySparkExecutionEnum.TIME_COST.toString(), HiveTypeEnum.HBIGINT.toString()));
+        columns.add(new Pair<>(QuerySparkExecutionEnum.TOTAL_SCAN_COUNT.toString(), HiveTypeEnum.HBIGINT.toString()));
+        columns.add(new Pair<>(QuerySparkExecutionEnum.TOTAL_SCAN_BYTES.toString(), HiveTypeEnum.HBIGINT.toString()));
+        columns.add(new Pair<>(QuerySparkExecutionEnum.RESULT_COUNT.toString(), HiveTypeEnum.HBIGINT.toString()));
+
+        columns.add(new Pair<>(QuerySparkExecutionEnum.EXECUTION_DURATION.toString(), HiveTypeEnum.HBIGINT.toString()));
+        columns.add(new Pair<>(QuerySparkExecutionEnum.RESULT_SIZE.toString(), HiveTypeEnum.HBIGINT.toString()));
+        columns.add(new Pair<>(QuerySparkExecutionEnum.EXECUTOR_DESERIALIZE_TIME.toString(), HiveTypeEnum.HBIGINT.toString()));
+        columns.add(new Pair<>(QuerySparkExecutionEnum.EXECUTOR_DESERIALIZE_CPU_TIME.toString(), HiveTypeEnum.HBIGINT.toString()));
+        columns.add(new Pair<>(QuerySparkExecutionEnum.EXECUTOR_RUN_TIME.toString(), HiveTypeEnum.HBIGINT.toString()));
+        columns.add(new Pair<>(QuerySparkExecutionEnum.EXECUTOR_CPU_TIME.toString(), HiveTypeEnum.HBIGINT.toString()));
+        columns.add(new Pair<>(QuerySparkExecutionEnum.JVM_GC_TIME.toString(), HiveTypeEnum.HBIGINT.toString()));
+        columns.add(new Pair<>(QuerySparkExecutionEnum.RESULT_SERIALIZATION_TIME.toString(), HiveTypeEnum.HBIGINT.toString()));
+        columns.add(new Pair<>(QuerySparkExecutionEnum.MEMORY_BYTE_SPILLED.toString(), HiveTypeEnum.HBIGINT.toString()));
+        columns.add(new Pair<>(QuerySparkExecutionEnum.DISK_BYTES_SPILLED.toString(), HiveTypeEnum.HBIGINT.toString()));
+        columns.add(new Pair<>(QuerySparkExecutionEnum.PEAK_EXECUTION_MEMORY.toString(), HiveTypeEnum.HBIGINT.toString()));
 
         columns.addAll(getTimeColumnsForMetrics());
         return columns;
     }
 
-    public static List<Pair<String, String>> getHiveColumnsForMetricsQueryCube() {
+    public static List<Pair<String, String>> getHiveColumnsForMetricsQuerySparkJob() {
         List<Pair<String, String>> columns = Lists.newLinkedList();
+        columns.add(new Pair<>(QuerySparkJobEnum.QUERY_ID.toString(), HiveTypeEnum.HSTRING.toString()));
+        columns.add(new Pair<>(QuerySparkJobEnum.PROJECT.toString(), HiveTypeEnum.HSTRING.toString()));
+        columns.add(new Pair<>(QuerySparkJobEnum.EXECUTION_ID.toString(), HiveTypeEnum.HBIGINT.toString()));
+        columns.add(new Pair<>(QuerySparkJobEnum.JOB_ID.toString(), HiveTypeEnum.HINT.toString()));
         columns.add(new Pair<>(RecordEvent.RecordReserveKeyEnum.HOST.toString(), HiveTypeEnum.HSTRING.toString()));
-        columns.add(new Pair<>(QueryCubePropertyEnum.PROJECT.toString(), HiveTypeEnum.HSTRING.toString()));
-        columns.add(new Pair<>(QueryCubePropertyEnum.CUBE.toString(), HiveTypeEnum.HSTRING.toString()));
-        columns.add(new Pair<>(QueryCubePropertyEnum.SEGMENT.toString(), HiveTypeEnum.HSTRING.toString()));
-        columns.add(new Pair<>(QueryCubePropertyEnum.CUBOID_SOURCE.toString(), HiveTypeEnum.HBIGINT.toString()));
-        columns.add(new Pair<>(QueryCubePropertyEnum.CUBOID_TARGET.toString(), HiveTypeEnum.HBIGINT.toString()));
-        columns.add(new Pair<>(QueryCubePropertyEnum.IF_MATCH.toString(), HiveTypeEnum.HBOOLEAN.toString()));
-        columns.add(new Pair<>(QueryCubePropertyEnum.FILTER_MASK.toString(), HiveTypeEnum.HBIGINT.toString()));
-        columns.add(new Pair<>(QueryCubePropertyEnum.IF_SUCCESS.toString(), HiveTypeEnum.HBOOLEAN.toString()));
-
-        columns.add(new Pair<>(QueryCubePropertyEnum.WEIGHT_PER_HIT.toString(), HiveTypeEnum.HDOUBLE.toString()));
-
-        columns.add(new Pair<>(QueryCubePropertyEnum.CALL_COUNT.toString(), HiveTypeEnum.HBIGINT.toString()));
-        columns.add(new Pair<>(QueryCubePropertyEnum.TIME_SUM.toString(), HiveTypeEnum.HBIGINT.toString()));
-        columns.add(new Pair<>(QueryCubePropertyEnum.TIME_MAX.toString(), HiveTypeEnum.HBIGINT.toString()));
-        columns.add(new Pair<>(QueryCubePropertyEnum.SKIP_COUNT.toString(), HiveTypeEnum.HBIGINT.toString()));
-        columns.add(new Pair<>(QueryCubePropertyEnum.SCAN_COUNT.toString(), HiveTypeEnum.HBIGINT.toString()));
-        columns.add(new Pair<>(QueryCubePropertyEnum.RETURN_COUNT.toString(), HiveTypeEnum.HBIGINT.toString()));
-        columns.add(new Pair<>(QueryCubePropertyEnum.AGGR_FILTER_COUNT.toString(), HiveTypeEnum.HBIGINT.toString()));
-        columns.add(new Pair<>(QueryCubePropertyEnum.AGGR_COUNT.toString(), HiveTypeEnum.HBIGINT.toString()));
+        columns.add(new Pair<>(QuerySparkJobEnum.START_TIME.toString(), HiveTypeEnum.HSTRING.toString()));
+        columns.add(new Pair<>(QuerySparkJobEnum.END_TIME.toString(), HiveTypeEnum.HSTRING.toString()));
+        columns.add(new Pair<>(QuerySparkJobEnum.IF_SUCCESS.toString(), HiveTypeEnum.HBOOLEAN.toString()));
+
+        columns.add(new Pair<>(QuerySparkJobEnum.RESULT_SIZE.toString(), HiveTypeEnum.HBIGINT.toString()));
+        columns.add(new Pair<>(QuerySparkJobEnum.EXECUTOR_DESERIALIZE_TIME.toString(), HiveTypeEnum.HBIGINT.toString()));
+        columns.add(new Pair<>(QuerySparkJobEnum.EXECUTOR_DESERIALIZE_CPU_TIME.toString(), HiveTypeEnum.HBIGINT.toString()));
+        columns.add(new Pair<>(QuerySparkJobEnum.EXECUTOR_RUN_TIME.toString(), HiveTypeEnum.HBIGINT.toString()));
+        columns.add(new Pair<>(QuerySparkJobEnum.EXECUTOR_CPU_TIME.toString(), HiveTypeEnum.HBIGINT.toString()));
+        columns.add(new Pair<>(QuerySparkJobEnum.JVM_GC_TIME.toString(), HiveTypeEnum.HBIGINT.toString()));
+        columns.add(new Pair<>(QuerySparkJobEnum.RESULT_SERIALIZATION_TIME.toString(), HiveTypeEnum.HBIGINT.toString()));
+        columns.add(new Pair<>(QuerySparkJobEnum.MEMORY_BYTE_SPILLED.toString(), HiveTypeEnum.HBIGINT.toString()));
+        columns.add(new Pair<>(QuerySparkJobEnum.DISK_BYTES_SPILLED.toString(), HiveTypeEnum.HBIGINT.toString()));
+        columns.add(new Pair<>(QuerySparkJobEnum.PEAK_EXECUTION_MEMORY.toString(), HiveTypeEnum.HBIGINT.toString()));
 
         columns.addAll(getTimeColumnsForMetrics());
         return columns;
     }
 
-    public static List<Pair<String, String>> getHiveColumnsForMetricsQueryRPC() {
+    public static List<Pair<String, String>> getHiveColumnsForMetricsQuerySparkStage() {
         List<Pair<String, String>> columns = Lists.newLinkedList();
         columns.add(new Pair<>(RecordEvent.RecordReserveKeyEnum.HOST.toString(), HiveTypeEnum.HSTRING.toString()));
-        columns.add(new Pair<>(QueryRPCPropertyEnum.PROJECT.toString(), HiveTypeEnum.HSTRING.toString()));
-        columns.add(new Pair<>(QueryRPCPropertyEnum.REALIZATION.toString(), HiveTypeEnum.HSTRING.toString()));
-        columns.add(new Pair<>(QueryRPCPropertyEnum.RPC_SERVER.toString(), HiveTypeEnum.HSTRING.toString()));
-        columns.add(new Pair<>(QueryRPCPropertyEnum.EXCEPTION.toString(), HiveTypeEnum.HSTRING.toString()));
-
-        columns.add(new Pair<>(QueryRPCPropertyEnum.CALL_TIME.toString(), HiveTypeEnum.HBIGINT.toString()));
-        columns.add(new Pair<>(QueryRPCPropertyEnum.RETURN_COUNT.toString(), HiveTypeEnum.HBIGINT.toString()));
-        columns.add(new Pair<>(QueryRPCPropertyEnum.SCAN_COUNT.toString(), HiveTypeEnum.HBIGINT.toString()));
-        columns.add(new Pair<>(QueryRPCPropertyEnum.SKIP_COUNT.toString(), HiveTypeEnum.HBIGINT.toString()));
-        columns.add(new Pair<>(QueryRPCPropertyEnum.AGGR_FILTER_COUNT.toString(), HiveTypeEnum.HBIGINT.toString()));
-        columns.add(new Pair<>(QueryRPCPropertyEnum.AGGR_COUNT.toString(), HiveTypeEnum.HBIGINT.toString()));
+        columns.add(new Pair<>(QuerySparkStageEnum.QUERY_ID.toString(), HiveTypeEnum.HSTRING.toString()));
+        columns.add(new Pair<>(QuerySparkStageEnum.EXECUTION_ID.toString(), HiveTypeEnum.HBIGINT.toString()));
+        columns.add(new Pair<>(QuerySparkStageEnum.JOB_ID.toString(), HiveTypeEnum.HINT.toString()));
+        columns.add(new Pair<>(QuerySparkStageEnum.STAGE_ID.toString(), HiveTypeEnum.HINT.toString()));
+        columns.add(new Pair<>(QuerySparkStageEnum.SUBMIT_TIME.toString(), HiveTypeEnum.HBIGINT.toString()));
+        columns.add(new Pair<>(QuerySparkStageEnum.PROJECT.toString(), HiveTypeEnum.HSTRING.toString()));
+        columns.add(new Pair<>(QuerySparkStageEnum.REALIZATION.toString(), HiveTypeEnum.HSTRING.toString()));
+        columns.add(new Pair<>(QuerySparkStageEnum.CUBOID_ID.toString(), HiveTypeEnum.HSTRING.toString()));
+        columns.add(new Pair<>(QuerySparkStageEnum.IF_SUCCESS.toString(), HiveTypeEnum.HBOOLEAN.toString()));
+
+        columns.add(new Pair<>(QuerySparkStageEnum.RESULT_SIZE.toString(), HiveTypeEnum.HBIGINT.toString()));
+        columns.add(new Pair<>(QuerySparkStageEnum.EXECUTOR_DESERIALIZE_TIME.toString(), HiveTypeEnum.HBIGINT.toString()));
+        columns.add(new Pair<>(QuerySparkStageEnum.EXECUTOR_DESERIALIZE_CPU_TIME.toString(), HiveTypeEnum.HBIGINT.toString()));
+        columns.add(new Pair<>(QuerySparkStageEnum.EXECUTOR_RUN_TIME.toString(), HiveTypeEnum.HBIGINT.toString()));
+        columns.add(new Pair<>(QuerySparkStageEnum.EXECUTOR_CPU_TIME.toString(), HiveTypeEnum.HBIGINT.toString()));
+        columns.add(new Pair<>(QuerySparkStageEnum.JVM_GC_TIME.toString(), HiveTypeEnum.HBIGINT.toString()));
+        columns.add(new Pair<>(QuerySparkStageEnum.RESULT_SERIALIZATION_TIME.toString(), HiveTypeEnum.HBIGINT.toString()));
+        columns.add(new Pair<>(QuerySparkStageEnum.MEMORY_BYTE_SPILLED.toString(), HiveTypeEnum.HBIGINT.toString()));
+        columns.add(new Pair<>(QuerySparkStageEnum.DISK_BYTES_SPILLED.toString(), HiveTypeEnum.HBIGINT.toString()));
+        columns.add(new Pair<>(QuerySparkStageEnum.PEAK_EXECUTION_MEMORY.toString(), HiveTypeEnum.HBIGINT.toString()));
 
         columns.addAll(getTimeColumnsForMetrics());
         return columns;
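
For context (this helper is not in the patch), each renamed getter returns a plain list of (column name, Hive type) pairs, so turning it into table DDL stays mechanical. A minimal sketch, assuming Kylin's Pair exposes getFirst()/getSecond() as it does elsewhere in the codebase:

    import java.util.List;
    import java.util.stream.Collectors;

    import org.apache.kylin.common.util.Pair;

    // Hypothetical helper: render the pairs returned by
    // getHiveColumnsForMetricsQuerySparkStage() (or the execution/job variants)
    // as one "<name> <hive type>" entry per column.
    public class HiveColumnsPreview {
        public static String toColumnDdl(List<Pair<String, String>> columns) {
            return columns.stream()
                    .map(c -> c.getFirst() + " " + c.getSecond())
                    .collect(Collectors.joining(",\n"));
        }
    }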
diff --git a/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/KylinTableCreator.java b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/KylinTableCreator.java
index 9636811..6d70a7e 100644
--- a/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/KylinTableCreator.java
+++ b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/KylinTableCreator.java
@@ -39,7 +39,7 @@ public class KylinTableCreator {
     public static void main(String[] args) throws Exception {
         KylinConfig config = KylinConfig.getInstanceFromEnv();
 
-        TableDesc kylinTable = generateKylinTableForMetricsQuery(config, new MetricsSinkDesc());
+        TableDesc kylinTable = generateKylinTableForMetricsQueryExecution(config, new MetricsSinkDesc());
         ByteArrayOutputStream buf = new ByteArrayOutputStream();
         DataOutputStream dout = new DataOutputStream(buf);
         TableMetadataManager.TABLE_SERIALIZER.serialize(kylinTable, dout);
@@ -48,25 +48,25 @@ public class KylinTableCreator {
         System.out.println(buf.toString("UTF-8"));
     }
 
-    public static TableDesc generateKylinTableForMetricsQuery(KylinConfig kylinConfig, MetricsSinkDesc sinkDesc) {
+    public static TableDesc generateKylinTableForMetricsQueryExecution(KylinConfig kylinConfig, MetricsSinkDesc sinkDesc) {
         List<Pair<String, String>> columns = Lists.newLinkedList();
-        columns.addAll(HiveTableCreator.getHiveColumnsForMetricsQuery());
+        columns.addAll(HiveTableCreator.getHiveColumnsForMetricsQueryExecution());
         columns.addAll(HiveTableCreator.getPartitionKVsForHiveTable());
-        return generateKylinTable(kylinConfig, sinkDesc, kylinConfig.getKylinMetricsSubjectQuery(), columns);
+        return generateKylinTable(kylinConfig, sinkDesc, kylinConfig.getKylinMetricsSubjectQueryExecution(), columns);
     }
 
-    public static TableDesc generateKylinTableForMetricsQueryCube(KylinConfig kylinConfig, MetricsSinkDesc sinkDesc) {
+    public static TableDesc generateKylinTableForMetricsQuerySparkJob(KylinConfig kylinConfig, MetricsSinkDesc sinkDesc) {
         List<Pair<String, String>> columns = Lists.newLinkedList();
-        columns.addAll(HiveTableCreator.getHiveColumnsForMetricsQueryCube());
+        columns.addAll(HiveTableCreator.getHiveColumnsForMetricsQuerySparkJob());
         columns.addAll(HiveTableCreator.getPartitionKVsForHiveTable());
-        return generateKylinTable(kylinConfig, sinkDesc, kylinConfig.getKylinMetricsSubjectQueryCube(), columns);
+        return generateKylinTable(kylinConfig, sinkDesc, kylinConfig.getKylinMetricsSubjectQuerySparkJob(), columns);
     }
 
-    public static TableDesc generateKylinTableForMetricsQueryRPC(KylinConfig kylinConfig, MetricsSinkDesc sinkDesc) {
+    public static TableDesc generateKylinTableForMetricsQuerySparkStage(KylinConfig kylinConfig, MetricsSinkDesc sinkDesc) {
         List<Pair<String, String>> columns = Lists.newLinkedList();
-        columns.addAll(HiveTableCreator.getHiveColumnsForMetricsQueryRPC());
+        columns.addAll(HiveTableCreator.getHiveColumnsForMetricsQuerySparkStage());
         columns.addAll(HiveTableCreator.getPartitionKVsForHiveTable());
-        return generateKylinTable(kylinConfig, sinkDesc, kylinConfig.getKylinMetricsSubjectQueryRpcCall(), columns);
+        return generateKylinTable(kylinConfig, sinkDesc, kylinConfig.getKylinMetricsSubjectQuerySparkStage(), columns);
     }
 
     public static TableDesc generateKylinTableForMetricsJob(KylinConfig kylinConfig, MetricsSinkDesc sinkDesc) {
diff --git a/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/ModelCreator.java b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/ModelCreator.java
index 429509a..8a36549 100644
--- a/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/ModelCreator.java
+++ b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/ModelCreator.java
@@ -32,9 +32,9 @@ import org.apache.kylin.metadata.model.PartitionDesc;
 import org.apache.kylin.metrics.lib.impl.RecordEvent;
 import org.apache.kylin.metrics.lib.impl.TimePropertyEnum;
 import org.apache.kylin.metrics.property.JobPropertyEnum;
-import org.apache.kylin.metrics.property.QueryCubePropertyEnum;
-import org.apache.kylin.metrics.property.QueryPropertyEnum;
-import org.apache.kylin.metrics.property.QueryRPCPropertyEnum;
+import org.apache.kylin.metrics.property.QuerySparkExecutionEnum;
+import org.apache.kylin.metrics.property.QuerySparkJobEnum;
+import org.apache.kylin.metrics.property.QuerySparkStageEnum;
 import org.apache.kylin.tool.metrics.systemcube.def.MetricsSinkDesc;
 
 import org.apache.kylin.shaded.com.google.common.collect.Lists;
@@ -65,23 +65,23 @@ public class ModelCreator {
 
     public static DataModelDesc generateKylinModelForMetricsQuery(String owner, KylinConfig kylinConfig,
             MetricsSinkDesc sinkDesc) {
-        String tableName = sinkDesc.getTableNameForMetrics(kylinConfig.getKylinMetricsSubjectQuery());
-        return generateKylinModel(owner, tableName, getDimensionsForMetricsQuery(), getMeasuresForMetricsQuery(),
-                getPartitionDesc(tableName));
+        String tableName = sinkDesc.getTableNameForMetrics(kylinConfig.getKylinMetricsSubjectQueryExecution());
+        return generateKylinModel(owner, tableName, getDimensionsForMetricsQueryExecution(),
+                getMeasuresForMetricsQueryExecution(), getPartitionDesc(tableName));
     }
 
     public static DataModelDesc generateKylinModelForMetricsQueryCube(String owner, KylinConfig kylinConfig,
             MetricsSinkDesc sinkDesc) {
-        String tableName = sinkDesc.getTableNameForMetrics(kylinConfig.getKylinMetricsSubjectQueryCube());
-        return generateKylinModel(owner, tableName, getDimensionsForMetricsQueryCube(),
-                getMeasuresForMetricsQueryCube(), getPartitionDesc(tableName));
+        String tableName = sinkDesc.getTableNameForMetrics(kylinConfig.getKylinMetricsSubjectQuerySparkJob());
+        return generateKylinModel(owner, tableName, getDimensionsForMetricsQuerySparkJob(),
+                getMeasuresForMetricsQuerySparkJob(), getPartitionDesc(tableName));
     }
 
     public static DataModelDesc generateKylinModelForMetricsQueryRPC(String owner, KylinConfig kylinConfig,
             MetricsSinkDesc sinkDesc) {
-        String tableName = sinkDesc.getTableNameForMetrics(kylinConfig.getKylinMetricsSubjectQueryRpcCall());
-        return generateKylinModel(owner, tableName, getDimensionsForMetricsQueryRPC(), getMeasuresForMetricsQueryRPC(),
-                getPartitionDesc(tableName));
+        String tableName = sinkDesc.getTableNameForMetrics(kylinConfig.getKylinMetricsSubjectQuerySparkStage());
+        return generateKylinModel(owner, tableName, getDimensionsForMetricsQuerySparkStage(),
+                getMeasuresForMetricsQuerySparkStage(), getPartitionDesc(tableName));
     }
 
     public static DataModelDesc generateKylinModelForMetricsJob(String owner, KylinConfig kylinConfig,
@@ -98,82 +98,106 @@ public class ModelCreator {
                 getMeasuresForMetricsJobException(), getPartitionDesc(tableName));
     }
 
-    public static List<String> getDimensionsForMetricsQuery() {
+    public static List<String> getDimensionsForMetricsQueryExecution() {
         List<String> result = Lists.newLinkedList();
         result.add(RecordEvent.RecordReserveKeyEnum.HOST.toString());
-        result.add(QueryPropertyEnum.USER.toString());
-        result.add(QueryPropertyEnum.PROJECT.toString());
-        result.add(QueryPropertyEnum.REALIZATION.toString());
-        result.add(QueryPropertyEnum.REALIZATION_TYPE.toString());
-        result.add(QueryPropertyEnum.TYPE.toString());
-        result.add(QueryPropertyEnum.EXCEPTION.toString());
+        result.add(QuerySparkExecutionEnum.USER.toString());
+        result.add(QuerySparkExecutionEnum.PROJECT.toString());
+        result.add(QuerySparkExecutionEnum.REALIZATION.toString());
+        result.add(QuerySparkExecutionEnum.REALIZATION_TYPE.toString());
+        result.add(QuerySparkExecutionEnum.CUBOID_IDS.toString());
+        result.add(QuerySparkExecutionEnum.TYPE.toString());
+        result.add(QuerySparkExecutionEnum.EXCEPTION.toString());
+        result.add(QuerySparkExecutionEnum.SPARDER_NAME.toString());
+        result.add(QuerySparkExecutionEnum.QUERY_ID.toString());
+        result.add(QuerySparkExecutionEnum.START_TIME.toString());
+        result.add(QuerySparkExecutionEnum.END_TIME.toString());
 
         result.addAll(getTimeDimensionsForMetrics());
         return result;
     }
 
-    public static List<String> getMeasuresForMetricsQuery() {
+    public static List<String> getMeasuresForMetricsQueryExecution() {
         List<String> result = Lists.newLinkedList();
-        result.add(QueryPropertyEnum.ID_CODE.toString());
-        result.add(QueryPropertyEnum.TIME_COST.toString());
-        result.add(QueryPropertyEnum.CALCITE_RETURN_COUNT.toString());
-        result.add(QueryPropertyEnum.STORAGE_RETURN_COUNT.toString());
-        result.add(QueryPropertyEnum.AGGR_FILTER_COUNT.toString());
-
+        result.add(QuerySparkExecutionEnum.ID_CODE.toString());
+        result.add(QuerySparkExecutionEnum.TIME_COST.toString());
+        result.add(QuerySparkExecutionEnum.TOTAL_SCAN_COUNT.toString());
+        result.add(QuerySparkExecutionEnum.TOTAL_SCAN_BYTES.toString());
+        result.add(QuerySparkExecutionEnum.RESULT_COUNT.toString());
+        result.add(QuerySparkExecutionEnum.EXECUTION_DURATION.toString());
+        result.add(QuerySparkExecutionEnum.RESULT_SIZE.toString());
+        result.add(QuerySparkExecutionEnum.EXECUTOR_DESERIALIZE_TIME.toString());
+        result.add(QuerySparkExecutionEnum.EXECUTOR_DESERIALIZE_CPU_TIME.toString());
+        result.add(QuerySparkExecutionEnum.EXECUTOR_RUN_TIME.toString());
+        result.add(QuerySparkExecutionEnum.EXECUTOR_CPU_TIME.toString());
+        result.add(QuerySparkExecutionEnum.JVM_GC_TIME.toString());
+        result.add(QuerySparkExecutionEnum.RESULT_SERIALIZATION_TIME.toString());
+        result.add(QuerySparkExecutionEnum.MEMORY_BYTE_SPILLED.toString());
+        result.add(QuerySparkExecutionEnum.DISK_BYTES_SPILLED.toString());
+        result.add(QuerySparkExecutionEnum.PEAK_EXECUTION_MEMORY.toString());
         return result;
     }
 
-    public static List<String> getDimensionsForMetricsQueryCube() {
+    public static List<String> getDimensionsForMetricsQuerySparkJob() {
         List<String> result = Lists.newLinkedList();
         result.add(RecordEvent.RecordReserveKeyEnum.HOST.toString());
-        result.add(QueryCubePropertyEnum.PROJECT.toString());
-        result.add(QueryCubePropertyEnum.CUBE.toString());
-        result.add(QueryCubePropertyEnum.SEGMENT.toString());
-        result.add(QueryCubePropertyEnum.CUBOID_SOURCE.toString());
-        result.add(QueryCubePropertyEnum.CUBOID_TARGET.toString());
-        result.add(QueryCubePropertyEnum.FILTER_MASK.toString());
-        result.add(QueryCubePropertyEnum.IF_MATCH.toString());
-        result.add(QueryCubePropertyEnum.IF_SUCCESS.toString());
+        result.add(QuerySparkJobEnum.QUERY_ID.toString());
+        result.add(QuerySparkJobEnum.EXECUTION_ID.toString());
+        result.add(QuerySparkJobEnum.JOB_ID.toString());
+        result.add(QuerySparkJobEnum.PROJECT.toString());
+        result.add(QuerySparkJobEnum.START_TIME.toString());
+        result.add(QuerySparkJobEnum.END_TIME.toString());
+        result.add(QuerySparkJobEnum.IF_SUCCESS.toString());
 
         result.addAll(getTimeDimensionsForMetrics());
         return result;
     }
 
-    public static List<String> getMeasuresForMetricsQueryCube() {
+    public static List<String> getMeasuresForMetricsQuerySparkJob() {
         List<String> result = Lists.newLinkedList();
-        result.add(QueryCubePropertyEnum.WEIGHT_PER_HIT.toString());
-        result.add(QueryCubePropertyEnum.CALL_COUNT.toString());
-        result.add(QueryCubePropertyEnum.TIME_SUM.toString());
-        result.add(QueryCubePropertyEnum.TIME_MAX.toString());
-        result.add(QueryCubePropertyEnum.SKIP_COUNT.toString());
-        result.add(QueryCubePropertyEnum.SCAN_COUNT.toString());
-        result.add(QueryCubePropertyEnum.RETURN_COUNT.toString());
-        result.add(QueryCubePropertyEnum.AGGR_FILTER_COUNT.toString());
-        result.add(QueryCubePropertyEnum.AGGR_COUNT.toString());
+        result.add(QuerySparkJobEnum.RESULT_SIZE.toString());
+        result.add(QuerySparkJobEnum.EXECUTOR_DESERIALIZE_TIME.toString());
+        result.add(QuerySparkJobEnum.EXECUTOR_DESERIALIZE_CPU_TIME.toString());
+        result.add(QuerySparkJobEnum.EXECUTOR_RUN_TIME.toString());
+        result.add(QuerySparkJobEnum.EXECUTOR_CPU_TIME.toString());
+        result.add(QuerySparkJobEnum.JVM_GC_TIME.toString());
+        result.add(QuerySparkJobEnum.RESULT_SERIALIZATION_TIME.toString());
+        result.add(QuerySparkJobEnum.MEMORY_BYTE_SPILLED.toString());
+        result.add(QuerySparkJobEnum.DISK_BYTES_SPILLED.toString());
+        result.add(QuerySparkJobEnum.PEAK_EXECUTION_MEMORY.toString());
 
         return result;
     }
 
-    public static List<String> getDimensionsForMetricsQueryRPC() {
+    public static List<String> getDimensionsForMetricsQuerySparkStage() {
         List<String> result = Lists.newLinkedList();
         result.add(RecordEvent.RecordReserveKeyEnum.HOST.toString());
-        result.add(QueryRPCPropertyEnum.PROJECT.toString());
-        result.add(QueryRPCPropertyEnum.REALIZATION.toString());
-        result.add(QueryRPCPropertyEnum.RPC_SERVER.toString());
-        result.add(QueryRPCPropertyEnum.EXCEPTION.toString());
+        result.add(QuerySparkStageEnum.QUERY_ID.toString());
+        result.add(QuerySparkStageEnum.EXECUTION_ID.toString());
+        result.add(QuerySparkStageEnum.JOB_ID.toString());
+        result.add(QuerySparkStageEnum.STAGE_ID.toString());
+        result.add(QuerySparkStageEnum.SUBMIT_TIME.toString());
+        result.add(QuerySparkStageEnum.PROJECT.toString());
+        result.add(QuerySparkStageEnum.REALIZATION.toString());
+        result.add(QuerySparkStageEnum.CUBOID_ID.toString());
+        result.add(QuerySparkStageEnum.IF_SUCCESS.toString());
 
         result.addAll(getTimeDimensionsForMetrics());
         return result;
     }
 
-    public static List<String> getMeasuresForMetricsQueryRPC() {
+    public static List<String> getMeasuresForMetricsQuerySparkStage() {
         List<String> result = Lists.newLinkedList();
-        result.add(QueryRPCPropertyEnum.CALL_TIME.toString());
-        result.add(QueryRPCPropertyEnum.RETURN_COUNT.toString());
-        result.add(QueryRPCPropertyEnum.SCAN_COUNT.toString());
-        result.add(QueryRPCPropertyEnum.SKIP_COUNT.toString());
-        result.add(QueryRPCPropertyEnum.AGGR_FILTER_COUNT.toString());
-        result.add(QueryRPCPropertyEnum.AGGR_COUNT.toString());
+        result.add(QuerySparkStageEnum.RESULT_SIZE.toString());
+        result.add(QuerySparkStageEnum.EXECUTOR_DESERIALIZE_TIME.toString());
+        result.add(QuerySparkStageEnum.EXECUTOR_DESERIALIZE_CPU_TIME.toString());
+        result.add(QuerySparkStageEnum.EXECUTOR_RUN_TIME.toString());
+        result.add(QuerySparkStageEnum.EXECUTOR_CPU_TIME.toString());
+        result.add(QuerySparkStageEnum.JVM_GC_TIME.toString());
+        result.add(QuerySparkStageEnum.RESULT_SERIALIZATION_TIME.toString());
+        result.add(QuerySparkStageEnum.MEMORY_BYTE_SPILLED.toString());
+        result.add(QuerySparkStageEnum.DISK_BYTES_SPILLED.toString());
+        result.add(QuerySparkStageEnum.PEAK_EXECUTION_MEMORY.toString());
 
         return result;
     }
diff --git a/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/SCCreator.java b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/SCCreator.java
index 17fde95..3322677 100644
--- a/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/SCCreator.java
+++ b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/SCCreator.java
@@ -186,9 +186,9 @@ public class SCCreator extends AbstractApplication {
 
     private List<TableDesc> generateKylinTableForSystemCube(MetricsSinkDesc sinkDesc) {
         List<TableDesc> result = Lists.newLinkedList();
-        result.add(KylinTableCreator.generateKylinTableForMetricsQuery(config, sinkDesc));
-        result.add(KylinTableCreator.generateKylinTableForMetricsQueryCube(config, sinkDesc));
-        result.add(KylinTableCreator.generateKylinTableForMetricsQueryRPC(config, sinkDesc));
+        result.add(KylinTableCreator.generateKylinTableForMetricsQueryExecution(config, sinkDesc));
+        result.add(KylinTableCreator.generateKylinTableForMetricsQuerySparkJob(config, sinkDesc));
+        result.add(KylinTableCreator.generateKylinTableForMetricsQuerySparkStage(config, sinkDesc));
         result.add(KylinTableCreator.generateKylinTableForMetricsJob(config, sinkDesc));
         result.add(KylinTableCreator.generateKylinTableForMetricsJobException(config, sinkDesc));
 
@@ -208,9 +208,9 @@ public class SCCreator extends AbstractApplication {
 
     private List<CubeDesc> generateKylinCubeDescForSystemCube(MetricsSinkDesc sinkDesc) {
         List<CubeDesc> result = Lists.newLinkedList();
-        result.add(CubeDescCreator.generateKylinCubeDescForMetricsQuery(config, sinkDesc));
-        result.add(CubeDescCreator.generateKylinCubeDescForMetricsQueryCube(config, sinkDesc));
-        result.add(CubeDescCreator.generateKylinCubeDescForMetricsQueryRPC(config, sinkDesc));
+        result.add(CubeDescCreator.generateKylinCubeDescForMetricsQueryExecution(config, sinkDesc));
+        result.add(CubeDescCreator.generateKylinCubeDescForMetricsQuerySparkJob(config, sinkDesc));
+        result.add(CubeDescCreator.generateKylinCubeDescForMetricsQuerySparkStage(config, sinkDesc));
         result.add(CubeDescCreator.generateKylinCubeDescForMetricsJob(config, sinkDesc));
         result.add(CubeDescCreator.generateKylinCubeDescForMetricsJobException(config, sinkDesc));
 
@@ -219,9 +219,9 @@ public class SCCreator extends AbstractApplication {
 
     private List<CubeInstance> generateKylinCubeInstanceForSystemCube(String owner, MetricsSinkDesc sinkDesc) {
         List<CubeInstance> result = Lists.newLinkedList();
-        result.add(CubeInstanceCreator.generateKylinCubeInstanceForMetricsQuery(owner, config, sinkDesc));
-        result.add(CubeInstanceCreator.generateKylinCubeInstanceForMetricsQueryCube(owner, config, sinkDesc));
-        result.add(CubeInstanceCreator.generateKylinCubeInstanceForMetricsQueryRPC(owner, config, sinkDesc));
+        result.add(CubeInstanceCreator.generateKylinCubeInstanceForMetricsQueryExecution(owner, config, sinkDesc));
+        result.add(CubeInstanceCreator.generateKylinCubeInstanceForMetricsQuerySparkJob(owner, config, sinkDesc));
+        result.add(CubeInstanceCreator.generateKylinCubeInstanceForMetricsQuerySparkStage(owner, config, sinkDesc));
         result.add(CubeInstanceCreator.generateKylinCubeInstanceForMetricsJob(owner, config, sinkDesc));
         result.add(CubeInstanceCreator.generateKylinCubeInstanceForMetricsJobException(owner, config, sinkDesc));
 
diff --git a/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/streamingv2/KafkaTopicCreator.java b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/streamingv2/KafkaTopicCreator.java
index c495919..50ab091 100644
--- a/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/streamingv2/KafkaTopicCreator.java
+++ b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/streamingv2/KafkaTopicCreator.java
@@ -25,9 +25,9 @@ public class KafkaTopicCreator {
     public static String generateCreateCommand(KylinConfig config) {
         StringBuilder sb = new StringBuilder();
         String[] topics = new String[]{
-                config.getKylinMetricsSubjectQuery(),
-                config.getKylinMetricsSubjectQueryCube(),
-                config.getKylinMetricsSubjectQueryRpcCall(),
+                config.getKylinMetricsSubjectQueryExecution(),
+                config.getKylinMetricsSubjectQuerySparkJob(),
+                config.getKylinMetricsSubjectQuerySparkStage(),
                 config.getKylinMetricsSubjectJob(),
                 config.getKylinMetricsSubjectJobException()};
         for (String topic : topics) {


[kylin] 06/07: KYLIN-4653 Make the capacity for the LinkedBlockingQueue of BlockingReservoir configurable

Posted by xx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 5b3859c9d6871cd001a39ece43c856275215e01b
Author: XiaoxiangYu <hi...@126.com>
AuthorDate: Tue Jul 28 21:40:13 2020 +0800

    KYLIN-4653 Make the capacity for the LinkedBlockingQueue of BlockingReservoir configurable
    
    (cherry picked from commit 159a0fffe0aff2babd7d6f97bab7de6c7dc2be35)
---
 .github/workflows/maven.yml                                    | 10 ++++++----
 .../org/apache/kylin/metrics/lib/impl/BlockingReservoir.java   |  8 ++++----
 server/src/main/resources/kylinMetrics.xml                     |  5 +++--
 3 files changed, 13 insertions(+), 10 deletions(-)

diff --git a/.github/workflows/maven.yml b/.github/workflows/maven.yml
index b2e447c..77ff0a9 100644
--- a/.github/workflows/maven.yml
+++ b/.github/workflows/maven.yml
@@ -1,13 +1,12 @@
-# Configuration file for Travis continuous integration.
 #
 # Licensed to the Apache Software Foundation (ASF) under one or more
 # contributor license agreements.  See the NOTICE file distributed with
 # this work for additional information regarding copyright ownership.
-# The ASF licenses this file to you under the Apache License, Version 2.0
+# The ASF licenses this file to You under the Apache License, Version 2.0
 # (the "License"); you may not use this file except in compliance with
 # the License.  You may obtain a copy of the License at
 #
-# http://www.apache.org/licenses/LICENSE-2.0
+#    http://www.apache.org/licenses/LICENSE-2.0
 #
 # Unless required by applicable law or agreed to in writing, software
 # distributed under the License is distributed on an "AS IS" BASIS,
@@ -16,6 +15,9 @@
 # limitations under the License.
 #
 
+# This workflow will build a Java project with Maven
+# For more information see: https://help.github.com/actions/language-and-framework-guides/building-and-testing-java-with-maven
+
 name: Java CI with Maven
 
 on:
@@ -36,4 +38,4 @@ jobs:
       with:
         java-version: 1.8
     - name: Build with Maven
-      run: mvn clean -Dpre-commit apache-rat:check test -Dlicense.skip=false
+      run: mvn -B package --file pom.xml
diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/BlockingReservoir.java b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/BlockingReservoir.java
index afa34a9..7b4d07c 100644
--- a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/BlockingReservoir.java
+++ b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/BlockingReservoir.java
@@ -39,7 +39,7 @@ import org.apache.kylin.shaded.com.google.common.util.concurrent.ThreadFactoryBu
 public class BlockingReservoir extends AbstractActiveReservoir {
 
     private static final Logger logger = LoggerFactory.getLogger(BlockingReservoir.class);
-    private static final int MAX_QUEUE_SIZE = 50000;
+    private static final int MAX_QUEUE_SIZE = 500000;
 
     /**
      * Cache for metrics message with max size is maxReportSize
@@ -60,7 +60,7 @@ public class BlockingReservoir extends AbstractActiveReservoir {
     }
 
     public BlockingReservoir(int minReportSize, int maxReportSize, int maxReportTime) {
-        this(minReportSize, maxReportSize, maxReportSize, MAX_QUEUE_SIZE);
+        this(minReportSize, maxReportSize, maxReportTime, MAX_QUEUE_SIZE);
     }
 
     public BlockingReservoir(int minReportSize, int maxReportSize, int maxReportTime, int maxQueueSize) {
@@ -68,11 +68,11 @@ public class BlockingReservoir extends AbstractActiveReservoir {
         Preconditions.checkArgument(maxReportSize >= minReportSize,
                 "maxReportSize should not be less than minBatchSize");
         Preconditions.checkArgument(maxReportTime > 0, "maxReportTime should be larger than 0");
-        this.minReportSize = minReportSize;
         this.maxReportSize = maxReportSize;
         this.maxReportTime = maxReportTime * 60 * 1000L;
 
-        this.recordsQueue = new LinkedBlockingQueue<>(maxQueueSize);
+        this.recordsQueue = maxQueueSize <= 0 ? new LinkedBlockingQueue<>() : new LinkedBlockingQueue<>(maxQueueSize);
+        this.minReportSize = minReportSize;
         this.listeners = Lists.newArrayList();
 
         this.records = Lists.newArrayListWithExpectedSize(this.maxReportSize);
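
Aside from the re-ordered field assignments, the key behavioral change above is the bounded/unbounded queue selection. A self-contained sketch of the same logic (the system property name is a placeholder, not a Kylin config key):

    import java.util.concurrent.LinkedBlockingQueue;

    public class QueueBoundSketch {
        public static void main(String[] args) {
            // Placeholder property; in Kylin the value is injected via kylinMetrics.xml.
            int maxQueueSize = Integer.getInteger("sketch.max-queue-size", 500000);
            // Mirrors the patched constructor: zero or negative means unbounded.
            LinkedBlockingQueue<Object> recordsQueue = maxQueueSize <= 0
                    ? new LinkedBlockingQueue<>()
                    : new LinkedBlockingQueue<>(maxQueueSize);
            System.out.println("remaining capacity: " + recordsQueue.remainingCapacity());
        }
    }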
diff --git a/server/src/main/resources/kylinMetrics.xml b/server/src/main/resources/kylinMetrics.xml
index a9d907a..dea04b2 100644
--- a/server/src/main/resources/kylinMetrics.xml
+++ b/server/src/main/resources/kylinMetrics.xml
@@ -39,9 +39,10 @@
             <value>10</value>
         </constructor-arg>
 
-        <!-- maxQueueSize, max queue size of LinkedBlockingQueue-->
+        <!-- maxQueueSize, max queue size of records in BlockingReservoir;
+            set zero or a negative number if you prefer an unbounded LinkedBlockingQueue -->
         <constructor-arg index="3">
-            <value>50000</value>
+            <value>500000</value>
         </constructor-arg>
     </bean>
 


[kylin] 02/07: add test case

Posted by xx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit ad7b47245c959ea3e57efe1c8f5a737d0b1653ed
Author: Zhichao Zhang <44...@qq.com>
AuthorDate: Thu Jan 14 14:51:59 2021 +0800

    add test case
    
    Add metrics check
---
 build/bin/system-cube.sh                           |  20 +-
 .../org/apache/kylin/common/KylinConfigBase.java   |   2 +-
 .../apache/kylin/metrics/QuerySparkMetrics.java    |  98 ++++----
 examples/test_case_data/localmeta/kylin.properties |   5 +-
 .../spark/sql/metrics/SparderMetricsListener.scala |   2 +-
 .../apache/kylin/rest/init/InitialTaskManager.java |   2 +
 .../kylin/rest/metrics/QueryMetricsFacade.java     |   4 +-
 .../apache/kylin/rest/response/SQLResponse.java    |  10 +
 .../kylin/rest/service/DashboardService.java       |   1 -
 .../apache/kylin/rest/service/QueryService.java    |  12 +-
 .../kylin/rest/metrics/QueryMetricsTest.java       | 271 ++++++++++++++++++++-
 .../tool/metrics/systemcube/CubeDescCreator.java   |   2 +-
 .../tool/metrics/systemcube/HiveTableCreator.java  |   2 +-
 13 files changed, 363 insertions(+), 68 deletions(-)

diff --git a/build/bin/system-cube.sh b/build/bin/system-cube.sh
index 20f7861..ca35970 100644
--- a/build/bin/system-cube.sh
+++ b/build/bin/system-cube.sh
@@ -74,18 +74,14 @@ then
 	cat <<-EOF > ${SINK_TOOLS_FILE}
 	[
 	  [
-		"org.apache.kylin.tool.metrics.systemcube.util.HiveSinkTool",
-		{
-		  "storage_type": 2,
-		  "cube_desc_override_properties": [
-			"java.util.HashMap",
-			{
-			  "kylin.cube.algorithm": "INMEM",
-			  "kylin.cube.max-building-segments": "1"
-			}
-		  ]
-		}
-	  ]
+    {
+       "sink": "hive",
+       "storage_type": 4,
+       "cube_desc_override_properties": {
+         "kylin.cube.max-building-segments": "1"
+       }
+    }
+    ]
 	]
 	EOF
   $KYLIN_HOME/bin/kylin.sh org.apache.kylin.tool.metrics.systemcube.SCCreator \
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 36950ec..21f05db 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -2364,7 +2364,7 @@ public abstract class KylinConfigBase implements Serializable {
     }
 
     public int getKylinMetricsCacheExpireSeconds() {
-        return Integer.parseInt(this.getOptional("kylin.metrics.query-cache.expire-seconds", "600"));
+        return Integer.parseInt(this.getOptional("kylin.metrics.query-cache.expire-seconds", "300"));
     }
 
     public int getKylinMetricsCacheMaxEntries() {
diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/QuerySparkMetrics.java b/core-metrics/src/main/java/org/apache/kylin/metrics/QuerySparkMetrics.java
index ed2430c..a0efe64 100644
--- a/core-metrics/src/main/java/org/apache/kylin/metrics/QuerySparkMetrics.java
+++ b/core-metrics/src/main/java/org/apache/kylin/metrics/QuerySparkMetrics.java
@@ -36,42 +36,62 @@ import java.io.Serializable;
 import java.util.Map;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
 public class QuerySparkMetrics {
     private static final Logger logger = LoggerFactory.getLogger(QuerySparkMetrics.class);
-    private static final QuerySparkMetrics instance = new QuerySparkMetrics();
+    private static ScheduledExecutorService scheduledExecutor = null;
+    private static QuerySparkMetrics instance =
+            new QuerySparkMetrics(new QuerySparkMetricsRemovalListener());
     private static final int sparkMetricsNum = 10;
     private org.apache.kylin.shaded.com.google.common.cache.Cache<String, QueryExecutionMetrics> queryExecutionMetricsMap;
 
-    private QuerySparkMetrics() {
+    // default removal listener
+    private static class QuerySparkMetricsRemovalListener implements RemovalListener<String,
+            QueryExecutionMetrics> {
+        @Override
+        public void onRemoval(RemovalNotification<String, QueryExecutionMetrics> notification) {
+            try {
+                updateMetricsToReservoir(notification.getKey(), notification.getValue());
+                logger.info("Query metrics {} is removed due to {}, update to metrics reservoir successful",
+                        notification.getKey(), notification.getCause());
+            } catch (Exception e) {
+                logger.warn("Query metrics {} is removed due to {}, update to metrics reservoir failed",
+                        notification.getKey(), notification.getCause());
+            }
+        }
+    }
+
+    private QuerySparkMetrics(RemovalListener removalListener) {
+        if (queryExecutionMetricsMap != null) {
+            queryExecutionMetricsMap.cleanUp();
+            queryExecutionMetricsMap = null;
+        }
         queryExecutionMetricsMap = CacheBuilder.newBuilder()
                 .maximumSize(KylinConfig.getInstanceFromEnv().getKylinMetricsCacheMaxEntries())
                 .expireAfterWrite(KylinConfig.getInstanceFromEnv().getKylinMetricsCacheExpireSeconds(),
                         TimeUnit.SECONDS)
-                .removalListener(new RemovalListener<String, QueryExecutionMetrics>() {
-                    @Override
-                    public void onRemoval(RemovalNotification<String, QueryExecutionMetrics> notification) {
-                        try {
-                            updateMetricsToReservoir(notification.getKey(), notification.getValue());
-                            logger.info("Query metrics {} is removed due to {}, update to metrics reservoir successful",
-                                    notification.getKey(), notification.getCause());
-                        } catch(Exception e) {
-                            logger.warn("Query metrics {} is removed due to {}, update to metrics reservoir failed",
-                                    notification.getKey(), notification.getCause());
-                        }
-                    }
-                }).build();
-
-        Executors.newSingleThreadScheduledExecutor().scheduleWithFixedDelay(new Runnable() {
-                                                                                @Override
-                                                                                public void run() {
-                                                                                    queryExecutionMetricsMap.cleanUp();
-                                                                                }
-                                                                            },
+                .removalListener(removalListener).build();
+
+        if (scheduledExecutor != null && !scheduledExecutor.isShutdown()) {
+            scheduledExecutor.shutdown();
+            scheduledExecutor = null;
+        }
+        scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
+        scheduledExecutor.scheduleWithFixedDelay(new Runnable() {
+                                                     @Override
+                                                     public void run() {
+                                                         queryExecutionMetricsMap.cleanUp();
+                                                     }
+                                                 },
                 KylinConfig.getInstanceFromEnv().getKylinMetricsCacheExpireSeconds(),
                 KylinConfig.getInstanceFromEnv().getKylinMetricsCacheExpireSeconds(), TimeUnit.SECONDS);
+    }
 
+    // only for test case
+    public static void init(RemovalListener removalListener) {
+        instance = new QuerySparkMetrics(removalListener);
     }
 
     public static QuerySparkMetrics getInstance() {
@@ -159,19 +179,11 @@ public class QuerySparkMetrics {
         return queryExecutionMetricsMap.getIfPresent(queryId);
     }
 
-    public void setQueryRealization(String queryId, String realizationName, int realizationType, String cuboidIds) {
-        QueryExecutionMetrics queryExecutionMetrics = queryExecutionMetricsMap.getIfPresent(queryId);
-        if (queryExecutionMetrics != null) {
-            queryExecutionMetrics.setRealization(realizationName);
-            queryExecutionMetrics.setRealizationType(realizationType);
-            queryExecutionMetrics.setCuboidIds(cuboidIds);
-        }
-    }
-
     /**
      * report query related metrics
      */
-    public void updateMetricsToReservoir(String queryId, QueryExecutionMetrics queryExecutionMetrics) {
+    public static void updateMetricsToReservoir(String queryId,
+                                                QueryExecutionMetrics queryExecutionMetrics) {
         if (!KylinConfig.getInstanceFromEnv().isKylinMetricsReporterForQueryEnabled()) {
             return;
         }
@@ -186,7 +198,8 @@ public class QuerySparkMetrics {
 
             setSparkExecutionWrapper(queryExecutionMetricsEvent, queryExecutionMetrics.getSparderName(),
                     queryExecutionMetrics.getExecutionId(), queryExecutionMetrics.getRealization(),
-                    queryExecutionMetrics.getRealizationType(), queryExecutionMetrics.getCuboidIds(),
+                    queryExecutionMetrics.getRealizationTypes(),
+                    queryExecutionMetrics.getCuboidIds(),
                     queryExecutionMetrics.getStartTime(), queryExecutionMetrics.getEndTime());
 
             setQueryMetrics(queryExecutionMetricsEvent, queryExecutionMetrics.getSqlDuration(),
@@ -244,7 +257,8 @@ public class QuerySparkMetrics {
                     queryExecutionMetricsList[i] += sparkJobMetricsList[i];
                 }
             }
-            setSparkExecutionMetrics(queryExecutionMetricsEvent, queryExecutionMetrics.getExecutionDuration(),
+            setSparkExecutionMetrics(queryExecutionMetricsEvent,
+                    queryExecutionMetrics.getEndTime() - queryExecutionMetrics.getStartTime(),
                     queryExecutionMetricsList[0], queryExecutionMetricsList[1], queryExecutionMetricsList[2],
                     queryExecutionMetricsList[3], queryExecutionMetricsList[4], queryExecutionMetricsList[5],
                     queryExecutionMetricsList[6], queryExecutionMetricsList[7], queryExecutionMetricsList[8],
@@ -264,8 +278,10 @@ public class QuerySparkMetrics {
         metricsEvent.put(QuerySparkExecutionEnum.EXCEPTION.toString(), exception);
     }
 
-    private static void setSparkExecutionWrapper(RecordEvent metricsEvent, String sparderName, long executionId,
-            String realizationName, int realizationType, String cuboidIds, long startTime, long endTime) {
+    private static void setSparkExecutionWrapper(RecordEvent metricsEvent, String sparderName,
+                                                 long executionId, String realizationName,
+                                                 String realizationType, String cuboidIds,
+                                                 long startTime, long endTime) {
         metricsEvent.put(QuerySparkExecutionEnum.SPARDER_NAME.toString(), sparderName);
         metricsEvent.put(QuerySparkExecutionEnum.EXECUTION_ID.toString(), executionId);
         metricsEvent.put(QuerySparkExecutionEnum.REALIZATION.toString(), realizationName);
@@ -365,7 +381,7 @@ public class QuerySparkMetrics {
         private long executionDuration;
         private String queryId;
         private String realization;
-        private int realizationType;
+        private String realizationTypes;
         private String cuboidIds;
         private long startTime;
         private long endTime;
@@ -512,16 +528,16 @@ public class QuerySparkMetrics {
             return realization;
         }
 
-        public int getRealizationType() {
-            return realizationType;
+        public String getRealizationTypes() {
+            return realizationTypes;
         }
 
         public void setRealization(String realization) {
             this.realization = realization;
         }
 
-        public void setRealizationType(int realizationType) {
-            this.realizationType = realizationType;
+        public void setRealizationTypes(String realizationTypes) {
+            this.realizationTypes = realizationTypes;
         }
 
         public void setSparkJobMetricsMap(ConcurrentMap<Integer, SparkJobMetrics> sparkJobMetricsMap) {
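
Because the removal listener is now injectable through QuerySparkMetrics.init(...), tests can observe cache evictions directly. A hedged sketch of such a probe (it assumes QueryExecutionMetrics is the publicly visible nested class used by the cache, as the test below suggests):

    import org.apache.kylin.metrics.QuerySparkMetrics;
    import org.apache.kylin.shaded.com.google.common.cache.RemovalListener;
    import org.apache.kylin.shaded.com.google.common.cache.RemovalNotification;

    public class EvictionProbe {
        public static void install() {
            QuerySparkMetrics.init(
                    new RemovalListener<String, QuerySparkMetrics.QueryExecutionMetrics>() {
                        @Override
                        public void onRemoval(
                                RemovalNotification<String, QuerySparkMetrics.QueryExecutionMetrics> n) {
                            // A production listener would also call updateMetricsToReservoir(...)
                            // like the default one in the patch does.
                            System.out.println("evicted query " + n.getKey()
                                    + " due to " + n.getCause());
                        }
                    });
        }
    }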
diff --git a/examples/test_case_data/localmeta/kylin.properties b/examples/test_case_data/localmeta/kylin.properties
index ca9543e..d17ce8c 100644
--- a/examples/test_case_data/localmeta/kylin.properties
+++ b/examples/test_case_data/localmeta/kylin.properties
@@ -159,4 +159,7 @@ kylin.source.jdbc.connection-url=jdbc:h2:mem:db
 kylin.source.jdbc.user=
 kylin.source.jdbc.pass=
 
-kylin.query.auto-sparder-context=false
\ No newline at end of file
+kylin.query.auto-sparder-context=false
+
+kylin.metrics.query-cache.expire-seconds=5
+kylin.metrics.query-cache.max-entries=2
\ No newline at end of file
diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/metrics/SparderMetricsListener.scala b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/metrics/SparderMetricsListener.scala
index 6237235..84097e6 100644
--- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/metrics/SparderMetricsListener.scala
+++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/metrics/SparderMetricsListener.scala
@@ -98,7 +98,7 @@ class SparderMetricsListener() extends SparkListener with Logging {
       val stageMetrics = stageInfo.taskMetrics
       val sparkStageMetrics = new QuerySparkMetrics.SparkStageMetrics
       sparkStageMetrics.setMetrics(stageMetrics.resultSize, stageMetrics.executorDeserializeCpuTime,
-        stageMetrics.executorDeserializeCpuTime, stageMetrics.executorRunTime, stageMetrics.executorCpuTime,
+        stageMetrics.executorDeserializeTime, stageMetrics.executorRunTime, stageMetrics.executorCpuTime,
         stageMetrics.jvmGCTime, stageMetrics.resultSerializationTime,
         stageMetrics.memoryBytesSpilled, stageMetrics.diskBytesSpilled, stageMetrics.peakExecutionMemory)
       queryExecutionMetrics.updateSparkStageMetrics(jobExecutionMap.apply(stageJobMap.apply(stageInfo.stageId)).queryId,
diff --git a/server-base/src/main/java/org/apache/kylin/rest/init/InitialTaskManager.java b/server-base/src/main/java/org/apache/kylin/rest/init/InitialTaskManager.java
index 876ae08..feaf3a3 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/init/InitialTaskManager.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/init/InitialTaskManager.java
@@ -21,6 +21,7 @@ package org.apache.kylin.rest.init;
 import org.apache.commons.lang.StringUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.StringUtil;
+import org.apache.kylin.metrics.QuerySparkMetrics;
 import org.apache.kylin.rest.metrics.QueryMetrics2Facade;
 import org.apache.kylin.rest.metrics.QueryMetricsFacade;
 import org.slf4j.Logger;
@@ -44,6 +45,7 @@ public class InitialTaskManager implements InitializingBean {
     private void runInitialTasks() {
 
         // init metrics system for kylin
+        QuerySparkMetrics.getInstance();
         QueryMetricsFacade.init();
         QueryMetrics2Facade.init();
 
diff --git a/server-base/src/main/java/org/apache/kylin/rest/metrics/QueryMetricsFacade.java b/server-base/src/main/java/org/apache/kylin/rest/metrics/QueryMetricsFacade.java
index 7a8f2d7..db01f33 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/metrics/QueryMetricsFacade.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/metrics/QueryMetricsFacade.java
@@ -93,7 +93,9 @@ public class QueryMetricsFacade {
             queryExecutionMetrics.setSqlIdCode(getSqlHashCode(sqlRequest.getSql()));
             queryExecutionMetrics.setProject(norm(sqlRequest.getProject()));
             queryExecutionMetrics.setQueryType(sqlResponse.isStorageCacheUsed() ? "CACHE" : "PARQUET");
-
+            queryExecutionMetrics.setRealization(sqlResponse.getCube());
+            queryExecutionMetrics.setRealizationTypes(sqlResponse.getRealizationTypes());
+            queryExecutionMetrics.setCuboidIds(sqlResponse.getCuboidIds());
             queryExecutionMetrics.setSqlDuration(sqlResponse.getDuration());
             queryExecutionMetrics.setTotalScanCount(sqlResponse.getTotalScanCount());
             queryExecutionMetrics.setTotalScanBytes(sqlResponse.getTotalScanBytes());
diff --git a/server-base/src/main/java/org/apache/kylin/rest/response/SQLResponse.java b/server-base/src/main/java/org/apache/kylin/rest/response/SQLResponse.java
index d5d57ed..37578ec 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/response/SQLResponse.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/response/SQLResponse.java
@@ -48,6 +48,8 @@ public class SQLResponse implements Serializable {
 
     protected String cuboidIds;
 
+    protected String realizationTypes;
+
     // if not select query, only return affected row count
     protected int affectedRowCount;
 
@@ -286,6 +288,14 @@ public class SQLResponse implements Serializable {
         this.lazyQueryStartTime = lazyQueryStartTime;
     }
 
+    public String getRealizationTypes() {
+        return realizationTypes;
+    }
+
+    public void setRealizationTypes(String realizationTypes) {
+        this.realizationTypes = realizationTypes;
+    }
+
     @JsonIgnore
     public List<QueryContext.CubeSegmentStatisticsResult> getCubeSegmentStatisticsList() {
         try {
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/DashboardService.java b/server-base/src/main/java/org/apache/kylin/rest/service/DashboardService.java
index 768876f..392fac2 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/DashboardService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/DashboardService.java
@@ -32,7 +32,6 @@ import org.apache.kylin.metrics.MetricsManager;
 import org.apache.kylin.metrics.lib.impl.TimePropertyEnum;
 import org.apache.kylin.metrics.property.JobPropertyEnum;
 import org.apache.kylin.metrics.property.QuerySparkExecutionEnum;
-import org.apache.kylin.metrics.property.QuerySparkExecutionEnum;
 import org.apache.kylin.rest.constant.Constant;
 import org.apache.kylin.rest.exception.BadRequestException;
 import org.apache.kylin.rest.request.PrepareSqlRequest;
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
index f020d6b..7296636 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
@@ -1180,6 +1180,7 @@ public class QueryService extends BasicService {
         List<String> realizations = Lists.newLinkedList();
         StringBuilder cubeSb = new StringBuilder();
         StringBuilder cuboidIdsSb = new StringBuilder();
+        StringBuilder realizationTypeSb = new StringBuilder();
         StringBuilder logSb = new StringBuilder("Processed rows for each storageContext: ");
         QueryContext queryContext = QueryContextFacade.current();
         if (OLAPContext.getThreadLocalContexts() != null) { // contexts can be null in case of 'explain plan for'
@@ -1191,6 +1192,7 @@ public class QueryService extends BasicService {
                     if (cubeSb.length() > 0) {
                         cubeSb.append(",");
                     }
+                    cubeSb.append(ctx.realization.getCanonicalName());
                     Cuboid cuboid = ctx.storageContext.getCuboid();
                     if (cuboid != null) {
                         //Some queries do not involve cuboid, e.g. lookup table query
@@ -1199,25 +1201,25 @@ public class QueryService extends BasicService {
                         }
                         cuboidIdsSb.append(cuboid.getId());
                     }
-                    cubeSb.append(ctx.realization.getCanonicalName());
                     logSb.append(ctx.storageContext.getProcessedRowCount()).append(" ");
 
                     realizationName = ctx.realization.getName();
                     realizationType = ctx.realization.getStorageType();
+                    if (realizationTypeSb.length() > 0) {
+                        realizationTypeSb.append(",");
+                    }
+                    realizationTypeSb.append(realizationType);
 
                     realizations.add(realizationName);
                 }
-                QuerySparkMetrics.getInstance().setQueryRealization(queryContext.getQueryId(), realizationName,
-                        realizationType, cuboidIdsSb.toString());
             }
-
-
         }
         logger.info(logSb.toString());
 
         SQLResponse response = new SQLResponse(columnMetas, results, cubeSb.toString(), 0, isException,
                 exceptionMessage, isPartialResult, isPushDown);
         response.setCuboidIds(cuboidIdsSb.toString());
+        response.setRealizationTypes(realizationTypeSb.toString());
         response.setTotalScanCount(queryContext.getScannedRows());
         response.setTotalScanFiles((queryContext.getScanFiles() < 0) ? -1 :
                 queryContext.getScanFiles());
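
With this change realizationTypes follows the same convention as the cube and cuboid fields: one comma-separated entry per OLAPContext. Purely illustrative (the storage-type codes are made up), the join yields strings like the following:

    import java.util.Arrays;
    import java.util.List;
    import java.util.stream.Collectors;

    public class RealizationTypesExample {
        public static void main(String[] args) {
            List<Integer> storageTypesPerContext = Arrays.asList(2, 4); // one per OLAPContext
            String realizationTypes = storageTypesPerContext.stream()
                    .map(String::valueOf)
                    .collect(Collectors.joining(","));
            System.out.println(realizationTypes); // prints "2,4"
        }
    }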
diff --git a/server/src/test/java/org/apache/kylin/rest/metrics/QueryMetricsTest.java b/server/src/test/java/org/apache/kylin/rest/metrics/QueryMetricsTest.java
index fbacc78..3ad7eea 100644
--- a/server/src/test/java/org/apache/kylin/rest/metrics/QueryMetricsTest.java
+++ b/server/src/test/java/org/apache/kylin/rest/metrics/QueryMetricsTest.java
@@ -21,23 +21,31 @@ package org.apache.kylin.rest.metrics;
 import java.lang.management.ManagementFactory;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.QueryContext;
 import org.apache.kylin.common.QueryContextFacade;
+import org.apache.kylin.metrics.QuerySparkMetrics;
 import org.apache.kylin.rest.request.SQLRequest;
 import org.apache.kylin.rest.response.SQLResponse;
 import org.apache.kylin.rest.service.ServiceTestBase;
+import org.apache.kylin.shaded.com.google.common.cache.RemovalListener;
+import org.apache.kylin.shaded.com.google.common.cache.RemovalNotification;
 import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
 
 public class QueryMetricsTest extends ServiceTestBase {
 
     private static MBeanServer mBeanServer;
     private static ObjectName objectName;
+    private static AtomicInteger sparkMetricsReportCnt = new AtomicInteger(0);
 
-//    @Before
+    @Before
     public void setup() throws Exception {
         super.setup();
 
@@ -45,7 +53,7 @@ public class QueryMetricsTest extends ServiceTestBase {
         objectName = new ObjectName("Hadoop:service=Kylin,name=Server_Total");
     }
 
-//    @Test
+    @Test
     public void testQueryMetrics() throws Exception {
         System.setProperty("kylin.server.query-metrics-enabled", "true");
         QueryMetricsFacade.init();
@@ -111,7 +119,7 @@ public class QueryMetricsTest extends ServiceTestBase {
         System.clearProperty("kylin.server.query-metrics-enabled");
     }
 
-//    @Test
+    @Test
     public void testQueryStatisticsResult() throws Exception {
         System.setProperty("kylin.metrics.reporter-query-enabled", "true");
         QueryMetricsFacade.init();
@@ -153,4 +161,261 @@ public class QueryMetricsTest extends ServiceTestBase {
         System.clearProperty("kylin.server.query-metrics-enabled");
         System.out.println("------------testQueryStatisticsResult done------------");
     }
+
+    @Test
+    public void testQuerySparkMetrics() throws Exception {
+        sparkMetricsReportCnt.set(0);
+        System.setProperty("kylin.server.query-metrics-enabled", "true");
+        QuerySparkMetrics.init(new QuerySparkMetricsTestRemovalListener());
+        QueryMetricsFacade.init();
+
+        SQLRequest sqlRequest = new SQLRequest();
+        sqlRequest.setSql("select * from TEST_KYLIN_FACT");
+        sqlRequest.setProject("default");
+
+        String queryId1 = "1";
+        generateSparkMetrics(queryId1);
+
+        SQLResponse sqlResponse = new SQLResponse();
+        sqlResponse.setDuration(10);
+        sqlResponse.setCube("test_cube");
+        sqlResponse.setCuboidIds("12345");
+        sqlResponse.setRealizationTypes("4");
+        sqlResponse.setIsException(false);
+        sqlResponse.setTotalScanCount(100);
+        List<String> list1 = new ArrayList<>();
+        list1.add("111");
+        list1.add("112");
+        List<String> list2 = new ArrayList<>();
+        list2.add("111");
+        list2.add("112");
+        List<List<String>> results = new ArrayList<>();
+        results.add(list1);
+        results.add(list2);
+        sqlResponse.setResults(results);
+        sqlResponse.setStorageCacheUsed(true);
+
+        QueryMetricsFacade.updateMetrics(queryId1, sqlRequest, sqlResponse);
+
+        Thread.sleep(3000);
+
+        updateSparkMetrics(queryId1);
+
+        Assert.assertTrue(QuerySparkMetrics.getInstance().getQueryExecutionMetrics(queryId1) != null);
+        Assert.assertEquals(1L, mBeanServer.getAttribute(objectName, "QueryCount"));
+        Assert.assertEquals(1L, mBeanServer.getAttribute(objectName, "QuerySuccessCount"));
+        Assert.assertEquals(0L, mBeanServer.getAttribute(objectName, "QueryFailCount"));
+        Assert.assertEquals(1L, mBeanServer.getAttribute(objectName, "CacheHitCount"));
+
+        Assert.assertEquals(1L, mBeanServer.getAttribute(objectName, "ScanRowCountNumOps"));
+        Assert.assertEquals(100.0, mBeanServer.getAttribute(objectName, "ScanRowCountAvgTime"));
+        Assert.assertEquals(100.0, mBeanServer.getAttribute(objectName, "ScanRowCountMaxTime"));
+        Assert.assertEquals(100.0, mBeanServer.getAttribute(objectName, "ScanRowCountMinTime"));
+
+        Assert.assertEquals(1L, mBeanServer.getAttribute(objectName, "ResultRowCountNumOps"));
+        Assert.assertEquals(2.0, mBeanServer.getAttribute(objectName, "ResultRowCountMaxTime"));
+        Assert.assertEquals(2.0, mBeanServer.getAttribute(objectName, "ResultRowCountAvgTime"));
+        Assert.assertEquals(2.0, mBeanServer.getAttribute(objectName, "ResultRowCountMinTime"));
+
+        Assert.assertEquals(1L, mBeanServer.getAttribute(objectName, "QueryLatencyNumOps"));
+        Assert.assertEquals(10.0, mBeanServer.getAttribute(objectName, "QueryLatencyMaxTime"));
+        Assert.assertEquals(10.0, mBeanServer.getAttribute(objectName, "QueryLatencyAvgTime"));
+        Assert.assertEquals(10.0, mBeanServer.getAttribute(objectName, "QueryLatencyMinTime"));
+
+        String queryId2 = "2";
+        generateSparkMetrics(queryId2);
+
+        Thread.sleep(3000);
+
+        Assert.assertTrue(QuerySparkMetrics.getInstance().getQueryExecutionMetrics(queryId1) == null);
+        Assert.assertTrue(QuerySparkMetrics.getInstance().getQueryExecutionMetrics(queryId2) != null);
+
+        updateSparkMetrics(queryId2);
+
+        SQLResponse sqlResponse2 = new SQLResponse();
+        sqlResponse2.setDuration(10);
+        sqlResponse2.setCube("test_cube");
+        sqlResponse2.setIsException(true);
+
+        QueryMetricsFacade.updateMetrics(queryId2, sqlRequest, sqlResponse2);
+
+        Thread.sleep(5000);
+
+        Assert.assertEquals(2L, mBeanServer.getAttribute(objectName, "QueryCount"));
+        Assert.assertEquals(1L, mBeanServer.getAttribute(objectName, "QuerySuccessCount"));
+        Assert.assertEquals(1L, mBeanServer.getAttribute(objectName, "QueryFailCount"));
+
+        Assert.assertTrue(QuerySparkMetrics.getInstance().getQueryExecutionMetrics(queryId2) == null);
+        Assert.assertEquals(2, sparkMetricsReportCnt.get());
+        System.clearProperty("kylin.server.query-metrics-enabled");
+    }
+
+    public void generateSparkMetrics(String queryId) {
+        Integer id = Integer.valueOf(queryId);
+        long executionStartTime = 0;
+        long jobStartTime= 0;
+        long submitTime = 0;
+
+        if (id == 1) {
+            executionStartTime = 1610609727972L;
+            jobStartTime = 1610609772880L;
+            submitTime = 1610610480942L;
+        } else if (id == 2) {
+            executionStartTime = 1610609876545L;
+            jobStartTime = 1610609901699L;
+            submitTime = 16106105016542L;
+        }
+        QuerySparkMetrics.getInstance().onJobStart(queryId, "test-sparder_" + id, id,
+                executionStartTime, id, jobStartTime);
+        QuerySparkMetrics.getInstance().onSparkStageStart(queryId, id, id, "stageType_" + id,
+                submitTime);
+    }
+
+    public void updateSparkMetrics(String queryId) {
+        Integer id = Integer.valueOf(queryId);
+        long jobEndTime = 0;
+        long executionEndTime = 0;
+        boolean jobIsSuccess = true;
+
+        QuerySparkMetrics.SparkStageMetrics queryStageMetrics = new QuerySparkMetrics.SparkStageMetrics();
+        if (id == 1) {
+            jobEndTime = 1610610734401L;
+            executionEndTime = 1610612655793L;
+            jobIsSuccess = true;
+            queryStageMetrics.setMetrics(10000, 10, 10, 100, 10, 1, 10, 1000, 1000, 100);
+        } else if (id == 2) {
+            jobEndTime = 1610610750397L;
+            executionEndTime = 1610612685275L;
+            jobIsSuccess = false;
+            queryStageMetrics.setMetrics(20000, 20, 20, 200, 20, 2, 20, 2000, 2000, 200);
+        }
+        QuerySparkMetrics.getInstance().updateSparkStageMetrics(queryId, id, id, true,
+                queryStageMetrics);
+        QuerySparkMetrics.getInstance().updateSparkJobMetrics(queryId, id, jobEndTime, jobIsSuccess);
+        QuerySparkMetrics.getInstance().updateExecutionMetrics(queryId, executionEndTime);
+    }
+
+    public static void verifyQuerySparkMetrics(String queryId,
+                                               QuerySparkMetrics.QueryExecutionMetrics queryExecutionMetrics) {
+        sparkMetricsReportCnt.getAndIncrement();
+        Assert.assertTrue(StringUtils.isNotBlank(queryId));
+        Assert.assertTrue(queryExecutionMetrics != null);
+        // verify
+        int id = Integer.valueOf(queryId);
+        Assert.assertTrue(queryExecutionMetrics.getSparkJobMetricsMap().get(id) != null);
+        Assert.assertTrue(queryExecutionMetrics.getSparkJobMetricsMap().get(id) != null);
+        Assert.assertEquals(queryExecutionMetrics.getSparderName(), "test-sparder_" + id);
+        Assert.assertEquals(queryExecutionMetrics.getSparkJobMetricsMap().get(id).getJobId(), id);
+
+        if (id == 1) {
+            //SqlResponse metrics
+            Assert.assertEquals(queryExecutionMetrics.getUser(), "ADMIN");
+            Assert.assertEquals(queryExecutionMetrics.getProject(), "DEFAULT");
+            Assert.assertEquals(queryExecutionMetrics.getQueryType(), "CACHE");
+            Assert.assertEquals(queryExecutionMetrics.getRealization(), "test_cube");
+            Assert.assertEquals(queryExecutionMetrics.getRealizationTypes(), "4");
+            Assert.assertEquals(queryExecutionMetrics.getCuboidIds(), "12345");
+            Assert.assertEquals(queryExecutionMetrics.getTotalScanCount(), 100);
+            Assert.assertEquals(queryExecutionMetrics.getResultCount(), 2);
+            Assert.assertEquals(queryExecutionMetrics.getException(), "NULL");
+
+            Assert.assertEquals(queryExecutionMetrics.getSparderName(), "test-sparder_" + id);
+            Assert.assertEquals(queryExecutionMetrics.getException(), "NULL");
+
+            //SparkExecution metrics
+            Assert.assertEquals(queryExecutionMetrics.getStartTime(), 1610609727972L);
+            Assert.assertEquals(queryExecutionMetrics.getEndTime(), 1610612655793L);
+            Assert.assertTrue(queryExecutionMetrics.getSparkJobMetricsMap().get(id).isSuccess());
+            Assert.assertEquals(queryExecutionMetrics.getSparkJobMetricsMap().get(id).getStartTime(), 1610609772880L);
+            Assert.assertEquals(queryExecutionMetrics.getSparkJobMetricsMap().get(id).getEndTime(), 1610610734401L);
+
+            //SparkStage metrics
+            Assert.assertEquals(queryExecutionMetrics.getSparkJobMetricsMap().get(id)
+                    .getSparkStageMetricsMap().size(), 1);
+            Assert.assertTrue(queryExecutionMetrics.getSparkJobMetricsMap().get(id)
+                    .getSparkStageMetricsMap().get(id) != null);
+            Assert.assertEquals(queryExecutionMetrics.getSparkJobMetricsMap().get(id)
+                    .getSparkStageMetricsMap().get(id).getStageType(), "stageType_" + id);
+            Assert.assertEquals(queryExecutionMetrics.getSparkJobMetricsMap().get(id)
+                    .getSparkStageMetricsMap().get(id).getStageId(), id);
+            Assert.assertEquals(queryExecutionMetrics.getSparkJobMetricsMap().get(id)
+                    .getSparkStageMetricsMap().get(id).getResultSize(), 10000);
+            Assert.assertEquals(queryExecutionMetrics.getSparkJobMetricsMap().get(id)
+                    .getSparkStageMetricsMap().get(id).getExecutorDeserializeCpuTime(), 10);
+            Assert.assertEquals(queryExecutionMetrics.getSparkJobMetricsMap().get(id)
+                    .getSparkStageMetricsMap().get(id).getExecutorDeserializeTime(), 10);
+            Assert.assertEquals(queryExecutionMetrics.getSparkJobMetricsMap().get(id)
+                    .getSparkStageMetricsMap().get(id).getExecutorRunTime(), 100);
+            Assert.assertEquals(queryExecutionMetrics.getSparkJobMetricsMap().get(id)
+                    .getSparkStageMetricsMap().get(id).getExecutorCpuTime(), 10);
+            Assert.assertEquals(queryExecutionMetrics.getSparkJobMetricsMap().get(id)
+                    .getSparkStageMetricsMap().get(id).getJvmGCTime(), 1);
+            Assert.assertEquals(queryExecutionMetrics.getSparkJobMetricsMap().get(id)
+                    .getSparkStageMetricsMap().get(id).getResultSerializationTime(), 10);
+            Assert.assertEquals(queryExecutionMetrics.getSparkJobMetricsMap().get(id)
+                    .getSparkStageMetricsMap().get(id).getMemoryBytesSpilled(), 1000);
+            Assert.assertEquals(queryExecutionMetrics.getSparkJobMetricsMap().get(id)
+                    .getSparkStageMetricsMap().get(id).getDiskBytesSpilled(), 1000);
+            Assert.assertEquals(queryExecutionMetrics.getSparkJobMetricsMap().get(id)
+                    .getSparkStageMetricsMap().get(id).getPeakExecutionMemory(), 100);
+        } else if (id == 2) {
+            //SqlResponse metrics
+            Assert.assertEquals(queryExecutionMetrics.getUser(), "ADMIN");
+            Assert.assertEquals(queryExecutionMetrics.getProject(), "DEFAULT");
+            Assert.assertEquals(queryExecutionMetrics.getQueryType(), "PARQUET");
+            Assert.assertEquals(queryExecutionMetrics.getRealization(), "test_cube");
+            Assert.assertEquals(queryExecutionMetrics.getRealizationTypes(), null);
+            Assert.assertEquals(queryExecutionMetrics.getCuboidIds(), null);
+            Assert.assertEquals(queryExecutionMetrics.getTotalScanCount(), 0);
+            Assert.assertEquals(queryExecutionMetrics.getResultCount(), 0);
+            Assert.assertEquals(queryExecutionMetrics.getException(), "NULL");
+
+            //SparkExecution metrics
+            Assert.assertEquals(queryExecutionMetrics.getStartTime(), 1610609876545L);
+            Assert.assertEquals(queryExecutionMetrics.getEndTime(), 1610612685275L);
+            Assert.assertFalse(queryExecutionMetrics.getSparkJobMetricsMap().get(id).isSuccess());
+            Assert.assertEquals(queryExecutionMetrics.getSparkJobMetricsMap().get(id).getStartTime(), 1610609901699L);
+            Assert.assertEquals(queryExecutionMetrics.getSparkJobMetricsMap().get(id).getEndTime(), 1610610750397L);
+
+            //SparkStage metrics
+            Assert.assertEquals(queryExecutionMetrics.getSparkJobMetricsMap().get(id)
+                    .getSparkStageMetricsMap().size(), 1);
+            Assert.assertTrue(queryExecutionMetrics.getSparkJobMetricsMap().get(id)
+                    .getSparkStageMetricsMap().get(id) != null);
+            Assert.assertEquals(queryExecutionMetrics.getSparkJobMetricsMap().get(id)
+                    .getSparkStageMetricsMap().get(id).getStageId(), id);
+            Assert.assertEquals(queryExecutionMetrics.getSparkJobMetricsMap().get(id)
+                    .getSparkStageMetricsMap().get(id).getResultSize(), 20000);
+            Assert.assertEquals(queryExecutionMetrics.getSparkJobMetricsMap().get(id)
+                    .getSparkStageMetricsMap().get(id).getExecutorDeserializeCpuTime(), 20);
+            Assert.assertEquals(queryExecutionMetrics.getSparkJobMetricsMap().get(id)
+                    .getSparkStageMetricsMap().get(id).getExecutorDeserializeTime(), 20);
+            Assert.assertEquals(queryExecutionMetrics.getSparkJobMetricsMap().get(id)
+                    .getSparkStageMetricsMap().get(id).getExecutorRunTime(), 200);
+            Assert.assertEquals(queryExecutionMetrics.getSparkJobMetricsMap().get(id)
+                    .getSparkStageMetricsMap().get(id).getExecutorCpuTime(), 20);
+            Assert.assertEquals(queryExecutionMetrics.getSparkJobMetricsMap().get(id)
+                    .getSparkStageMetricsMap().get(id).getJvmGCTime(), 2);
+            Assert.assertEquals(queryExecutionMetrics.getSparkJobMetricsMap().get(id)
+                    .getSparkStageMetricsMap().get(id).getResultSerializationTime(), 20);
+            Assert.assertEquals(queryExecutionMetrics.getSparkJobMetricsMap().get(id)
+                    .getSparkStageMetricsMap().get(id).getMemoryBytesSpilled(), 2000);
+            Assert.assertEquals(queryExecutionMetrics.getSparkJobMetricsMap().get(id)
+                    .getSparkStageMetricsMap().get(id).getDiskBytesSpilled(), 2000);
+            Assert.assertEquals(queryExecutionMetrics.getSparkJobMetricsMap().get(id)
+                    .getSparkStageMetricsMap().get(id).getPeakExecutionMemory(), 200);
+        }
+    }
+
+    private static class QuerySparkMetricsTestRemovalListener implements RemovalListener<String,
+            QuerySparkMetrics.QueryExecutionMetrics> {
+        @Override
+        public void onRemoval(RemovalNotification<String, QuerySparkMetrics.QueryExecutionMetrics> notification) {
+            try {
+                verifyQuerySparkMetrics(notification.getKey(), notification.getValue());
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        }
+    }
 }
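
The new test never polls the metrics cache directly: QuerySparkMetrics keeps per-query metrics in a
Guava cache, and the test installs a RemovalListener that runs the assertions whenever an entry is
evicted (by expiry or by invalidation). A minimal sketch of that eviction-driven reporting, written
against plain Guava rather than Kylin's shaded package, with a short expiry purely for illustration:

    import java.util.concurrent.TimeUnit;

    import com.google.common.cache.Cache;
    import com.google.common.cache.CacheBuilder;
    import com.google.common.cache.RemovalListener;

    public class RemovalListenerSketch {
        public static void main(String[] args) {
            // The listener is the "report" hook; in the test above it is where the asserts run.
            RemovalListener<String, String> onEvict =
                    n -> System.out.println("report " + n.getKey() + " -> " + n.getValue());

            Cache<String, String> perQueryMetrics = CacheBuilder.newBuilder()
                    .expireAfterWrite(1, TimeUnit.SECONDS)
                    .removalListener(onEvict)
                    .build();

            perQueryMetrics.put("queryId-1", "collected metrics ...");
            perQueryMetrics.invalidateAll();   // forces the listener to fire immediately
        }
    }

Natural expiry plus later cache activity is what eventually drives verifyQuerySparkMetrics above,
which is why the test sleeps between the two queries.
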
diff --git a/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/CubeDescCreator.java b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/CubeDescCreator.java
index 2d82e02..f3e7168 100644
--- a/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/CubeDescCreator.java
+++ b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/CubeDescCreator.java
@@ -467,7 +467,7 @@ public class CubeDescCreator {
         desc.setDimensions(dimensionDescList);
         desc.setMeasures(measureDescList);
         desc.setRowkey(rowKeyDesc);
-        //desc.setHbaseMapping(hBaseMapping);
+        desc.setHbaseMapping(hBaseMapping);
         desc.setNotifyList(Lists.<String> newArrayList());
         desc.setStatusNeedNotify(Lists.newArrayList(JobStatusEnum.ERROR.toString()));
         desc.setAutoMergeTimeRanges(new long[] { 86400000L, 604800000L, 2419200000L });
diff --git a/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/HiveTableCreator.java b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/HiveTableCreator.java
index 73e493e..af40392 100644
--- a/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/HiveTableCreator.java
+++ b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/HiveTableCreator.java
@@ -130,7 +130,7 @@ public class HiveTableCreator {
         columns.add(new Pair<>(QuerySparkExecutionEnum.SPARDER_NAME.toString(), HiveTypeEnum.HSTRING.toString()));
         columns.add(new Pair<>(QuerySparkExecutionEnum.PROJECT.toString(), HiveTypeEnum.HSTRING.toString()));
         columns.add(new Pair<>(QuerySparkExecutionEnum.REALIZATION.toString(), HiveTypeEnum.HSTRING.toString()));
-        columns.add(new Pair<>(QuerySparkExecutionEnum.REALIZATION_TYPE.toString(), HiveTypeEnum.HINT.toString()));
+        columns.add(new Pair<>(QuerySparkExecutionEnum.REALIZATION_TYPE.toString(), HiveTypeEnum.HSTRING.toString()));
         columns.add(new Pair<>(QuerySparkExecutionEnum.CUBOID_IDS.toString(), HiveTypeEnum.HSTRING.toString()));
 
         columns.add(new Pair<>(QuerySparkExecutionEnum.TYPE.toString(), HiveTypeEnum.HSTRING.toString()));


[kylin] 07/07: Fix test case and fix system-cube.sh script

Posted by xx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit dc5f2951f1c0791d62afcd0c741e2e08d8cc958c
Author: Zhichao Zhang <44...@qq.com>
AuthorDate: Fri Jan 15 00:33:22 2021 +0800

    Fix test case and fix system-cube.sh script
    
    Refine system cube desc to reduce cuboid number
    
    Add queryExecutionMetricsMap shutdown
    
    Change Decimal datatype to Double datatype
---
 .github/workflows/maven.yml                        |   2 +-
 build/bin/system-cube.sh                           |  20 +-
 .../apache/kylin/metrics/QuerySparkMetrics.java    |   6 +
 .../kylin/metrics/lib/impl/MetricsSystem.java      |   2 +
 .../kylin/rest/controller/CubeController.java      |   9 +-
 .../kylin/rest/response/SQLResponseTest.java       |   9 +-
 .../kylin/rest/metrics/QueryMetricsTest.java       |  23 ---
 .../tool/metrics/systemcube/CubeDescCreator.java   | 206 +++++++++++++++------
 webapp/app/js/services/cubes.js                    |   4 +-
 9 files changed, 180 insertions(+), 101 deletions(-)

diff --git a/.github/workflows/maven.yml b/.github/workflows/maven.yml
index 77ff0a9..3bec681 100644
--- a/.github/workflows/maven.yml
+++ b/.github/workflows/maven.yml
@@ -38,4 +38,4 @@ jobs:
       with:
         java-version: 1.8
     - name: Build with Maven
-      run: mvn -B package --file pom.xml
+      run: mvn clean -Dpre-commit apache-rat:check test -Dlicense.skip=false
diff --git a/build/bin/system-cube.sh b/build/bin/system-cube.sh
index ca35970..3ca9b40 100644
--- a/build/bin/system-cube.sh
+++ b/build/bin/system-cube.sh
@@ -38,9 +38,9 @@ OUTPUT_FORDER=$KYLIN_HOME/system_cube
 KYLIN_ENV=`grep "^kylin.env=" $KYLIN_HOME/conf/kylin.properties | cut -d "=" -f 2`
 KYLIN_ENV=${KYLIN_ENV:-"QA"}
 
-SC_NAME_1="KYLIN_HIVE_METRICS_QUERY_${KYLIN_ENV}"
-SC_NAME_2="KYLIN_HIVE_METRICS_QUERY_CUBE_${KYLIN_ENV}"
-SC_NAME_3="KYLIN_HIVE_METRICS_QUERY_RPC_${KYLIN_ENV}"
+SC_NAME_1="KYLIN_HIVE_METRICS_QUERY_EXECUTION_${KYLIN_ENV}"
+SC_NAME_2="KYLIN_HIVE_METRICS_QUERY_SPARK_JOB_${KYLIN_ENV}"
+SC_NAME_3="KYLIN_HIVE_METRICS_QUERY_SPARK_STAGE_${KYLIN_ENV}"
 SC_NAME_4="KYLIN_HIVE_METRICS_JOB_${KYLIN_ENV}"
 SC_NAME_5="KYLIN_HIVE_METRICS_JOB_EXCEPTION_${KYLIN_ENV}"
 
@@ -73,7 +73,6 @@ then
 
 	cat <<-EOF > ${SINK_TOOLS_FILE}
 	[
-	  [
     {
        "sink": "hive",
        "storage_type": 4,
@@ -81,7 +80,6 @@ then
          "kylin.cube.max-building-segments": "1"
        }
     }
-    ]
 	]
 	EOF
   $KYLIN_HOME/bin/kylin.sh org.apache.kylin.tool.metrics.systemcube.SCCreator \
@@ -91,10 +89,12 @@ then
   hive_client_mode=`bash ${KYLIN_HOME}/bin/get-properties.sh kylin.source.hive.client`
 
   # Get Database
-  system_database=`bash ${KYLIN_HOME}/bin/get-properties.sh kylin.source.hive.database-for-flat-table | tr [a-z] [A-Z]`
+  system_database_tmp=`bash ${KYLIN_HOME}/bin/get-properties.sh kylin.metrics.prefix`
+  system_database=${system_database_tmp:-"KYLIN"}
+  system_database=`echo ${system_database} | tr [a-z] [A-Z]`
 
   # 'create database' failed will not exit when donot have permission to create database;
-  sed -i -e 's/CREATE DATABASE /#CREATE DATABASE /g' ${OUTPUT_FORDER}/create_hive_tables_for_system_cubes.sql
+  sed -i -e 's/CREATE DATABASE /-- CREATE DATABASE /g' ${OUTPUT_FORDER}/create_hive_tables_for_system_cubes.sql
 
   if [ "${hive_client_mode}" == "beeline" ]
   then
@@ -104,15 +104,15 @@ then
       hive2_url=`expr match "${beeline_params}" '.*\(hive2:.*:[0-9]\{4,6\}\/\)'`
       if [ -z ${hive2_url} ]; then
           hive2_url=`expr match "${beeline_params}" '.*\(hive2:.*:[0-9]\{4,6\}\)'`
-          beeline_params=${beeline_params/${hive2_url}/${hive2_url}/${sample_database}}
+          beeline_params=${beeline_params/${hive2_url}/${hive2_url}/${system_database}}
       else
-          beeline_params=${beeline_params/${hive2_url}/${hive2_url}${sample_database}}
+          beeline_params=${beeline_params/${hive2_url}/${hive2_url}${system_database}}
       fi
 
       beeline ${beeline_params} -f ${OUTPUT_FORDER}/create_hive_tables_for_system_cubes.sql  || { exit 1; }
   else
       hive -e "CREATE DATABASE IF NOT EXISTS "$system_database
-      hive --database $sample_database -f ${OUTPUT_FORDER}/create_hive_tables_for_system_cubes.sql  || { exit 1; }
+      hive --database $system_database -f ${OUTPUT_FORDER}/create_hive_tables_for_system_cubes.sql  || { exit 1; }
   fi
 
   $KYLIN_HOME/bin/metastore.sh restore ${OUTPUT_FORDER}
diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/QuerySparkMetrics.java b/core-metrics/src/main/java/org/apache/kylin/metrics/QuerySparkMetrics.java
index a0efe64..0a62533 100644
--- a/core-metrics/src/main/java/org/apache/kylin/metrics/QuerySparkMetrics.java
+++ b/core-metrics/src/main/java/org/apache/kylin/metrics/QuerySparkMetrics.java
@@ -24,6 +24,7 @@ import org.apache.kylin.metrics.lib.impl.TimedRecordEvent;
 import org.apache.kylin.metrics.property.QuerySparkExecutionEnum;
 import org.apache.kylin.metrics.property.QuerySparkJobEnum;
 import org.apache.kylin.metrics.property.QuerySparkStageEnum;
+import org.apache.kylin.shaded.com.google.common.annotations.VisibleForTesting;
 import org.apache.kylin.shaded.com.google.common.cache.Cache;
 import org.apache.kylin.shaded.com.google.common.cache.CacheBuilder;
 import org.apache.kylin.shaded.com.google.common.cache.RemovalListener;
@@ -89,7 +90,12 @@ public class QuerySparkMetrics {
                 KylinConfig.getInstanceFromEnv().getKylinMetricsCacheExpireSeconds(), TimeUnit.SECONDS);
     }
 
+    private void shutdown() {
+        queryExecutionMetricsMap.invalidateAll();
+    }
+
     // only for test case
+    @VisibleForTesting
     public static void init(RemovalListener removalListener) {
         instance = new QuerySparkMetrics(removalListener);
     }
diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/MetricsSystem.java b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/MetricsSystem.java
index 1e46bce..4051419 100644
--- a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/MetricsSystem.java
+++ b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/MetricsSystem.java
@@ -26,6 +26,7 @@ import java.util.TreeMap;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.metrics.QuerySparkMetrics;
 import org.apache.kylin.metrics.lib.ActiveReservoir;
 import org.apache.kylin.metrics.lib.ActiveReservoirRecordFilter;
 import org.slf4j.Logger;
@@ -55,6 +56,7 @@ public class MetricsSystem extends MetricRegistry {
     }
 
     public void shutdown() throws IOException {
+        QuerySparkMetrics.getInstance().getQueryExecutionMetricsMap().invalidateAll();
         for (ActiveReservoir entry : activeReservoirs.values()) {
             entry.close();
         }
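
Invalidating the cache before the reservoirs are closed pushes every per-query entry still held in
memory through its removal listener, so pending metrics are reported while the reporters are still
alive. A compressed, self-contained sketch of that ordering (class and field names are hypothetical):

    import java.io.Closeable;
    import java.io.IOException;
    import java.util.List;

    import com.google.common.cache.Cache;

    class MetricsShutdownSketch {
        private final Cache<String, Object> pendingQueryMetrics;
        private final List<Closeable> activeReservoirs;

        MetricsShutdownSketch(Cache<String, Object> cache, List<Closeable> reservoirs) {
            this.pendingQueryMetrics = cache;
            this.activeReservoirs = reservoirs;
        }

        void shutdown() throws IOException {
            pendingQueryMetrics.invalidateAll();   // fires the removal listener for each live entry
            for (Closeable reservoir : activeReservoirs) {
                reservoir.close();                 // reporters are drained only after the flush
            }
        }
    }
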
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
index 059915c..60bbf6c 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
@@ -895,12 +895,13 @@ public class CubeController extends BasicController {
 
         Map<Long, Long> hitFrequencyMap = null;
         Map<Long, Long> queryMatchMap = null;
-        try {
+        // collecting these metrics from the system cube is currently not supported
+        /*try {
             hitFrequencyMap = getTargetCuboidHitFrequency(cubeName);
             queryMatchMap = cubeService.getCuboidQueryMatchCount(cubeName);
         } catch (Exception e) {
             logger.warn("Fail to query on system cube due to " + e);
-        }
+        }*/
 
         Set<Long> currentCuboidSet = cube.getCuboidScheduler().getAllCuboidIds();
         return cubeService.getCuboidTreeResponse(cuboidScheduler, cuboidStatsMap, hitFrequencyMap, queryMatchMap,
@@ -912,7 +913,9 @@ public class CubeController extends BasicController {
     public CuboidTreeResponse getRecommendCuboids(@PathVariable String cubeName) throws IOException {
         checkCubeExists(cubeName);
         CubeInstance cube = cubeService.getCubeManager().getCube(cubeName);
-        Map<Long, Long> recommendCuboidStatsMap = getRecommendCuboidList(cube);
+        // collecting these metrics from the system cube is currently not supported
+        // Map<Long, Long> recommendCuboidStatsMap = getRecommendCuboidList(cube);
+        Map<Long, Long> recommendCuboidStatsMap = null;
         if (recommendCuboidStatsMap == null || recommendCuboidStatsMap.isEmpty()) {
             return new CuboidTreeResponse();
         }
diff --git a/server-base/src/test/java/org/apache/kylin/rest/response/SQLResponseTest.java b/server-base/src/test/java/org/apache/kylin/rest/response/SQLResponseTest.java
index f1c704e..370b1ea 100644
--- a/server-base/src/test/java/org/apache/kylin/rest/response/SQLResponseTest.java
+++ b/server-base/src/test/java/org/apache/kylin/rest/response/SQLResponseTest.java
@@ -32,10 +32,11 @@ public class SQLResponseTest {
 
     @Test
     public void testInterfaceConsistency() throws IOException {
-        String[] attrArray = new String[] { "columnMetas", "results", "cube", "cuboidIds", "affectedRowCount", "isException",
-                "exceptionMessage", "duration", "partial", "totalScanCount", "hitExceptionCache", "storageCacheUsed",
-                "sparkPool", "pushDown", "traceUrl", "totalScanBytes", "totalScanFiles",
-                "metadataTime", "totalSparkScanTime" };
+        String[] attrArray = new String[] { "columnMetas", "results", "cube", "cuboidIds",
+                "realizationTypes", "affectedRowCount", "isException",
+                "exceptionMessage", "duration", "partial", "totalScanCount", "hitExceptionCache",
+                "storageCacheUsed", "sparkPool", "pushDown", "traceUrl", "totalScanBytes",
+                "totalScanFiles", "metadataTime", "totalSparkScanTime" };
 
         SQLResponse sqlResponse = new SQLResponse(null, null, "learn_cube", 100, false, null, false, false);
         String jsonStr = JsonUtil.writeValueAsString(sqlResponse);
diff --git a/server/src/test/java/org/apache/kylin/rest/metrics/QueryMetricsTest.java b/server/src/test/java/org/apache/kylin/rest/metrics/QueryMetricsTest.java
index 3ad7eea..8c69198 100644
--- a/server/src/test/java/org/apache/kylin/rest/metrics/QueryMetricsTest.java
+++ b/server/src/test/java/org/apache/kylin/rest/metrics/QueryMetricsTest.java
@@ -202,25 +202,6 @@ public class QueryMetricsTest extends ServiceTestBase {
         updateSparkMetrics(queryId1);
 
         Assert.assertTrue(QuerySparkMetrics.getInstance().getQueryExecutionMetrics(queryId1) != null);
-        Assert.assertEquals(1L, mBeanServer.getAttribute(objectName, "QueryCount"));
-        Assert.assertEquals(1L, mBeanServer.getAttribute(objectName, "QuerySuccessCount"));
-        Assert.assertEquals(0L, mBeanServer.getAttribute(objectName, "QueryFailCount"));
-        Assert.assertEquals(1L, mBeanServer.getAttribute(objectName, "CacheHitCount"));
-
-        Assert.assertEquals(1L, mBeanServer.getAttribute(objectName, "ScanRowCountNumOps"));
-        Assert.assertEquals(100.0, mBeanServer.getAttribute(objectName, "ScanRowCountAvgTime"));
-        Assert.assertEquals(100.0, mBeanServer.getAttribute(objectName, "ScanRowCountMaxTime"));
-        Assert.assertEquals(100.0, mBeanServer.getAttribute(objectName, "ScanRowCountMinTime"));
-
-        Assert.assertEquals(1L, mBeanServer.getAttribute(objectName, "ResultRowCountNumOps"));
-        Assert.assertEquals(2.0, mBeanServer.getAttribute(objectName, "ResultRowCountMaxTime"));
-        Assert.assertEquals(2.0, mBeanServer.getAttribute(objectName, "ResultRowCountAvgTime"));
-        Assert.assertEquals(2.0, mBeanServer.getAttribute(objectName, "ResultRowCountMinTime"));
-
-        Assert.assertEquals(1L, mBeanServer.getAttribute(objectName, "QueryLatencyNumOps"));
-        Assert.assertEquals(10.0, mBeanServer.getAttribute(objectName, "QueryLatencyMaxTime"));
-        Assert.assertEquals(10.0, mBeanServer.getAttribute(objectName, "QueryLatencyAvgTime"));
-        Assert.assertEquals(10.0, mBeanServer.getAttribute(objectName, "QueryLatencyMinTime"));
 
         String queryId2 = "2";
         generateSparkMetrics(queryId2);
@@ -241,10 +222,6 @@ public class QueryMetricsTest extends ServiceTestBase {
 
         Thread.sleep(5000);
 
-        Assert.assertEquals(2L, mBeanServer.getAttribute(objectName, "QueryCount"));
-        Assert.assertEquals(1L, mBeanServer.getAttribute(objectName, "QuerySuccessCount"));
-        Assert.assertEquals(1L, mBeanServer.getAttribute(objectName, "QueryFailCount"));
-
         Assert.assertTrue(QuerySparkMetrics.getInstance().getQueryExecutionMetrics(queryId2) == null);
         Assert.assertEquals(2, sparkMetricsReportCnt.get());
         System.clearProperty("kylin.server.query-metrics-enabled");
diff --git a/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/CubeDescCreator.java b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/CubeDescCreator.java
index f3e7168..66f1877 100644
--- a/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/CubeDescCreator.java
+++ b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/CubeDescCreator.java
@@ -118,26 +118,44 @@ public class CubeDescCreator {
         rowKeyDesc.setRowkeyColumns(rowKeyColDescs);
 
         //Set for aggregation group
-        String[][] hierarchy_dims = new String[4][];
-        hierarchy_dims[0] = getTimeHierarchy();
-        hierarchy_dims[1] = new String[3];
-        hierarchy_dims[1][0] = QuerySparkExecutionEnum.REALIZATION_TYPE.toString();
-        hierarchy_dims[1][1] = QuerySparkExecutionEnum.REALIZATION.toString();
-        hierarchy_dims[1][2] = QuerySparkExecutionEnum.CUBOID_IDS.toString();
-        hierarchy_dims[2] = new String[2];
-        hierarchy_dims[2][0] = QuerySparkExecutionEnum.START_TIME.toString();
-        hierarchy_dims[2][1] = QuerySparkExecutionEnum.END_TIME.toString();
-        hierarchy_dims[3] = new String[2];
-        hierarchy_dims[3][0] = QuerySparkExecutionEnum.SPARDER_NAME.toString();
-        hierarchy_dims[3][1] = RecordEvent.RecordReserveKeyEnum.HOST.toString();
-        for (int i = 0; i < hierarchy_dims.length; i++) {
-            hierarchy_dims[i] = refineColumnWithTable(tableName, hierarchy_dims[i]);
+        String[][] hierarchyDims = new String[1][];
+        hierarchyDims[0] = getTimeHierarchy();
+        for (int i = 0; i < hierarchyDims.length; i++) {
+            hierarchyDims[i] = refineColumnWithTable(tableName, hierarchyDims[i]);
         }
 
+        String[] mandatoryDims = new String[] {refineColumnWithTable(tableName,
+                QuerySparkExecutionEnum.PROJECT.toString())};
+
+        String[][] jointDims = new String[5][];
+        jointDims[0] = new String[]{
+                refineColumnWithTable(tableName, TimePropertyEnum.TIME_HOUR.toString()),
+                refineColumnWithTable(tableName, TimePropertyEnum.TIME_MINUTE.toString())
+        };
+        jointDims[1] = new String[]{
+                refineColumnWithTable(tableName, QuerySparkExecutionEnum.START_TIME.toString()),
+                refineColumnWithTable(tableName, QuerySparkExecutionEnum.END_TIME.toString())
+        };
+        jointDims[2] = new String[]{
+                refineColumnWithTable(tableName, QuerySparkExecutionEnum.REALIZATION_TYPE.toString()),
+                refineColumnWithTable(tableName, QuerySparkExecutionEnum.REALIZATION.toString()),
+                refineColumnWithTable(tableName, QuerySparkExecutionEnum.CUBOID_IDS.toString())
+        };
+        jointDims[3] = new String[]{
+                refineColumnWithTable(tableName, QuerySparkExecutionEnum.SPARDER_NAME.toString()),
+                refineColumnWithTable(tableName, RecordEvent.RecordReserveKeyEnum.HOST.toString()),
+                refineColumnWithTable(tableName, QuerySparkExecutionEnum.EXCEPTION.toString()),
+                refineColumnWithTable(tableName, QuerySparkExecutionEnum.TYPE.toString())
+        };
+        jointDims[4] = new String[]{
+                refineColumnWithTable(tableName, QuerySparkExecutionEnum.USER.toString()),
+                refineColumnWithTable(tableName, QuerySparkExecutionEnum.QUERY_ID.toString())
+        };
+
         SelectRule selectRule = new SelectRule();
-        selectRule.mandatoryDims = new String[0];
-        selectRule.hierarchyDims = hierarchy_dims;
-        selectRule.jointDims = new String[0][0];
+        selectRule.mandatoryDims = mandatoryDims;
+        selectRule.hierarchyDims = hierarchyDims;
+        selectRule.jointDims = jointDims;
 
         AggregationGroup aggGroup = new AggregationGroup();
         aggGroup.setIncludes(refineColumnWithTable(tableName, dimensions));
@@ -159,7 +177,6 @@ public class CubeDescCreator {
         dimensions.remove(TimePropertyEnum.DAY_TIME.toString());
         dimensions.remove(RecordEvent.RecordReserveKeyEnum.TIME.toString());
         dimensions.remove(RecordEvent.RecordReserveKeyEnum.HOST.toString());
-        dimensions.remove(QuerySparkJobEnum.PROJECT.toString());
 
         List<DimensionDesc> dimensionDescList = Lists.newArrayListWithExpectedSize(dimensions.size());
         for (String dimensionName : dimensions) {
@@ -194,27 +211,44 @@ public class CubeDescCreator {
         idx++;
         rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QuerySparkJobEnum.END_TIME.toString(), idx + 1);
         idx++;
+        rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QuerySparkJobEnum.PROJECT.toString(), idx + 1);
+        idx++;
         rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QuerySparkJobEnum.IF_SUCCESS.toString(), idx + 1);
         idx++;
 
         RowKeyDesc rowKeyDesc = new RowKeyDesc();
         rowKeyDesc.setRowkeyColumns(rowKeyColDescs);
 
-        String[][] hierarchy_dims = new String[2][];
-        hierarchy_dims[0] = getTimeHierarchy();
-        hierarchy_dims[1] = new String[3];
-        hierarchy_dims[1][0] = QuerySparkJobEnum.QUERY_ID.toString();
-        hierarchy_dims[1][1] = QuerySparkJobEnum.EXECUTION_ID.toString();
-        hierarchy_dims[1][2] = QuerySparkJobEnum.JOB_ID.toString();
-
-        for (int i = 0; i < hierarchy_dims.length; i++) {
-            hierarchy_dims[i] = refineColumnWithTable(tableName, hierarchy_dims[i]);
+        //Set for aggregation group
+        String[][] hierarchyDims = new String[1][];
+        hierarchyDims[0] = getTimeHierarchy();
+        for (int i = 0; i < hierarchyDims.length; i++) {
+            hierarchyDims[i] = refineColumnWithTable(tableName, hierarchyDims[i]);
         }
 
+        String[] mandatoryDims = new String[] {refineColumnWithTable(tableName,
+                QuerySparkJobEnum.PROJECT.toString())};
+
+        String[][] jointDims = new String[3][];
+        jointDims[0] = new String[]{
+                refineColumnWithTable(tableName, TimePropertyEnum.TIME_HOUR.toString()),
+                refineColumnWithTable(tableName, TimePropertyEnum.TIME_MINUTE.toString())
+        };
+        jointDims[1] = new String[]{
+                refineColumnWithTable(tableName, QuerySparkJobEnum.START_TIME.toString()),
+                refineColumnWithTable(tableName, QuerySparkJobEnum.END_TIME.toString()),
+                refineColumnWithTable(tableName, QuerySparkJobEnum.IF_SUCCESS.toString())
+        };
+        jointDims[2] = new String[]{
+                refineColumnWithTable(tableName, QuerySparkJobEnum.QUERY_ID.toString()),
+                refineColumnWithTable(tableName, QuerySparkJobEnum.EXECUTION_ID.toString()),
+                refineColumnWithTable(tableName, QuerySparkJobEnum.JOB_ID.toString())
+        };
+
         SelectRule selectRule = new SelectRule();
-        selectRule.mandatoryDims = new String[0];
-        selectRule.hierarchyDims = hierarchy_dims;
-        selectRule.jointDims = new String[0][0];
+        selectRule.mandatoryDims = mandatoryDims;
+        selectRule.hierarchyDims = hierarchyDims;
+        selectRule.jointDims = jointDims;
 
         AggregationGroup aggGroup = new AggregationGroup();
         aggGroup.setIncludes(refineColumnWithTable(tableName, dimensions));
@@ -284,21 +318,39 @@ public class CubeDescCreator {
         rowKeyDesc.setRowkeyColumns(rowKeyColDescs);
 
         //Set for aggregation group
-        String[][] hierarchy_dims = new String[2][];
-        hierarchy_dims[0] = getTimeHierarchy();
-        hierarchy_dims[1] = new String[4];
-        hierarchy_dims[1][0] = QuerySparkStageEnum.QUERY_ID.toString();
-        hierarchy_dims[1][1] = QuerySparkStageEnum.EXECUTION_ID.toString();
-        hierarchy_dims[1][2] = QuerySparkStageEnum.JOB_ID.toString();
-        hierarchy_dims[1][3] = QuerySparkStageEnum.STAGE_ID.toString();
-        for (int i = 0; i < hierarchy_dims.length; i++) {
-            hierarchy_dims[i] = refineColumnWithTable(tableName, hierarchy_dims[i]);
+        String[][] hierarchyDims = new String[1][];
+        hierarchyDims[0] = getTimeHierarchy();
+        for (int i = 0; i < hierarchyDims.length; i++) {
+            hierarchyDims[i] = refineColumnWithTable(tableName, hierarchyDims[i]);
         }
 
+        String[] mandatoryDims = new String[] {refineColumnWithTable(tableName,
+                QuerySparkStageEnum.PROJECT.toString())};
+
+        String[][] jointDims = new String[4][];
+        jointDims[0] = new String[]{
+                refineColumnWithTable(tableName, TimePropertyEnum.TIME_HOUR.toString()),
+                refineColumnWithTable(tableName, TimePropertyEnum.TIME_MINUTE.toString())
+        };
+        jointDims[1] = new String[]{
+                refineColumnWithTable(tableName, QuerySparkStageEnum.REALIZATION.toString()),
+                refineColumnWithTable(tableName, QuerySparkStageEnum.CUBOID_ID.toString())
+        };
+        jointDims[2] = new String[]{
+                refineColumnWithTable(tableName, RecordEvent.RecordReserveKeyEnum.HOST.toString()),
+                refineColumnWithTable(tableName, QuerySparkStageEnum.IF_SUCCESS.toString())
+        };
+        jointDims[3] = new String[]{
+                refineColumnWithTable(tableName, QuerySparkStageEnum.QUERY_ID.toString()),
+                refineColumnWithTable(tableName, QuerySparkStageEnum.EXECUTION_ID.toString()),
+                refineColumnWithTable(tableName, QuerySparkStageEnum.JOB_ID.toString()),
+                refineColumnWithTable(tableName, QuerySparkStageEnum.STAGE_ID.toString())
+        };
+
         SelectRule selectRule = new SelectRule();
-        selectRule.mandatoryDims = new String[0];
-        selectRule.hierarchyDims = hierarchy_dims;
-        selectRule.jointDims = new String[0][0];
+        selectRule.mandatoryDims = mandatoryDims;
+        selectRule.hierarchyDims = hierarchyDims;
+        selectRule.jointDims = jointDims;
 
         AggregationGroup aggGroup = new AggregationGroup();
         aggGroup.setIncludes(refineColumnWithTable(tableName, dimensions));
@@ -369,16 +421,33 @@ public class CubeDescCreator {
         rowKeyDesc.setRowkeyColumns(rowKeyColDescs);
 
         //Set for aggregation group
-        String[][] hierarchy_dims = new String[1][];
-        hierarchy_dims[0] = getTimeHierarchy();
-        for (int i = 0; i < hierarchy_dims.length; i++) {
-            hierarchy_dims[i] = refineColumnWithTable(tableName, hierarchy_dims[i]);
+        String[][] hierarchyDims = new String[1][];
+        hierarchyDims[0] = getTimeHierarchy();
+        for (int i = 0; i < hierarchyDims.length; i++) {
+            hierarchyDims[i] = refineColumnWithTable(tableName, hierarchyDims[i]);
         }
 
+        String[] mandatoryDims = new String[] {refineColumnWithTable(tableName,
+                JobPropertyEnum.PROJECT.toString())};
+
+        String[][] jointDims = new String[3][];
+        jointDims[0] = new String[] {
+                refineColumnWithTable(tableName, JobPropertyEnum.CUBE.toString()),
+                refineColumnWithTable(tableName, JobPropertyEnum.ALGORITHM.toString())
+        };
+        jointDims[1] = new String[] {
+                refineColumnWithTable(tableName, TimePropertyEnum.TIME_HOUR.toString()),
+                refineColumnWithTable(tableName, TimePropertyEnum.TIME_MINUTE.toString())
+        };
+        jointDims[2] = new String[] {
+                refineColumnWithTable(tableName, JobPropertyEnum.USER.toString()),
+                refineColumnWithTable(tableName, JobPropertyEnum.TYPE.toString())
+        };
+
         SelectRule selectRule = new SelectRule();
-        selectRule.mandatoryDims = new String[0];
-        selectRule.hierarchyDims = hierarchy_dims;
-        selectRule.jointDims = new String[0][0];
+        selectRule.mandatoryDims = mandatoryDims;
+        selectRule.hierarchyDims = hierarchyDims;
+        selectRule.jointDims = jointDims;
 
         AggregationGroup aggGroup = new AggregationGroup();
         aggGroup.setIncludes(refineColumnWithTable(tableName, dimensions));
@@ -433,16 +502,33 @@ public class CubeDescCreator {
         rowKeyDesc.setRowkeyColumns(rowKeyColDescs);
 
         //Set for aggregation group
-        String[][] hierarchy_dims = new String[1][];
-        hierarchy_dims[0] = getTimeHierarchy();
-        for (int i = 0; i < hierarchy_dims.length; i++) {
-            hierarchy_dims[i] = refineColumnWithTable(tableName, hierarchy_dims[i]);
+        String[][] hierarchyDims = new String[1][];
+        hierarchyDims[0] = getTimeHierarchy();
+        for (int i = 0; i < hierarchyDims.length; i++) {
+            hierarchyDims[i] = refineColumnWithTable(tableName, hierarchyDims[i]);
         }
 
+        String[] mandatoryDims = new String[] {refineColumnWithTable(tableName,
+                JobPropertyEnum.PROJECT.toString())};
+
+        String[][] jointDims = new String[3][];
+        jointDims[0] = new String[] {
+                refineColumnWithTable(tableName, JobPropertyEnum.CUBE.toString()),
+                refineColumnWithTable(tableName, JobPropertyEnum.ALGORITHM.toString())
+        };
+        jointDims[1] = new String[] {
+                refineColumnWithTable(tableName, TimePropertyEnum.TIME_HOUR.toString()),
+                refineColumnWithTable(tableName, TimePropertyEnum.TIME_MINUTE.toString())
+        };
+        jointDims[2] = new String[] {
+                refineColumnWithTable(tableName, JobPropertyEnum.USER.toString()),
+                refineColumnWithTable(tableName, JobPropertyEnum.TYPE.toString())
+        };
+
         SelectRule selectRule = new SelectRule();
-        selectRule.mandatoryDims = new String[0];
-        selectRule.hierarchyDims = hierarchy_dims;
-        selectRule.jointDims = new String[0][0];
+        selectRule.mandatoryDims = mandatoryDims;
+        selectRule.hierarchyDims = hierarchyDims;
+        selectRule.jointDims = jointDims;
 
         AggregationGroup aggGroup = new AggregationGroup();
         aggGroup.setIncludes(refineColumnWithTable(tableName, dimensions));
@@ -525,6 +611,10 @@ public class CubeDescCreator {
         return result;
     }
 
+    public static String refineColumnWithTable(String tableName, String column) {
+        return tableName.substring(tableName.lastIndexOf(".") + 1) + "." + column;
+    }
+
     public static String[] refineColumnWithTable(String tableName, List<String> columns) {
         String[] dimensions = new String[columns.size()];
         for (int i = 0; i < dimensions.length; i++) {
@@ -599,9 +689,7 @@ public class CubeDescCreator {
         FunctionDesc function = new FunctionDesc();
         function.setExpression(FunctionDesc.FUNC_SUM);
         function.setParameter(parameterDesc);
-        function.setReturnType(dataType.equals(HiveTableCreator.HiveTypeEnum.HDOUBLE.toString())
-                ? HiveTableCreator.HiveTypeEnum.HDECIMAL.toString()
-                : dataType);
+        function.setReturnType(dataType);
 
         MeasureDesc result = new MeasureDesc();
         result.setName(column + "_SUM");
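
The aggregation-group rewrites above are what "Refine system cube desc to reduce cuboid number"
refers to: instead of several hierarchies plus many free dimensions, each system cube now uses one
mandatory PROJECT dimension, the time hierarchy, and a few joint groups. As a rough estimate only
(ignoring Kylin's per-group combination cap and overlap between aggregation groups), the cuboid
count of one aggregation group is about 2^(free dims) x product over hierarchies of (levels + 1)
x 2^(joint groups), with mandatory dimensions contributing a factor of 1. A small sketch of that
arithmetic (helper names are hypothetical):

    // Rough per-aggregation-group cuboid estimate, sketch only.
    public class CuboidEstimateSketch {
        static long estimateCuboids(int freeDims, int[] hierarchySizes, int jointGroups) {
            long count = 1L << freeDims;          // every free dimension doubles the count
            for (int levels : hierarchySizes) {
                count *= (levels + 1);            // a hierarchy of n levels adds n + 1 choices
            }
            return count << jointGroups;          // each joint group behaves like one dimension
        }

        public static void main(String[] args) {
            // e.g. one 5-level time hierarchy, 5 joint groups, no free dimensions:
            System.out.println(estimateCuboids(0, new int[] { 5 }, 5));   // 192
        }
    }

Folding free dimensions into joint groups (and marking PROJECT mandatory) therefore shrinks the
cuboid space sharply compared with the previous descriptors.
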
diff --git a/webapp/app/js/services/cubes.js b/webapp/app/js/services/cubes.js
index 53f490d..fea92ce 100644
--- a/webapp/app/js/services/cubes.js
+++ b/webapp/app/js/services/cubes.js
@@ -33,7 +33,9 @@ KylinApp.factory('CubeService', ['$resource', function ($resource, config) {
         });
       }
     };
-    iterator(data.root, data.root.row_count);
+    if (data.root) {
+      iterator(data.root, data.root.row_count);
+    }
     return cuboids;
   };
   return $resource(Config.service.url + 'cubes/:cubeId/:propName/:propValue/:action', {}, {


[kylin] 03/07: KYLIN-4496: Metric data missing

Posted by xx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 7431240a7a8d868cbceba25cfb5b0d66c428cb66
Author: tianhui5 <ti...@xiaomi.com>
AuthorDate: Thu May 14 18:34:49 2020 +0800

    KYLIN-4496: Metric data missing
    
    (cherry picked from commit 278aababa11fff222327ec6b6f73e659b147b8d7)
---
 .../main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducer.java   | 1 +
 1 file changed, 1 insertion(+)

diff --git a/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducer.java b/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducer.java
index da267d2..a96b261 100644
--- a/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducer.java
+++ b/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducer.java
@@ -114,6 +114,7 @@ public class HiveProducer {
 
     public void close() {
         tableFieldSchemaCache.cleanUp();
+        closeFout();
     }
 
     public void send(final Record record) throws Exception {


[kylin] 04/07: KYLIN-4573 Add option to indicate whether to close file for every append for Hive Producer

Posted by xx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 3400338d7870c843e5ab2490c6767aa37e8092e4
Author: Zhong, Yanghong <nj...@apache.org>
AuthorDate: Mon Jun 15 16:41:36 2020 +0800

    KYLIN-4573 Add option to indicate whether to close file for every append for Hive Producer
    
    (cherry picked from commit 616e06675278a6857f3cbb353a4f9c2243eeccc1)
---
 .../kylin/metrics/lib/impl/hive/HiveProducer.java  | 93 ++++++++++++++++++----
 server/src/main/resources/kylinMetrics.xml         |  1 +
 2 files changed, 77 insertions(+), 17 deletions(-)

diff --git a/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducer.java b/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducer.java
index a96b261..8bc7a43 100644
--- a/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducer.java
+++ b/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducer.java
@@ -48,6 +48,7 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
@@ -66,6 +67,15 @@ public class HiveProducer {
     private Path curPartitionContentPath;
     private int id = 0;
     private FSDataOutputStream fout;
+    /**
+     * Some cloud file systems, like AWS S3, do not support appending to an existing file.
+     * When append is not supported, a new file is produced on each call to the write method.
+     */
+    private final boolean supportAppend;
+
+    private final boolean closeFileEveryAppend;
+
+    private final Map<String, String> kylinSpecifiedConfig = new HashMap<>();
 
     public HiveProducer(String metricType, Properties props) throws Exception {
         this(metricType, props, new HiveConf());
@@ -75,7 +85,13 @@ public class HiveProducer {
         this.metricType = metricType;
         hiveConf = hiveConfig;
         for (Map.Entry<Object, Object> e : props.entrySet()) {
-            hiveConf.set(e.getKey().toString(), e.getValue().toString());
+            String key = e.getKey().toString();
+            String value = e.getValue().toString();
+            if (key.startsWith("kylin.")) {
+                kylinSpecifiedConfig.put(key, value);
+            } else {
+                hiveConf.set(key, value);
+            }
         }
 
         fs = FileSystem.get(hiveConf);
@@ -96,6 +112,7 @@ public class HiveProducer {
                         IMetaStoreClient metaStoreClient = HiveMetaStoreClientFactory.getHiveMetaStoreClient(hiveConf);
                         String tableLocation = metaStoreClient.getTable(tableName.getFirst(), tableName.getSecond())
                                 .getSd().getLocation();
+                        logger.debug("Find table location for {} at {}", tableName.getSecond(), tableLocation);
                         List<FieldSchema> fields = metaStoreClient.getFields(tableName.getFirst(),
                                 tableName.getSecond());
                         metaStoreClient.close();
@@ -110,6 +127,12 @@ public class HiveProducer {
             hostName = "UNKNOWN";
         }
         contentFilePrefix = hostName + "-" + System.currentTimeMillis() + "-part-";
+        String fsUri = fs.getUri().toString();
+        supportAppend = fsUri.startsWith("hdfs"); // Only HDFS is appendable
+        logger.info("For {}, supportAppend was set to {}", fsUri, supportAppend);
+
+        closeFileEveryAppend = !supportAppend
+                || Boolean.parseBoolean(kylinSpecifiedConfig.get("kylin.hive.producer.close-file-every-append"));
     }
 
     public void close() {
@@ -127,7 +150,7 @@ public class HiveProducer {
         for (Record record : recordList) {
             HiveProducerRecord hiveRecord = convertTo(record);
             if (recordMap.get(hiveRecord.key()) == null) {
-                recordMap.put(hiveRecord.key(), Lists.<HiveProducerRecord> newLinkedList());
+                recordMap.put(hiveRecord.key(), Lists.<HiveProducerRecord>newLinkedList());
             }
             recordMap.get(hiveRecord.key()).add(hiveRecord);
         }
@@ -175,17 +198,31 @@ public class HiveProducer {
             }
             hql.append(")");
             logger.debug("create partition by {}.", hql);
-            Driver driver = new Driver(hiveConf);
-            CliSessionState session = new CliSessionState(hiveConf);
-            SessionState.start(session);
-            CommandProcessorResponse res = driver.run(hql.toString());
-            if (res.getResponseCode() != 0) {
-                logger.warn("Fail to add partition. HQL: {}; Cause by: {}",
-                        hql.toString(),
-                        res.toString());
+            Driver driver = null;
+            CliSessionState session = null;
+            try {
+                driver = new Driver(hiveConf);
+                session = new CliSessionState(hiveConf);
+                SessionState.start(session);
+                CommandProcessorResponse res = driver.run(hql.toString());
+                if (res.getResponseCode() != 0) {
+                    logger.warn("Fail to add partition. HQL: {}; Cause by: {}",
+                            hql.toString(),
+                            res.toString());
+                }
+                session.close();
+                driver.close();
+            } catch (Exception ex) {
+                // Do not let hive exception stop HiveProducer from writing file, so catch and report it here
+                logger.error("create partition failed, please create it manually : " + hql, ex);
+            } finally {
+                if (session != null) {
+                    session.close();
+                }
+                if (driver != null) {
+                    driver.close();
+                }
             }
-            session.close();
-            driver.close();
         }
 
         // Step 3: create path for new partition if it is the first time write metrics message or new partition should be used
@@ -195,7 +232,21 @@ public class HiveProducer {
                 closeFout();
             }
 
-            Path partitionContentPath = new Path(partitionPath, contentFilePrefix + String.format(Locale.ROOT, "%04d", id));
+            Path partitionContentPath = new Path(partitionPath, contentFilePrefix + String.format(Locale.ROOT, "%05d", id));
+
+            // Do not overwrite existing files when supportAppend is set to false
+            int nCheck = 0;
+            while (!supportAppend && fs.exists(partitionContentPath)) {
+                id++;
+                nCheck++;
+                partitionContentPath = new Path(partitionPath, contentFilePrefix + String.format(Locale.ROOT, "%05d", id));
+                logger.debug("{} exists, skip it.", partitionContentPath);
+                if (nCheck > 100000) {
+                    logger.warn("Exceed max check times.");
+                    break;
+                }
+            }
+
             logger.info("Try to use new partition content path: {} for metric: {}", partitionContentPath, metricType);
             if (!fs.exists(partitionContentPath)) {
                 int nRetry = 0;
@@ -210,30 +261,38 @@ public class HiveProducer {
                             "Fail to create HDFS file: " + partitionContentPath + " after " + nRetry + " retries");
                 }
             }
-            fout = fs.append(partitionContentPath);
+            if (supportAppend) {
+                fout = fs.append(partitionContentPath);
+            } else {
+                fout = fs.create(partitionContentPath);
+            }
             prePartitionPath = partitionPath.toString();
             curPartitionContentPath = partitionContentPath;
-            id = (id + 1) % 10;
+            id = (id + 1) % (supportAppend ? 10 : 100000);
         }
 
-        // Step 4: append record to HDFS without flush
+        // Step 4: append records to DFS
         try {
             int count = 0;
             for (HiveProducerRecord elem : recordItr) {
                 fout.writeBytes(elem.valueToString() + "\n");
                 count++;
             }
-            logger.info("Success to write {} metrics ({}) to file {}", count, metricType, curPartitionContentPath);
+            logger.debug("Success to write {} metrics ({}) to file {}", count, metricType, curPartitionContentPath);
         } catch (IOException e) {
             logger.error("Fails to write metrics(" + metricType + ") to file " + curPartitionContentPath.toString()
                     + " due to ", e);
             closeFout();
         }
+        if (closeFileEveryAppend) {
+            closeFout();
+        }
     }
 
     private void closeFout() {
         if (fout != null) {
             try {
+                logger.debug("Flush output stream {}.", curPartitionContentPath);
                 fout.close();
             } catch (Exception e) {
                 logger.error("Close the path: " + curPartitionContentPath + " failed", e);
diff --git a/server/src/main/resources/kylinMetrics.xml b/server/src/main/resources/kylinMetrics.xml
index 843fb91..85c879f 100644
--- a/server/src/main/resources/kylinMetrics.xml
+++ b/server/src/main/resources/kylinMetrics.xml
@@ -73,6 +73,7 @@
                                           value="org.apache.kylin.metrics.lib.impl.hive.HiveReservoirReporter"/>
                                 <property name="second">
                                     <props>
+                                        <prop key="kylin.hive.producer.close-file-every-append">true</prop>
                                     </props>
                                 </property>
                             </bean>
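
The new kylinMetrics.xml property above is handed to the Hive reporter with the rest of its reporter properties. The exact plumbing from this entry to HiveProducer's closeFileEveryAppend flag is not part of this excerpt, so the snippet below is only a hypothetical reading of such a property; the key name comes from the config above, everything else (class, method, default value) is assumed for illustration:

    import java.util.Properties;

    public class ProducerPropsSketch {
        // Hypothetical parsing of the reporter properties; the key matches the
        // kylinMetrics.xml entry above, the default value is an assumption.
        static boolean closeFileEveryAppend(Properties props) {
            return Boolean.parseBoolean(
                    props.getProperty("kylin.hive.producer.close-file-every-append", "false"));
        }
    }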


[kylin] 05/07: KYLIN-4653 Make the capacity for the LinkedBlockingQueue of BlockingReservoir configurable

Posted by xx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 6f48ce8322e6e46df107564c7788e25bcd3abde0
Author: chuxiao <ch...@didichuxing.com>
AuthorDate: Wed Jul 22 23:53:35 2020 +0800

    KYLIN-4653 Make the capacity for the LinkedBlockingQueue of BlockingReservoir configurable
    
    (cherry picked from commit 8e44f573be5905fe85f10241f0f7a7844e48f5f0)
---
 .../kylin/metrics/lib/impl/BlockingReservoir.java  | 27 +++++++++++++++++++---
 server/src/main/resources/kylinMetrics.xml         |  7 +++++-
 2 files changed, 30 insertions(+), 4 deletions(-)

diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/BlockingReservoir.java b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/BlockingReservoir.java
index 6158096..afa34a9 100644
--- a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/BlockingReservoir.java
+++ b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/BlockingReservoir.java
@@ -22,6 +22,8 @@ import java.util.List;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import org.apache.kylin.metrics.lib.ActiveReservoirListener;
 import org.apache.kylin.metrics.lib.Record;
 import org.slf4j.Logger;
@@ -58,11 +60,19 @@ public class BlockingReservoir extends AbstractActiveReservoir {
     }
 
     public BlockingReservoir(int minReportSize, int maxReportSize, int maxReportTime) {
+        this(minReportSize, maxReportSize, maxReportTime, MAX_QUEUE_SIZE);
+    }
+
+    public BlockingReservoir(int minReportSize, int maxReportSize, int maxReportTime, int maxQueueSize) {
+        Preconditions.checkArgument(minReportSize > 0, "minReportSize should be larger than 0");
+        Preconditions.checkArgument(maxReportSize >= minReportSize,
+                "maxReportSize should not be less than minBatchSize");
+        Preconditions.checkArgument(maxReportTime > 0, "maxReportTime should be larger than 0");
         this.minReportSize = minReportSize;
         this.maxReportSize = maxReportSize;
         this.maxReportTime = maxReportTime * 60 * 1000L;
 
-        this.recordsQueue = new LinkedBlockingQueue<>(MAX_QUEUE_SIZE);
+        this.recordsQueue = new LinkedBlockingQueue<>(maxQueueSize);
         this.listeners = Lists.newArrayList();
 
         this.records = Lists.newArrayListWithExpectedSize(this.maxReportSize);
@@ -95,9 +105,11 @@ public class BlockingReservoir extends AbstractActiveReservoir {
         if (ifAll) {
             records = Lists.newArrayList();
             recordsQueue.drainTo(records);
+            logger.info("Will report {} metrics records", records.size());
         } else {
             records.clear();
             recordsQueue.drainTo(records, maxReportSize);
+            logger.info("Will report {} metrics records, remaining {} records", records.size(), size());
         }
 
         boolean ifSucceed = true;
@@ -127,9 +139,18 @@ public class BlockingReservoir extends AbstractActiveReservoir {
         return true;
     }
 
-    @Override
-    public void start() {
+    @VisibleForTesting
+    void notifyUpdate() {
+        onRecordUpdate(false);
+    }
+
+    @VisibleForTesting
+    void setReady() {
         super.start();
+    }
+
+    @Override
+    public void start() {
+        setReady();
         scheduledReporter.start();
     }
 
diff --git a/server/src/main/resources/kylinMetrics.xml b/server/src/main/resources/kylinMetrics.xml
index 85c879f..a9d907a 100644
--- a/server/src/main/resources/kylinMetrics.xml
+++ b/server/src/main/resources/kylinMetrics.xml
@@ -34,10 +34,15 @@
             <value>500</value>
         </constructor-arg>
 
-        <!-- minReportTime, min duration(in minute) between two report action-->
+        <!-- maxReportTime, max duration (in minutes) between two report actions -->
         <constructor-arg index="2">
             <value>10</value>
         </constructor-arg>
+
+        <!-- maxQueueSize, max capacity of the LinkedBlockingQueue -->
+        <constructor-arg index="3">
+            <value>50000</value>
+        </constructor-arg>
     </bean>
 
     <bean id="hiveSink" class="org.apache.kylin.metrics.lib.impl.hive.HiveSink"/>