You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by xx...@apache.org on 2021/01/17 15:17:48 UTC

[kylin] 02/07: add test case

This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit ad7b47245c959ea3e57efe1c8f5a737d0b1653ed
Author: Zhichao Zhang <44...@qq.com>
AuthorDate: Thu Jan 14 14:51:59 2021 +0800

    add test case
    
    Add metrics check
---
 build/bin/system-cube.sh                           |  20 +-
 .../org/apache/kylin/common/KylinConfigBase.java   |   2 +-
 .../apache/kylin/metrics/QuerySparkMetrics.java    |  98 ++++----
 examples/test_case_data/localmeta/kylin.properties |   5 +-
 .../spark/sql/metrics/SparderMetricsListener.scala |   2 +-
 .../apache/kylin/rest/init/InitialTaskManager.java |   2 +
 .../kylin/rest/metrics/QueryMetricsFacade.java     |   4 +-
 .../apache/kylin/rest/response/SQLResponse.java    |  10 +
 .../kylin/rest/service/DashboardService.java       |   1 -
 .../apache/kylin/rest/service/QueryService.java    |  12 +-
 .../kylin/rest/metrics/QueryMetricsTest.java       | 271 ++++++++++++++++++++-
 .../tool/metrics/systemcube/CubeDescCreator.java   |   2 +-
 .../tool/metrics/systemcube/HiveTableCreator.java  |   2 +-
 13 files changed, 363 insertions(+), 68 deletions(-)

diff --git a/build/bin/system-cube.sh b/build/bin/system-cube.sh
index 20f7861..ca35970 100644
--- a/build/bin/system-cube.sh
+++ b/build/bin/system-cube.sh
@@ -74,18 +74,14 @@ then
 	cat <<-EOF > ${SINK_TOOLS_FILE}
 	[
 	  [
-		"org.apache.kylin.tool.metrics.systemcube.util.HiveSinkTool",
-		{
-		  "storage_type": 2,
-		  "cube_desc_override_properties": [
-			"java.util.HashMap",
-			{
-			  "kylin.cube.algorithm": "INMEM",
-			  "kylin.cube.max-building-segments": "1"
-			}
-		  ]
-		}
-	  ]
+    {
+       "sink": "hive",
+       "storage_type": 4,
+       "cube_desc_override_properties": {
+         "kylin.cube.max-building-segments": "1"
+       }
+    }
+    ]
 	]
 	EOF
   $KYLIN_HOME/bin/kylin.sh org.apache.kylin.tool.metrics.systemcube.SCCreator \
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 36950ec..21f05db 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -2364,7 +2364,7 @@ public abstract class KylinConfigBase implements Serializable {
     }
 
     public int getKylinMetricsCacheExpireSeconds() {
-        return Integer.parseInt(this.getOptional("kylin.metrics.query-cache.expire-seconds", "600"));
+        return Integer.parseInt(this.getOptional("kylin.metrics.query-cache.expire-seconds", "300"));
     }
 
     public int getKylinMetricsCacheMaxEntries() {
diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/QuerySparkMetrics.java b/core-metrics/src/main/java/org/apache/kylin/metrics/QuerySparkMetrics.java
index ed2430c..a0efe64 100644
--- a/core-metrics/src/main/java/org/apache/kylin/metrics/QuerySparkMetrics.java
+++ b/core-metrics/src/main/java/org/apache/kylin/metrics/QuerySparkMetrics.java
@@ -36,42 +36,62 @@ import java.io.Serializable;
 import java.util.Map;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
 public class QuerySparkMetrics {
     private static final Logger logger = LoggerFactory.getLogger(QuerySparkMetrics.class);
-    private static final QuerySparkMetrics instance = new QuerySparkMetrics();
+    private static ScheduledExecutorService scheduledExecutor = null;
+    private static QuerySparkMetrics instance =
+            new QuerySparkMetrics(new QuerySparkMetricsRemovalListener());
     private static final int sparkMetricsNum = 10;
     private org.apache.kylin.shaded.com.google.common.cache.Cache<String, QueryExecutionMetrics> queryExecutionMetricsMap;
 
-    private QuerySparkMetrics() {
+    // default removal listener
+    private static class QuerySparkMetricsRemovalListener implements RemovalListener<String,
+            QueryExecutionMetrics> {
+        @Override
+        public void onRemoval(RemovalNotification<String, QueryExecutionMetrics> notification) {
+            try {
+                updateMetricsToReservoir(notification.getKey(), notification.getValue());
+                logger.info("Query metrics {} is removed due to {}, update to metrics reservoir successful",
+                        notification.getKey(), notification.getCause());
+            } catch (Exception e) {
+                logger.warn("Query metrics {} is removed due to {}, update to metrics reservoir failed",
+                        notification.getKey(), notification.getCause());
+            }
+        }
+    }
+
+    private QuerySparkMetrics(RemovalListener removalListener) {
+        if (queryExecutionMetricsMap != null) {
+            queryExecutionMetricsMap.cleanUp();
+            queryExecutionMetricsMap = null;
+        }
         queryExecutionMetricsMap = CacheBuilder.newBuilder()
                 .maximumSize(KylinConfig.getInstanceFromEnv().getKylinMetricsCacheMaxEntries())
                 .expireAfterWrite(KylinConfig.getInstanceFromEnv().getKylinMetricsCacheExpireSeconds(),
                         TimeUnit.SECONDS)
-                .removalListener(new RemovalListener<String, QueryExecutionMetrics>() {
-                    @Override
-                    public void onRemoval(RemovalNotification<String, QueryExecutionMetrics> notification) {
-                        try {
-                            updateMetricsToReservoir(notification.getKey(), notification.getValue());
-                            logger.info("Query metrics {} is removed due to {}, update to metrics reservoir successful",
-                                    notification.getKey(), notification.getCause());
-                        } catch(Exception e) {
-                            logger.warn("Query metrics {} is removed due to {}, update to metrics reservoir failed",
-                                    notification.getKey(), notification.getCause());
-                        }
-                    }
-                }).build();
-
-        Executors.newSingleThreadScheduledExecutor().scheduleWithFixedDelay(new Runnable() {
-                                                                                @Override
-                                                                                public void run() {
-                                                                                    queryExecutionMetricsMap.cleanUp();
-                                                                                }
-                                                                            },
+                .removalListener(removalListener).build();
+
+        if (scheduledExecutor != null && !scheduledExecutor.isShutdown()) {
+            scheduledExecutor.shutdown();
+            scheduledExecutor = null;
+        }
+        scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
+        scheduledExecutor.scheduleWithFixedDelay(new Runnable() {
+                                                     @Override
+                                                     public void run() {
+                                                         queryExecutionMetricsMap.cleanUp();
+                                                     }
+                                                 },
                 KylinConfig.getInstanceFromEnv().getKylinMetricsCacheExpireSeconds(),
                 KylinConfig.getInstanceFromEnv().getKylinMetricsCacheExpireSeconds(), TimeUnit.SECONDS);
+    }
 
+    // only for test case
+    public static void init(RemovalListener removalListener) {
+        instance = new QuerySparkMetrics(removalListener);
     }
 
     public static QuerySparkMetrics getInstance() {
@@ -159,19 +179,11 @@ public class QuerySparkMetrics {
         return queryExecutionMetricsMap.getIfPresent(queryId);
     }
 
-    public void setQueryRealization(String queryId, String realizationName, int realizationType, String cuboidIds) {
-        QueryExecutionMetrics queryExecutionMetrics = queryExecutionMetricsMap.getIfPresent(queryId);
-        if (queryExecutionMetrics != null) {
-            queryExecutionMetrics.setRealization(realizationName);
-            queryExecutionMetrics.setRealizationType(realizationType);
-            queryExecutionMetrics.setCuboidIds(cuboidIds);
-        }
-    }
-
     /**
      * report query related metrics
      */
-    public void updateMetricsToReservoir(String queryId, QueryExecutionMetrics queryExecutionMetrics) {
+    public static void updateMetricsToReservoir(String queryId,
+                                                QueryExecutionMetrics queryExecutionMetrics) {
         if (!KylinConfig.getInstanceFromEnv().isKylinMetricsReporterForQueryEnabled()) {
             return;
         }
@@ -186,7 +198,8 @@ public class QuerySparkMetrics {
 
             setSparkExecutionWrapper(queryExecutionMetricsEvent, queryExecutionMetrics.getSparderName(),
                     queryExecutionMetrics.getExecutionId(), queryExecutionMetrics.getRealization(),
-                    queryExecutionMetrics.getRealizationType(), queryExecutionMetrics.getCuboidIds(),
+                    queryExecutionMetrics.getRealizationTypes(),
+                    queryExecutionMetrics.getCuboidIds(),
                     queryExecutionMetrics.getStartTime(), queryExecutionMetrics.getEndTime());
 
             setQueryMetrics(queryExecutionMetricsEvent, queryExecutionMetrics.getSqlDuration(),
@@ -244,7 +257,8 @@ public class QuerySparkMetrics {
                     queryExecutionMetricsList[i] += sparkJobMetricsList[i];
                 }
             }
-            setSparkExecutionMetrics(queryExecutionMetricsEvent, queryExecutionMetrics.getExecutionDuration(),
+            setSparkExecutionMetrics(queryExecutionMetricsEvent,
+                    queryExecutionMetrics.getEndTime() - queryExecutionMetrics.getStartTime(),
                     queryExecutionMetricsList[0], queryExecutionMetricsList[1], queryExecutionMetricsList[2],
                     queryExecutionMetricsList[3], queryExecutionMetricsList[4], queryExecutionMetricsList[5],
                     queryExecutionMetricsList[6], queryExecutionMetricsList[7], queryExecutionMetricsList[8],
@@ -264,8 +278,10 @@ public class QuerySparkMetrics {
         metricsEvent.put(QuerySparkExecutionEnum.EXCEPTION.toString(), exception);
     }
 
-    private static void setSparkExecutionWrapper(RecordEvent metricsEvent, String sparderName, long executionId,
-            String realizationName, int realizationType, String cuboidIds, long startTime, long endTime) {
+    private static void setSparkExecutionWrapper(RecordEvent metricsEvent, String sparderName,
+                                                 long executionId, String realizationName,
+                                                 String realizationType, String cuboidIds,
+                                                 long startTime, long endTime) {
         metricsEvent.put(QuerySparkExecutionEnum.SPARDER_NAME.toString(), sparderName);
         metricsEvent.put(QuerySparkExecutionEnum.EXECUTION_ID.toString(), executionId);
         metricsEvent.put(QuerySparkExecutionEnum.REALIZATION.toString(), realizationName);
@@ -365,7 +381,7 @@ public class QuerySparkMetrics {
         private long executionDuration;
         private String queryId;
         private String realization;
-        private int realizationType;
+        private String realizationTypes;
         private String cuboidIds;
         private long startTime;
         private long endTime;
@@ -512,16 +528,16 @@ public class QuerySparkMetrics {
             return realization;
         }
 
-        public int getRealizationType() {
-            return realizationType;
+        public String getRealizationTypes() {
+            return realizationTypes;
         }
 
         public void setRealization(String realization) {
             this.realization = realization;
         }
 
-        public void setRealizationType(int realizationType) {
-            this.realizationType = realizationType;
+        public void setRealizationTypes(String realizationTypes) {
+            this.realizationTypes = realizationTypes;
         }
 
         public void setSparkJobMetricsMap(ConcurrentMap<Integer, SparkJobMetrics> sparkJobMetricsMap) {
diff --git a/examples/test_case_data/localmeta/kylin.properties b/examples/test_case_data/localmeta/kylin.properties
index ca9543e..d17ce8c 100644
--- a/examples/test_case_data/localmeta/kylin.properties
+++ b/examples/test_case_data/localmeta/kylin.properties
@@ -159,4 +159,7 @@ kylin.source.jdbc.connection-url=jdbc:h2:mem:db
 kylin.source.jdbc.user=
 kylin.source.jdbc.pass=
 
-kylin.query.auto-sparder-context=false
\ No newline at end of file
+kylin.query.auto-sparder-context=false
+
+kylin.metrics.query-cache.expire-seconds=5
+kylin.metrics.query-cache.max-entries=2
\ No newline at end of file
diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/metrics/SparderMetricsListener.scala b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/metrics/SparderMetricsListener.scala
index 6237235..84097e6 100644
--- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/metrics/SparderMetricsListener.scala
+++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/metrics/SparderMetricsListener.scala
@@ -98,7 +98,7 @@ class SparderMetricsListener() extends SparkListener with Logging {
       val stageMetrics = stageInfo.taskMetrics
       val sparkStageMetrics = new QuerySparkMetrics.SparkStageMetrics
       sparkStageMetrics.setMetrics(stageMetrics.resultSize, stageMetrics.executorDeserializeCpuTime,
-        stageMetrics.executorDeserializeCpuTime, stageMetrics.executorRunTime, stageMetrics.executorCpuTime,
+        stageMetrics.executorDeserializeTime, stageMetrics.executorRunTime, stageMetrics.executorCpuTime,
         stageMetrics.jvmGCTime, stageMetrics.resultSerializationTime,
         stageMetrics.memoryBytesSpilled, stageMetrics.diskBytesSpilled, stageMetrics.peakExecutionMemory)
       queryExecutionMetrics.updateSparkStageMetrics(jobExecutionMap.apply(stageJobMap.apply(stageInfo.stageId)).queryId,
diff --git a/server-base/src/main/java/org/apache/kylin/rest/init/InitialTaskManager.java b/server-base/src/main/java/org/apache/kylin/rest/init/InitialTaskManager.java
index 876ae08..feaf3a3 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/init/InitialTaskManager.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/init/InitialTaskManager.java
@@ -21,6 +21,7 @@ package org.apache.kylin.rest.init;
 import org.apache.commons.lang.StringUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.StringUtil;
+import org.apache.kylin.metrics.QuerySparkMetrics;
 import org.apache.kylin.rest.metrics.QueryMetrics2Facade;
 import org.apache.kylin.rest.metrics.QueryMetricsFacade;
 import org.slf4j.Logger;
@@ -44,6 +45,7 @@ public class InitialTaskManager implements InitializingBean {
     private void runInitialTasks() {
 
         // init metrics system for kylin
+        QuerySparkMetrics.getInstance();
         QueryMetricsFacade.init();
         QueryMetrics2Facade.init();
 
diff --git a/server-base/src/main/java/org/apache/kylin/rest/metrics/QueryMetricsFacade.java b/server-base/src/main/java/org/apache/kylin/rest/metrics/QueryMetricsFacade.java
index 7a8f2d7..db01f33 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/metrics/QueryMetricsFacade.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/metrics/QueryMetricsFacade.java
@@ -93,7 +93,9 @@ public class QueryMetricsFacade {
             queryExecutionMetrics.setSqlIdCode(getSqlHashCode(sqlRequest.getSql()));
             queryExecutionMetrics.setProject(norm(sqlRequest.getProject()));
             queryExecutionMetrics.setQueryType(sqlResponse.isStorageCacheUsed() ? "CACHE" : "PARQUET");
-
+            queryExecutionMetrics.setRealization(sqlResponse.getCube());
+            queryExecutionMetrics.setRealizationTypes(sqlResponse.getRealizationTypes());
+            queryExecutionMetrics.setCuboidIds(sqlResponse.getCuboidIds());
             queryExecutionMetrics.setSqlDuration(sqlResponse.getDuration());
             queryExecutionMetrics.setTotalScanCount(sqlResponse.getTotalScanCount());
             queryExecutionMetrics.setTotalScanBytes(sqlResponse.getTotalScanBytes());
diff --git a/server-base/src/main/java/org/apache/kylin/rest/response/SQLResponse.java b/server-base/src/main/java/org/apache/kylin/rest/response/SQLResponse.java
index d5d57ed..37578ec 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/response/SQLResponse.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/response/SQLResponse.java
@@ -48,6 +48,8 @@ public class SQLResponse implements Serializable {
 
     protected String cuboidIds;
 
+    protected String realizationTypes;
+
     // if not select query, only return affected row count
     protected int affectedRowCount;
 
@@ -286,6 +288,14 @@ public class SQLResponse implements Serializable {
         this.lazyQueryStartTime = lazyQueryStartTime;
     }
 
+    public String getRealizationTypes() {
+        return realizationTypes;
+    }
+
+    public void setRealizationTypes(String realizationTypes) {
+        this.realizationTypes = realizationTypes;
+    }
+
     @JsonIgnore
     public List<QueryContext.CubeSegmentStatisticsResult> getCubeSegmentStatisticsList() {
         try {
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/DashboardService.java b/server-base/src/main/java/org/apache/kylin/rest/service/DashboardService.java
index 768876f..392fac2 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/DashboardService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/DashboardService.java
@@ -32,7 +32,6 @@ import org.apache.kylin.metrics.MetricsManager;
 import org.apache.kylin.metrics.lib.impl.TimePropertyEnum;
 import org.apache.kylin.metrics.property.JobPropertyEnum;
 import org.apache.kylin.metrics.property.QuerySparkExecutionEnum;
-import org.apache.kylin.metrics.property.QuerySparkExecutionEnum;
 import org.apache.kylin.rest.constant.Constant;
 import org.apache.kylin.rest.exception.BadRequestException;
 import org.apache.kylin.rest.request.PrepareSqlRequest;
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
index f020d6b..7296636 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
@@ -1180,6 +1180,7 @@ public class QueryService extends BasicService {
         List<String> realizations = Lists.newLinkedList();
         StringBuilder cubeSb = new StringBuilder();
         StringBuilder cuboidIdsSb = new StringBuilder();
+        StringBuilder realizationTypeSb = new StringBuilder();
         StringBuilder logSb = new StringBuilder("Processed rows for each storageContext: ");
         QueryContext queryContext = QueryContextFacade.current();
         if (OLAPContext.getThreadLocalContexts() != null) { // contexts can be null in case of 'explain plan for'
@@ -1191,6 +1192,7 @@ public class QueryService extends BasicService {
                     if (cubeSb.length() > 0) {
                         cubeSb.append(",");
                     }
+                    cubeSb.append(ctx.realization.getCanonicalName());
                     Cuboid cuboid = ctx.storageContext.getCuboid();
                     if (cuboid != null) {
                         //Some queries do not involve cuboid, e.g. lookup table query
@@ -1199,25 +1201,25 @@ public class QueryService extends BasicService {
                         }
                         cuboidIdsSb.append(cuboid.getId());
                     }
-                    cubeSb.append(ctx.realization.getCanonicalName());
                     logSb.append(ctx.storageContext.getProcessedRowCount()).append(" ");
 
                     realizationName = ctx.realization.getName();
                     realizationType = ctx.realization.getStorageType();
+                    if (realizationTypeSb.length() > 0) {
+                        realizationTypeSb.append(",");
+                    }
+                    realizationTypeSb.append(realizationType);
 
                     realizations.add(realizationName);
                 }
-                QuerySparkMetrics.getInstance().setQueryRealization(queryContext.getQueryId(), realizationName,
-                        realizationType, cuboidIdsSb.toString());
             }
-
-
         }
         logger.info(logSb.toString());
 
         SQLResponse response = new SQLResponse(columnMetas, results, cubeSb.toString(), 0, isException,
                 exceptionMessage, isPartialResult, isPushDown);
         response.setCuboidIds(cuboidIdsSb.toString());
+        response.setRealizationTypes(realizationTypeSb.toString());
         response.setTotalScanCount(queryContext.getScannedRows());
         response.setTotalScanFiles((queryContext.getScanFiles() < 0) ? -1 :
                 queryContext.getScanFiles());
diff --git a/server/src/test/java/org/apache/kylin/rest/metrics/QueryMetricsTest.java b/server/src/test/java/org/apache/kylin/rest/metrics/QueryMetricsTest.java
index fbacc78..3ad7eea 100644
--- a/server/src/test/java/org/apache/kylin/rest/metrics/QueryMetricsTest.java
+++ b/server/src/test/java/org/apache/kylin/rest/metrics/QueryMetricsTest.java
@@ -21,23 +21,31 @@ package org.apache.kylin.rest.metrics;
 import java.lang.management.ManagementFactory;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.QueryContext;
 import org.apache.kylin.common.QueryContextFacade;
+import org.apache.kylin.metrics.QuerySparkMetrics;
 import org.apache.kylin.rest.request.SQLRequest;
 import org.apache.kylin.rest.response.SQLResponse;
 import org.apache.kylin.rest.service.ServiceTestBase;
+import org.apache.kylin.shaded.com.google.common.cache.RemovalListener;
+import org.apache.kylin.shaded.com.google.common.cache.RemovalNotification;
 import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
 
 public class QueryMetricsTest extends ServiceTestBase {
 
     private static MBeanServer mBeanServer;
     private static ObjectName objectName;
+    private static AtomicInteger sparkMetricsReportCnt = new AtomicInteger(0);
 
-//    @Before
+    @Before
     public void setup() throws Exception {
         super.setup();
 
@@ -45,7 +53,7 @@ public class QueryMetricsTest extends ServiceTestBase {
         objectName = new ObjectName("Hadoop:service=Kylin,name=Server_Total");
     }
 
-//    @Test
+    @Test
     public void testQueryMetrics() throws Exception {
         System.setProperty("kylin.server.query-metrics-enabled", "true");
         QueryMetricsFacade.init();
@@ -111,7 +119,7 @@ public class QueryMetricsTest extends ServiceTestBase {
         System.clearProperty("kylin.server.query-metrics-enabled");
     }
 
-//    @Test
+    @Test
     public void testQueryStatisticsResult() throws Exception {
         System.setProperty("kylin.metrics.reporter-query-enabled", "true");
         QueryMetricsFacade.init();
@@ -153,4 +161,261 @@ public class QueryMetricsTest extends ServiceTestBase {
         System.clearProperty("kylin.server.query-metrics-enabled");
         System.out.println("------------testQueryStatisticsResult done------------");
     }
+
+    @Test
+    public void testQuerySparkMetrics() throws Exception {
+        sparkMetricsReportCnt.set(0);
+        System.setProperty("kylin.server.query-metrics-enabled", "true");
+        QuerySparkMetrics.init(new QuerySparkMetricsTestRemovalListener());
+        QueryMetricsFacade.init();
+
+        SQLRequest sqlRequest = new SQLRequest();
+        sqlRequest.setSql("select * from TEST_KYLIN_FACT");
+        sqlRequest.setProject("default");
+
+        String queryId1 = "1";
+        generateSparkMetrics(queryId1);
+
+        SQLResponse sqlResponse = new SQLResponse();
+        sqlResponse.setDuration(10);
+        sqlResponse.setCube("test_cube");
+        sqlResponse.setCuboidIds("12345");
+        sqlResponse.setRealizationTypes("4");
+        sqlResponse.setIsException(false);
+        sqlResponse.setTotalScanCount(100);
+        List<String> list1 = new ArrayList<>();
+        list1.add("111");
+        list1.add("112");
+        List<String> list2 = new ArrayList<>();
+        list2.add("111");
+        list2.add("112");
+        List<List<String>> results = new ArrayList<>();
+        results.add(list1);
+        results.add(list2);
+        sqlResponse.setResults(results);
+        sqlResponse.setStorageCacheUsed(true);
+
+        QueryMetricsFacade.updateMetrics(queryId1, sqlRequest, sqlResponse);
+
+        Thread.sleep(3000);
+
+        updateSparkMetrics(queryId1);
+
+        Assert.assertTrue(QuerySparkMetrics.getInstance().getQueryExecutionMetrics(queryId1) != null);
+        Assert.assertEquals(1L, mBeanServer.getAttribute(objectName, "QueryCount"));
+        Assert.assertEquals(1L, mBeanServer.getAttribute(objectName, "QuerySuccessCount"));
+        Assert.assertEquals(0L, mBeanServer.getAttribute(objectName, "QueryFailCount"));
+        Assert.assertEquals(1L, mBeanServer.getAttribute(objectName, "CacheHitCount"));
+
+        Assert.assertEquals(1L, mBeanServer.getAttribute(objectName, "ScanRowCountNumOps"));
+        Assert.assertEquals(100.0, mBeanServer.getAttribute(objectName, "ScanRowCountAvgTime"));
+        Assert.assertEquals(100.0, mBeanServer.getAttribute(objectName, "ScanRowCountMaxTime"));
+        Assert.assertEquals(100.0, mBeanServer.getAttribute(objectName, "ScanRowCountMinTime"));
+
+        Assert.assertEquals(1L, mBeanServer.getAttribute(objectName, "ResultRowCountNumOps"));
+        Assert.assertEquals(2.0, mBeanServer.getAttribute(objectName, "ResultRowCountMaxTime"));
+        Assert.assertEquals(2.0, mBeanServer.getAttribute(objectName, "ResultRowCountAvgTime"));
+        Assert.assertEquals(2.0, mBeanServer.getAttribute(objectName, "ResultRowCountMinTime"));
+
+        Assert.assertEquals(1L, mBeanServer.getAttribute(objectName, "QueryLatencyNumOps"));
+        Assert.assertEquals(10.0, mBeanServer.getAttribute(objectName, "QueryLatencyMaxTime"));
+        Assert.assertEquals(10.0, mBeanServer.getAttribute(objectName, "QueryLatencyAvgTime"));
+        Assert.assertEquals(10.0, mBeanServer.getAttribute(objectName, "QueryLatencyMinTime"));
+
+        String queryId2 = "2";
+        generateSparkMetrics(queryId2);
+
+        Thread.sleep(3000);
+
+        Assert.assertTrue(QuerySparkMetrics.getInstance().getQueryExecutionMetrics(queryId1) == null);
+        Assert.assertTrue(QuerySparkMetrics.getInstance().getQueryExecutionMetrics(queryId2) != null);
+
+        updateSparkMetrics(queryId2);
+
+        SQLResponse sqlResponse2 = new SQLResponse();
+        sqlResponse2.setDuration(10);
+        sqlResponse2.setCube("test_cube");
+        sqlResponse2.setIsException(true);
+
+        QueryMetricsFacade.updateMetrics(queryId2, sqlRequest, sqlResponse2);
+
+        Thread.sleep(5000);
+
+        Assert.assertEquals(2L, mBeanServer.getAttribute(objectName, "QueryCount"));
+        Assert.assertEquals(1L, mBeanServer.getAttribute(objectName, "QuerySuccessCount"));
+        Assert.assertEquals(1L, mBeanServer.getAttribute(objectName, "QueryFailCount"));
+
+        Assert.assertTrue(QuerySparkMetrics.getInstance().getQueryExecutionMetrics(queryId2) == null);
+        Assert.assertEquals(2, sparkMetricsReportCnt.get());
+        System.clearProperty("kylin.server.query-metrics-enabled");
+    }
+
+    public void generateSparkMetrics(String queryId) {
+        Integer id = Integer.valueOf(queryId);
+        long executionStartTime = 0;
+        long jobStartTime= 0;
+        long submitTime = 0;
+
+        if (id == 1) {
+            executionStartTime = 1610609727972L;
+            jobStartTime = 1610609772880L;
+            submitTime = 1610610480942L;
+        } else if (id == 2) {
+            executionStartTime = 1610609876545L;
+            jobStartTime = 1610609901699L;
+            submitTime = 16106105016542L;
+        }
+        QuerySparkMetrics.getInstance().onJobStart(queryId, "test-sparder_" + id, id,
+                executionStartTime, id, jobStartTime);
+        QuerySparkMetrics.getInstance().onSparkStageStart(queryId, id, id, "stageType_" + id,
+                submitTime);
+    }
+
+    public void updateSparkMetrics(String queryId) {
+        Integer id = Integer.valueOf(queryId);
+        long jobEndTime = 0;
+        long executionEndTime = 0;
+        boolean jobIsSuccess = true;
+
+        QuerySparkMetrics.SparkStageMetrics queryStageMetrics = new QuerySparkMetrics.SparkStageMetrics();
+        if (id == 1) {
+            jobEndTime = 1610610734401L;
+            executionEndTime = 1610612655793L;
+            jobIsSuccess = true;
+            queryStageMetrics.setMetrics(10000, 10, 10, 100, 10, 1, 10, 1000, 1000, 100);
+        } else if (id == 2) {
+            jobEndTime = 1610610750397L;
+            executionEndTime = 1610612685275L;
+            jobIsSuccess = false;
+            queryStageMetrics.setMetrics(20000, 20, 20, 200, 20, 2, 20, 2000, 2000, 200);
+        }
+        QuerySparkMetrics.getInstance().updateSparkStageMetrics(queryId, id, id, true,
+                queryStageMetrics);
+        QuerySparkMetrics.getInstance().updateSparkJobMetrics(queryId, id, jobEndTime, jobIsSuccess);
+        QuerySparkMetrics.getInstance().updateExecutionMetrics(queryId, executionEndTime);
+    }
+
+    public static void verifyQuerySparkMetrics(String queryId, // must be a decimal integer string; parsed into `id` below
+                                               QuerySparkMetrics.QueryExecutionMetrics queryExecutionMetrics) { // metrics checked against hard-coded expected values
+        sparkMetricsReportCnt.getAndIncrement(); // counted on every call, before any assertion can fail
+        Assert.assertTrue(StringUtils.isNotBlank(queryId));
+        Assert.assertTrue(queryExecutionMetrics != null);
+        // verify: the numeric query id doubles as the job id and the stage id for all lookups below
+        int id = Integer.valueOf(queryId); // throws NumberFormatException when queryId is not numeric
+        Assert.assertTrue(queryExecutionMetrics.getSparkJobMetricsMap().get(id) != null);
+        Assert.assertTrue(queryExecutionMetrics.getSparkJobMetricsMap().get(id) != null); // NOTE(review): exact duplicate of the previous line
+        Assert.assertEquals(queryExecutionMetrics.getSparderName(), "test-sparder_" + id); // NOTE(review): if Assert is JUnit's, the order is (expected, actual); arguments look swapped throughout — confirm
+        Assert.assertEquals(queryExecutionMetrics.getSparkJobMetricsMap().get(id).getJobId(), id);
+
+        if (id == 1) { // expected values for query id 1
+            //SqlResponse metrics
+            Assert.assertEquals(queryExecutionMetrics.getUser(), "ADMIN");
+            Assert.assertEquals(queryExecutionMetrics.getProject(), "DEFAULT");
+            Assert.assertEquals(queryExecutionMetrics.getQueryType(), "CACHE");
+            Assert.assertEquals(queryExecutionMetrics.getRealization(), "test_cube");
+            Assert.assertEquals(queryExecutionMetrics.getRealizationTypes(), "4");
+            Assert.assertEquals(queryExecutionMetrics.getCuboidIds(), "12345");
+            Assert.assertEquals(queryExecutionMetrics.getTotalScanCount(), 100);
+            Assert.assertEquals(queryExecutionMetrics.getResultCount(), 2);
+            Assert.assertEquals(queryExecutionMetrics.getException(), "NULL"); // the literal string "NULL", not a null reference
+
+            Assert.assertEquals(queryExecutionMetrics.getSparderName(), "test-sparder_" + id); // NOTE(review): already asserted before the branch
+            Assert.assertEquals(queryExecutionMetrics.getException(), "NULL"); // NOTE(review): already asserted a few lines above
+
+            //SparkExecution metrics
+            Assert.assertEquals(queryExecutionMetrics.getStartTime(), 1610609727972L);
+            Assert.assertEquals(queryExecutionMetrics.getEndTime(), 1610612655793L);
+            Assert.assertTrue(queryExecutionMetrics.getSparkJobMetricsMap().get(id).isSuccess());
+            Assert.assertEquals(queryExecutionMetrics.getSparkJobMetricsMap().get(id).getStartTime(), 1610609772880L);
+            Assert.assertEquals(queryExecutionMetrics.getSparkJobMetricsMap().get(id).getEndTime(), 1610610734401L);
+
+            //SparkStage metrics: exactly one stage, keyed by the same id as the job
+            Assert.assertEquals(queryExecutionMetrics.getSparkJobMetricsMap().get(id)
+                    .getSparkStageMetricsMap().size(), 1);
+            Assert.assertTrue(queryExecutionMetrics.getSparkJobMetricsMap().get(id)
+                    .getSparkStageMetricsMap().get(id) != null);
+            Assert.assertEquals(queryExecutionMetrics.getSparkJobMetricsMap().get(id)
+                    .getSparkStageMetricsMap().get(id).getStageType(), "stageType_" + id);
+            Assert.assertEquals(queryExecutionMetrics.getSparkJobMetricsMap().get(id)
+                    .getSparkStageMetricsMap().get(id).getStageId(), id);
+            Assert.assertEquals(queryExecutionMetrics.getSparkJobMetricsMap().get(id)
+                    .getSparkStageMetricsMap().get(id).getResultSize(), 10000);
+            Assert.assertEquals(queryExecutionMetrics.getSparkJobMetricsMap().get(id)
+                    .getSparkStageMetricsMap().get(id).getExecutorDeserializeCpuTime(), 10);
+            Assert.assertEquals(queryExecutionMetrics.getSparkJobMetricsMap().get(id)
+                    .getSparkStageMetricsMap().get(id).getExecutorDeserializeTime(), 10);
+            Assert.assertEquals(queryExecutionMetrics.getSparkJobMetricsMap().get(id)
+                    .getSparkStageMetricsMap().get(id).getExecutorRunTime(), 100);
+            Assert.assertEquals(queryExecutionMetrics.getSparkJobMetricsMap().get(id)
+                    .getSparkStageMetricsMap().get(id).getExecutorCpuTime(), 10);
+            Assert.assertEquals(queryExecutionMetrics.getSparkJobMetricsMap().get(id)
+                    .getSparkStageMetricsMap().get(id).getJvmGCTime(), 1);
+            Assert.assertEquals(queryExecutionMetrics.getSparkJobMetricsMap().get(id)
+                    .getSparkStageMetricsMap().get(id).getResultSerializationTime(), 10);
+            Assert.assertEquals(queryExecutionMetrics.getSparkJobMetricsMap().get(id)
+                    .getSparkStageMetricsMap().get(id).getMemoryBytesSpilled(), 1000);
+            Assert.assertEquals(queryExecutionMetrics.getSparkJobMetricsMap().get(id)
+                    .getSparkStageMetricsMap().get(id).getDiskBytesSpilled(), 1000);
+            Assert.assertEquals(queryExecutionMetrics.getSparkJobMetricsMap().get(id)
+                    .getSparkStageMetricsMap().get(id).getPeakExecutionMemory(), 100);
+        } else if (id == 2) { // expected values for query id 2; any other id gets only the common checks above
+            //SqlResponse metrics
+            Assert.assertEquals(queryExecutionMetrics.getUser(), "ADMIN");
+            Assert.assertEquals(queryExecutionMetrics.getProject(), "DEFAULT");
+            Assert.assertEquals(queryExecutionMetrics.getQueryType(), "PARQUET");
+            Assert.assertEquals(queryExecutionMetrics.getRealization(), "test_cube");
+            Assert.assertEquals(queryExecutionMetrics.getRealizationTypes(), null); // a null reference is expected here, unlike id == 1
+            Assert.assertEquals(queryExecutionMetrics.getCuboidIds(), null); // a null reference is expected here, unlike id == 1
+            Assert.assertEquals(queryExecutionMetrics.getTotalScanCount(), 0);
+            Assert.assertEquals(queryExecutionMetrics.getResultCount(), 0);
+            Assert.assertEquals(queryExecutionMetrics.getException(), "NULL"); // the literal string "NULL", not a null reference
+
+            //SparkExecution metrics
+            Assert.assertEquals(queryExecutionMetrics.getStartTime(), 1610609876545L);
+            Assert.assertEquals(queryExecutionMetrics.getEndTime(), 1610612685275L);
+            Assert.assertFalse(queryExecutionMetrics.getSparkJobMetricsMap().get(id).isSuccess()); // this job is expected to be recorded as unsuccessful
+            Assert.assertEquals(queryExecutionMetrics.getSparkJobMetricsMap().get(id).getStartTime(), 1610609901699L);
+            Assert.assertEquals(queryExecutionMetrics.getSparkJobMetricsMap().get(id).getEndTime(), 1610610750397L);
+
+            //SparkStage metrics (NOTE(review): stage type is not asserted in this branch, unlike id == 1)
+            Assert.assertEquals(queryExecutionMetrics.getSparkJobMetricsMap().get(id)
+                    .getSparkStageMetricsMap().size(), 1);
+            Assert.assertTrue(queryExecutionMetrics.getSparkJobMetricsMap().get(id)
+                    .getSparkStageMetricsMap().get(id) != null);
+            Assert.assertEquals(queryExecutionMetrics.getSparkJobMetricsMap().get(id)
+                    .getSparkStageMetricsMap().get(id).getStageId(), id);
+            Assert.assertEquals(queryExecutionMetrics.getSparkJobMetricsMap().get(id)
+                    .getSparkStageMetricsMap().get(id).getResultSize(), 20000);
+            Assert.assertEquals(queryExecutionMetrics.getSparkJobMetricsMap().get(id)
+                    .getSparkStageMetricsMap().get(id).getExecutorDeserializeCpuTime(), 20);
+            Assert.assertEquals(queryExecutionMetrics.getSparkJobMetricsMap().get(id)
+                    .getSparkStageMetricsMap().get(id).getExecutorDeserializeTime(), 20);
+            Assert.assertEquals(queryExecutionMetrics.getSparkJobMetricsMap().get(id)
+                    .getSparkStageMetricsMap().get(id).getExecutorRunTime(), 200);
+            Assert.assertEquals(queryExecutionMetrics.getSparkJobMetricsMap().get(id)
+                    .getSparkStageMetricsMap().get(id).getExecutorCpuTime(), 20);
+            Assert.assertEquals(queryExecutionMetrics.getSparkJobMetricsMap().get(id)
+                    .getSparkStageMetricsMap().get(id).getJvmGCTime(), 2);
+            Assert.assertEquals(queryExecutionMetrics.getSparkJobMetricsMap().get(id)
+                    .getSparkStageMetricsMap().get(id).getResultSerializationTime(), 20);
+            Assert.assertEquals(queryExecutionMetrics.getSparkJobMetricsMap().get(id)
+                    .getSparkStageMetricsMap().get(id).getMemoryBytesSpilled(), 2000);
+            Assert.assertEquals(queryExecutionMetrics.getSparkJobMetricsMap().get(id)
+                    .getSparkStageMetricsMap().get(id).getDiskBytesSpilled(), 2000);
+            Assert.assertEquals(queryExecutionMetrics.getSparkJobMetricsMap().get(id)
+                    .getSparkStageMetricsMap().get(id).getPeakExecutionMemory(), 200);
+        }
+    }
+
+    private static class QuerySparkMetricsTestRemovalListener implements RemovalListener<String, // test listener: validates every removed (key, metrics) entry
+            QuerySparkMetrics.QueryExecutionMetrics> {
+        @Override
+        public void onRemoval(RemovalNotification<String, QuerySparkMetrics.QueryExecutionMetrics> notification) {
+            try {
+                verifyQuerySparkMetrics(notification.getKey(), notification.getValue()); // the notification key is passed as the query id
+            } catch (Exception e) { // NOTE(review): an AssertionError is not an Exception and would bypass this catch — confirm intended
+                e.printStackTrace(); // only printed; nothing is rethrown, so caught failures do not propagate from here
+            }
+        }
+    }
 }
diff --git a/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/CubeDescCreator.java b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/CubeDescCreator.java
index 2d82e02..f3e7168 100644
--- a/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/CubeDescCreator.java
+++ b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/CubeDescCreator.java
@@ -467,7 +467,7 @@ public class CubeDescCreator {
         desc.setDimensions(dimensionDescList);
         desc.setMeasures(measureDescList);
         desc.setRowkey(rowKeyDesc);
-        //desc.setHbaseMapping(hBaseMapping);
+        desc.setHbaseMapping(hBaseMapping);
         desc.setNotifyList(Lists.<String> newArrayList());
         desc.setStatusNeedNotify(Lists.newArrayList(JobStatusEnum.ERROR.toString()));
         desc.setAutoMergeTimeRanges(new long[] { 86400000L, 604800000L, 2419200000L });
diff --git a/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/HiveTableCreator.java b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/HiveTableCreator.java
index 73e493e..af40392 100644
--- a/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/HiveTableCreator.java
+++ b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/HiveTableCreator.java
@@ -130,7 +130,7 @@ public class HiveTableCreator {
         columns.add(new Pair<>(QuerySparkExecutionEnum.SPARDER_NAME.toString(), HiveTypeEnum.HSTRING.toString()));
         columns.add(new Pair<>(QuerySparkExecutionEnum.PROJECT.toString(), HiveTypeEnum.HSTRING.toString()));
         columns.add(new Pair<>(QuerySparkExecutionEnum.REALIZATION.toString(), HiveTypeEnum.HSTRING.toString()));
-        columns.add(new Pair<>(QuerySparkExecutionEnum.REALIZATION_TYPE.toString(), HiveTypeEnum.HINT.toString()));
+        columns.add(new Pair<>(QuerySparkExecutionEnum.REALIZATION_TYPE.toString(), HiveTypeEnum.HSTRING.toString()));
         columns.add(new Pair<>(QuerySparkExecutionEnum.CUBOID_IDS.toString(), HiveTypeEnum.HSTRING.toString()));
 
         columns.add(new Pair<>(QuerySparkExecutionEnum.TYPE.toString(), HiveTypeEnum.HSTRING.toString()));