You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by xx...@apache.org on 2021/01/17 15:17:48 UTC
[kylin] 02/07: add test case
This is an automated email from the ASF dual-hosted git repository.
xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit ad7b47245c959ea3e57efe1c8f5a737d0b1653ed
Author: Zhichao Zhang <44...@qq.com>
AuthorDate: Thu Jan 14 14:51:59 2021 +0800
add test case
Add metrics check
---
build/bin/system-cube.sh | 20 +-
.../org/apache/kylin/common/KylinConfigBase.java | 2 +-
.../apache/kylin/metrics/QuerySparkMetrics.java | 98 ++++----
examples/test_case_data/localmeta/kylin.properties | 5 +-
.../spark/sql/metrics/SparderMetricsListener.scala | 2 +-
.../apache/kylin/rest/init/InitialTaskManager.java | 2 +
.../kylin/rest/metrics/QueryMetricsFacade.java | 4 +-
.../apache/kylin/rest/response/SQLResponse.java | 10 +
.../kylin/rest/service/DashboardService.java | 1 -
.../apache/kylin/rest/service/QueryService.java | 12 +-
.../kylin/rest/metrics/QueryMetricsTest.java | 271 ++++++++++++++++++++-
.../tool/metrics/systemcube/CubeDescCreator.java | 2 +-
.../tool/metrics/systemcube/HiveTableCreator.java | 2 +-
13 files changed, 363 insertions(+), 68 deletions(-)
diff --git a/build/bin/system-cube.sh b/build/bin/system-cube.sh
index 20f7861..ca35970 100644
--- a/build/bin/system-cube.sh
+++ b/build/bin/system-cube.sh
@@ -74,18 +74,14 @@ then
cat <<-EOF > ${SINK_TOOLS_FILE}
[
[
- "org.apache.kylin.tool.metrics.systemcube.util.HiveSinkTool",
- {
- "storage_type": 2,
- "cube_desc_override_properties": [
- "java.util.HashMap",
- {
- "kylin.cube.algorithm": "INMEM",
- "kylin.cube.max-building-segments": "1"
- }
- ]
- }
- ]
+ {
+ "sink": "hive",
+ "storage_type": 4,
+ "cube_desc_override_properties": {
+ "kylin.cube.max-building-segments": "1"
+ }
+ }
+ ]
]
EOF
$KYLIN_HOME/bin/kylin.sh org.apache.kylin.tool.metrics.systemcube.SCCreator \
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 36950ec..21f05db 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -2364,7 +2364,7 @@ public abstract class KylinConfigBase implements Serializable {
}
public int getKylinMetricsCacheExpireSeconds() {
- return Integer.parseInt(this.getOptional("kylin.metrics.query-cache.expire-seconds", "600"));
+ return Integer.parseInt(this.getOptional("kylin.metrics.query-cache.expire-seconds", "300"));
}
public int getKylinMetricsCacheMaxEntries() {
diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/QuerySparkMetrics.java b/core-metrics/src/main/java/org/apache/kylin/metrics/QuerySparkMetrics.java
index ed2430c..a0efe64 100644
--- a/core-metrics/src/main/java/org/apache/kylin/metrics/QuerySparkMetrics.java
+++ b/core-metrics/src/main/java/org/apache/kylin/metrics/QuerySparkMetrics.java
@@ -36,42 +36,62 @@ import java.io.Serializable;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class QuerySparkMetrics {
private static final Logger logger = LoggerFactory.getLogger(QuerySparkMetrics.class);
- private static final QuerySparkMetrics instance = new QuerySparkMetrics();
+ private static ScheduledExecutorService scheduledExecutor = null;
+ private static QuerySparkMetrics instance =
+ new QuerySparkMetrics(new QuerySparkMetricsRemovalListener());
private static final int sparkMetricsNum = 10;
private org.apache.kylin.shaded.com.google.common.cache.Cache<String, QueryExecutionMetrics> queryExecutionMetricsMap;
- private QuerySparkMetrics() {
+ // default removal listener: reports each removed entry to the metrics reservoir and logs the outcome
+ private static class QuerySparkMetricsRemovalListener implements RemovalListener<String,
+ QueryExecutionMetrics> {
+ @Override
+ public void onRemoval(RemovalNotification<String, QueryExecutionMetrics> notification) {
+ try {
+ updateMetricsToReservoir(notification.getKey(), notification.getValue());
+ logger.info("Query metrics {} is removed due to {}, update to metrics reservoir successful",
+ notification.getKey(), notification.getCause());
+ } catch (Exception e) {
+ logger.warn("Query metrics {} is removed due to {}, update to metrics reservoir failed",
+ notification.getKey(), notification.getCause(), e); // trailing Throwable: SLF4J logs its stack trace
+ }
+ }
+ }
+
+ private QuerySparkMetrics(RemovalListener removalListener) {
+ if (queryExecutionMetricsMap != null) {
+ queryExecutionMetricsMap.cleanUp();
+ queryExecutionMetricsMap = null;
+ }
queryExecutionMetricsMap = CacheBuilder.newBuilder()
.maximumSize(KylinConfig.getInstanceFromEnv().getKylinMetricsCacheMaxEntries())
.expireAfterWrite(KylinConfig.getInstanceFromEnv().getKylinMetricsCacheExpireSeconds(),
TimeUnit.SECONDS)
- .removalListener(new RemovalListener<String, QueryExecutionMetrics>() {
- @Override
- public void onRemoval(RemovalNotification<String, QueryExecutionMetrics> notification) {
- try {
- updateMetricsToReservoir(notification.getKey(), notification.getValue());
- logger.info("Query metrics {} is removed due to {}, update to metrics reservoir successful",
- notification.getKey(), notification.getCause());
- } catch(Exception e) {
- logger.warn("Query metrics {} is removed due to {}, update to metrics reservoir failed",
- notification.getKey(), notification.getCause());
- }
- }
- }).build();
-
- Executors.newSingleThreadScheduledExecutor().scheduleWithFixedDelay(new Runnable() {
- @Override
- public void run() {
- queryExecutionMetricsMap.cleanUp();
- }
- },
+ .removalListener(removalListener).build();
+
+ if (scheduledExecutor != null && !scheduledExecutor.isShutdown()) {
+ scheduledExecutor.shutdown();
+ scheduledExecutor = null;
+ }
+ scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
+ scheduledExecutor.scheduleWithFixedDelay(new Runnable() {
+ @Override
+ public void run() {
+ queryExecutionMetricsMap.cleanUp();
+ }
+ },
KylinConfig.getInstanceFromEnv().getKylinMetricsCacheExpireSeconds(),
KylinConfig.getInstanceFromEnv().getKylinMetricsCacheExpireSeconds(), TimeUnit.SECONDS);
+ }
+ // only for test case
+ public static void init(RemovalListener removalListener) {
+ instance = new QuerySparkMetrics(removalListener);
}
public static QuerySparkMetrics getInstance() {
@@ -159,19 +179,11 @@ public class QuerySparkMetrics {
return queryExecutionMetricsMap.getIfPresent(queryId);
}
- public void setQueryRealization(String queryId, String realizationName, int realizationType, String cuboidIds) {
- QueryExecutionMetrics queryExecutionMetrics = queryExecutionMetricsMap.getIfPresent(queryId);
- if (queryExecutionMetrics != null) {
- queryExecutionMetrics.setRealization(realizationName);
- queryExecutionMetrics.setRealizationType(realizationType);
- queryExecutionMetrics.setCuboidIds(cuboidIds);
- }
- }
-
/**
* report query related metrics
*/
- public void updateMetricsToReservoir(String queryId, QueryExecutionMetrics queryExecutionMetrics) {
+ public static void updateMetricsToReservoir(String queryId,
+ QueryExecutionMetrics queryExecutionMetrics) {
if (!KylinConfig.getInstanceFromEnv().isKylinMetricsReporterForQueryEnabled()) {
return;
}
@@ -186,7 +198,8 @@ public class QuerySparkMetrics {
setSparkExecutionWrapper(queryExecutionMetricsEvent, queryExecutionMetrics.getSparderName(),
queryExecutionMetrics.getExecutionId(), queryExecutionMetrics.getRealization(),
- queryExecutionMetrics.getRealizationType(), queryExecutionMetrics.getCuboidIds(),
+ queryExecutionMetrics.getRealizationTypes(),
+ queryExecutionMetrics.getCuboidIds(),
queryExecutionMetrics.getStartTime(), queryExecutionMetrics.getEndTime());
setQueryMetrics(queryExecutionMetricsEvent, queryExecutionMetrics.getSqlDuration(),
@@ -244,7 +257,8 @@ public class QuerySparkMetrics {
queryExecutionMetricsList[i] += sparkJobMetricsList[i];
}
}
- setSparkExecutionMetrics(queryExecutionMetricsEvent, queryExecutionMetrics.getExecutionDuration(),
+ setSparkExecutionMetrics(queryExecutionMetricsEvent,
+ queryExecutionMetrics.getEndTime() - queryExecutionMetrics.getStartTime(),
queryExecutionMetricsList[0], queryExecutionMetricsList[1], queryExecutionMetricsList[2],
queryExecutionMetricsList[3], queryExecutionMetricsList[4], queryExecutionMetricsList[5],
queryExecutionMetricsList[6], queryExecutionMetricsList[7], queryExecutionMetricsList[8],
@@ -264,8 +278,10 @@ public class QuerySparkMetrics {
metricsEvent.put(QuerySparkExecutionEnum.EXCEPTION.toString(), exception);
}
- private static void setSparkExecutionWrapper(RecordEvent metricsEvent, String sparderName, long executionId,
- String realizationName, int realizationType, String cuboidIds, long startTime, long endTime) {
+ private static void setSparkExecutionWrapper(RecordEvent metricsEvent, String sparderName,
+ long executionId, String realizationName,
+ String realizationType, String cuboidIds,
+ long startTime, long endTime) {
metricsEvent.put(QuerySparkExecutionEnum.SPARDER_NAME.toString(), sparderName);
metricsEvent.put(QuerySparkExecutionEnum.EXECUTION_ID.toString(), executionId);
metricsEvent.put(QuerySparkExecutionEnum.REALIZATION.toString(), realizationName);
@@ -365,7 +381,7 @@ public class QuerySparkMetrics {
private long executionDuration;
private String queryId;
private String realization;
- private int realizationType;
+ private String realizationTypes;
private String cuboidIds;
private long startTime;
private long endTime;
@@ -512,16 +528,16 @@ public class QuerySparkMetrics {
return realization;
}
- public int getRealizationType() {
- return realizationType;
+ public String getRealizationTypes() {
+ return realizationTypes;
}
public void setRealization(String realization) {
this.realization = realization;
}
- public void setRealizationType(int realizationType) {
- this.realizationType = realizationType;
+ public void setRealizationTypes(String realizationTypes) {
+ this.realizationTypes = realizationTypes;
}
public void setSparkJobMetricsMap(ConcurrentMap<Integer, SparkJobMetrics> sparkJobMetricsMap) {
diff --git a/examples/test_case_data/localmeta/kylin.properties b/examples/test_case_data/localmeta/kylin.properties
index ca9543e..d17ce8c 100644
--- a/examples/test_case_data/localmeta/kylin.properties
+++ b/examples/test_case_data/localmeta/kylin.properties
@@ -159,4 +159,7 @@ kylin.source.jdbc.connection-url=jdbc:h2:mem:db
kylin.source.jdbc.user=
kylin.source.jdbc.pass=
-kylin.query.auto-sparder-context=false
\ No newline at end of file
+kylin.query.auto-sparder-context=false
+
+kylin.metrics.query-cache.expire-seconds=5
+kylin.metrics.query-cache.max-entries=2
\ No newline at end of file
diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/metrics/SparderMetricsListener.scala b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/metrics/SparderMetricsListener.scala
index 6237235..84097e6 100644
--- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/metrics/SparderMetricsListener.scala
+++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/metrics/SparderMetricsListener.scala
@@ -98,7 +98,7 @@ class SparderMetricsListener() extends SparkListener with Logging {
val stageMetrics = stageInfo.taskMetrics
val sparkStageMetrics = new QuerySparkMetrics.SparkStageMetrics
sparkStageMetrics.setMetrics(stageMetrics.resultSize, stageMetrics.executorDeserializeCpuTime,
- stageMetrics.executorDeserializeCpuTime, stageMetrics.executorRunTime, stageMetrics.executorCpuTime,
+ stageMetrics.executorDeserializeTime, stageMetrics.executorRunTime, stageMetrics.executorCpuTime,
stageMetrics.jvmGCTime, stageMetrics.resultSerializationTime,
stageMetrics.memoryBytesSpilled, stageMetrics.diskBytesSpilled, stageMetrics.peakExecutionMemory)
queryExecutionMetrics.updateSparkStageMetrics(jobExecutionMap.apply(stageJobMap.apply(stageInfo.stageId)).queryId,
diff --git a/server-base/src/main/java/org/apache/kylin/rest/init/InitialTaskManager.java b/server-base/src/main/java/org/apache/kylin/rest/init/InitialTaskManager.java
index 876ae08..feaf3a3 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/init/InitialTaskManager.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/init/InitialTaskManager.java
@@ -21,6 +21,7 @@ package org.apache.kylin.rest.init;
import org.apache.commons.lang.StringUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.StringUtil;
+import org.apache.kylin.metrics.QuerySparkMetrics;
import org.apache.kylin.rest.metrics.QueryMetrics2Facade;
import org.apache.kylin.rest.metrics.QueryMetricsFacade;
import org.slf4j.Logger;
@@ -44,6 +45,7 @@ public class InitialTaskManager implements InitializingBean {
private void runInitialTasks() {
// init metrics system for kylin
+ QuerySparkMetrics.getInstance();
QueryMetricsFacade.init();
QueryMetrics2Facade.init();
diff --git a/server-base/src/main/java/org/apache/kylin/rest/metrics/QueryMetricsFacade.java b/server-base/src/main/java/org/apache/kylin/rest/metrics/QueryMetricsFacade.java
index 7a8f2d7..db01f33 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/metrics/QueryMetricsFacade.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/metrics/QueryMetricsFacade.java
@@ -93,7 +93,9 @@ public class QueryMetricsFacade {
queryExecutionMetrics.setSqlIdCode(getSqlHashCode(sqlRequest.getSql()));
queryExecutionMetrics.setProject(norm(sqlRequest.getProject()));
queryExecutionMetrics.setQueryType(sqlResponse.isStorageCacheUsed() ? "CACHE" : "PARQUET");
-
+ queryExecutionMetrics.setRealization(sqlResponse.getCube());
+ queryExecutionMetrics.setRealizationTypes(sqlResponse.getRealizationTypes());
+ queryExecutionMetrics.setCuboidIds(sqlResponse.getCuboidIds());
queryExecutionMetrics.setSqlDuration(sqlResponse.getDuration());
queryExecutionMetrics.setTotalScanCount(sqlResponse.getTotalScanCount());
queryExecutionMetrics.setTotalScanBytes(sqlResponse.getTotalScanBytes());
diff --git a/server-base/src/main/java/org/apache/kylin/rest/response/SQLResponse.java b/server-base/src/main/java/org/apache/kylin/rest/response/SQLResponse.java
index d5d57ed..37578ec 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/response/SQLResponse.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/response/SQLResponse.java
@@ -48,6 +48,8 @@ public class SQLResponse implements Serializable {
protected String cuboidIds;
+ protected String realizationTypes;
+
// if not select query, only return affected row count
protected int affectedRowCount;
@@ -286,6 +288,14 @@ public class SQLResponse implements Serializable {
this.lazyQueryStartTime = lazyQueryStartTime;
}
+ public String getRealizationTypes() { // storage types of the query's realizations, comma-joined by QueryService
+ return realizationTypes;
+ }
+
+ public void setRealizationTypes(String realizationTypes) { // called by QueryService when it builds the response
+ this.realizationTypes = realizationTypes;
+ }
+
@JsonIgnore
public List<QueryContext.CubeSegmentStatisticsResult> getCubeSegmentStatisticsList() {
try {
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/DashboardService.java b/server-base/src/main/java/org/apache/kylin/rest/service/DashboardService.java
index 768876f..392fac2 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/DashboardService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/DashboardService.java
@@ -32,7 +32,6 @@ import org.apache.kylin.metrics.MetricsManager;
import org.apache.kylin.metrics.lib.impl.TimePropertyEnum;
import org.apache.kylin.metrics.property.JobPropertyEnum;
import org.apache.kylin.metrics.property.QuerySparkExecutionEnum;
-import org.apache.kylin.metrics.property.QuerySparkExecutionEnum;
import org.apache.kylin.rest.constant.Constant;
import org.apache.kylin.rest.exception.BadRequestException;
import org.apache.kylin.rest.request.PrepareSqlRequest;
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
index f020d6b..7296636 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
@@ -1180,6 +1180,7 @@ public class QueryService extends BasicService {
List<String> realizations = Lists.newLinkedList();
StringBuilder cubeSb = new StringBuilder();
StringBuilder cuboidIdsSb = new StringBuilder();
+ StringBuilder realizationTypeSb = new StringBuilder();
StringBuilder logSb = new StringBuilder("Processed rows for each storageContext: ");
QueryContext queryContext = QueryContextFacade.current();
if (OLAPContext.getThreadLocalContexts() != null) { // contexts can be null in case of 'explain plan for'
@@ -1191,6 +1192,7 @@ public class QueryService extends BasicService {
if (cubeSb.length() > 0) {
cubeSb.append(",");
}
+ cubeSb.append(ctx.realization.getCanonicalName());
Cuboid cuboid = ctx.storageContext.getCuboid();
if (cuboid != null) {
//Some queries do not involve cuboid, e.g. lookup table query
@@ -1199,25 +1201,25 @@ public class QueryService extends BasicService {
}
cuboidIdsSb.append(cuboid.getId());
}
- cubeSb.append(ctx.realization.getCanonicalName());
logSb.append(ctx.storageContext.getProcessedRowCount()).append(" ");
realizationName = ctx.realization.getName();
realizationType = ctx.realization.getStorageType();
+ if (realizationTypeSb.length() > 0) {
+ realizationTypeSb.append(",");
+ }
+ realizationTypeSb.append(realizationType);
realizations.add(realizationName);
}
- QuerySparkMetrics.getInstance().setQueryRealization(queryContext.getQueryId(), realizationName,
- realizationType, cuboidIdsSb.toString());
}
-
-
}
logger.info(logSb.toString());
SQLResponse response = new SQLResponse(columnMetas, results, cubeSb.toString(), 0, isException,
exceptionMessage, isPartialResult, isPushDown);
response.setCuboidIds(cuboidIdsSb.toString());
+ response.setRealizationTypes(realizationTypeSb.toString());
response.setTotalScanCount(queryContext.getScannedRows());
response.setTotalScanFiles((queryContext.getScanFiles() < 0) ? -1 :
queryContext.getScanFiles());
diff --git a/server/src/test/java/org/apache/kylin/rest/metrics/QueryMetricsTest.java b/server/src/test/java/org/apache/kylin/rest/metrics/QueryMetricsTest.java
index fbacc78..3ad7eea 100644
--- a/server/src/test/java/org/apache/kylin/rest/metrics/QueryMetricsTest.java
+++ b/server/src/test/java/org/apache/kylin/rest/metrics/QueryMetricsTest.java
@@ -21,23 +21,31 @@ package org.apache.kylin.rest.metrics;
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
import javax.management.MBeanServer;
import javax.management.ObjectName;
+import org.apache.commons.lang3.StringUtils;
import org.apache.kylin.common.QueryContext;
import org.apache.kylin.common.QueryContextFacade;
+import org.apache.kylin.metrics.QuerySparkMetrics;
import org.apache.kylin.rest.request.SQLRequest;
import org.apache.kylin.rest.response.SQLResponse;
import org.apache.kylin.rest.service.ServiceTestBase;
+import org.apache.kylin.shaded.com.google.common.cache.RemovalListener;
+import org.apache.kylin.shaded.com.google.common.cache.RemovalNotification;
import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
public class QueryMetricsTest extends ServiceTestBase {
private static MBeanServer mBeanServer;
private static ObjectName objectName;
+ private static AtomicInteger sparkMetricsReportCnt = new AtomicInteger(0);
-// @Before
+ @Before
public void setup() throws Exception {
super.setup();
@@ -45,7 +53,7 @@ public class QueryMetricsTest extends ServiceTestBase {
objectName = new ObjectName("Hadoop:service=Kylin,name=Server_Total");
}
-// @Test
+ @Test
public void testQueryMetrics() throws Exception {
System.setProperty("kylin.server.query-metrics-enabled", "true");
QueryMetricsFacade.init();
@@ -111,7 +119,7 @@ public class QueryMetricsTest extends ServiceTestBase {
System.clearProperty("kylin.server.query-metrics-enabled");
}
-// @Test
+ @Test
public void testQueryStatisticsResult() throws Exception {
System.setProperty("kylin.metrics.reporter-query-enabled", "true");
QueryMetricsFacade.init();
@@ -153,4 +161,261 @@ public class QueryMetricsTest extends ServiceTestBase {
System.clearProperty("kylin.server.query-metrics-enabled");
System.out.println("------------testQueryStatisticsResult done------------");
}
+
+ @Test
+ public void testQuerySparkMetrics() throws Exception { // drives two queries through QuerySparkMetrics and checks eviction plus listener reporting
+ sparkMetricsReportCnt.set(0);
+ System.setProperty("kylin.server.query-metrics-enabled", "true");
+ QuerySparkMetrics.init(new QuerySparkMetricsTestRemovalListener()); // swap in the listener that verifies removed entries
+ QueryMetricsFacade.init();
+
+ SQLRequest sqlRequest = new SQLRequest();
+ sqlRequest.setSql("select * from TEST_KYLIN_FACT");
+ sqlRequest.setProject("default");
+
+ String queryId1 = "1";
+ generateSparkMetrics(queryId1);
+
+ SQLResponse sqlResponse = new SQLResponse();
+ sqlResponse.setDuration(10);
+ sqlResponse.setCube("test_cube");
+ sqlResponse.setCuboidIds("12345");
+ sqlResponse.setRealizationTypes("4");
+ sqlResponse.setIsException(false);
+ sqlResponse.setTotalScanCount(100);
+ List<String> list1 = new ArrayList<>();
+ list1.add("111");
+ list1.add("112");
+ List<String> list2 = new ArrayList<>();
+ list2.add("111");
+ list2.add("112");
+ List<List<String>> results = new ArrayList<>();
+ results.add(list1);
+ results.add(list2);
+ sqlResponse.setResults(results);
+ sqlResponse.setStorageCacheUsed(true);
+
+ QueryMetricsFacade.updateMetrics(queryId1, sqlRequest, sqlResponse);
+
+ Thread.sleep(3000); // NOTE(review): presumably kept below kylin.metrics.query-cache.expire-seconds=5 from the localmeta kylin.properties - confirm
+
+ updateSparkMetrics(queryId1);
+
+ Assert.assertTrue(QuerySparkMetrics.getInstance().getQueryExecutionMetrics(queryId1) != null);
+ Assert.assertEquals(1L, mBeanServer.getAttribute(objectName, "QueryCount"));
+ Assert.assertEquals(1L, mBeanServer.getAttribute(objectName, "QuerySuccessCount"));
+ Assert.assertEquals(0L, mBeanServer.getAttribute(objectName, "QueryFailCount"));
+ Assert.assertEquals(1L, mBeanServer.getAttribute(objectName, "CacheHitCount"));
+
+ Assert.assertEquals(1L, mBeanServer.getAttribute(objectName, "ScanRowCountNumOps"));
+ Assert.assertEquals(100.0, mBeanServer.getAttribute(objectName, "ScanRowCountAvgTime"));
+ Assert.assertEquals(100.0, mBeanServer.getAttribute(objectName, "ScanRowCountMaxTime"));
+ Assert.assertEquals(100.0, mBeanServer.getAttribute(objectName, "ScanRowCountMinTime"));
+
+ Assert.assertEquals(1L, mBeanServer.getAttribute(objectName, "ResultRowCountNumOps"));
+ Assert.assertEquals(2.0, mBeanServer.getAttribute(objectName, "ResultRowCountMaxTime"));
+ Assert.assertEquals(2.0, mBeanServer.getAttribute(objectName, "ResultRowCountAvgTime"));
+ Assert.assertEquals(2.0, mBeanServer.getAttribute(objectName, "ResultRowCountMinTime"));
+
+ Assert.assertEquals(1L, mBeanServer.getAttribute(objectName, "QueryLatencyNumOps"));
+ Assert.assertEquals(10.0, mBeanServer.getAttribute(objectName, "QueryLatencyMaxTime"));
+ Assert.assertEquals(10.0, mBeanServer.getAttribute(objectName, "QueryLatencyAvgTime"));
+ Assert.assertEquals(10.0, mBeanServer.getAttribute(objectName, "QueryLatencyMinTime"));
+
+ String queryId2 = "2";
+ generateSparkMetrics(queryId2);
+
+ Thread.sleep(3000); // NOTE(review): query 1 is now about 6 s old and query 2 about 3 s old; the two asserts below presumably rely on the 5 s expiry - confirm
+
+ Assert.assertTrue(QuerySparkMetrics.getInstance().getQueryExecutionMetrics(queryId1) == null);
+ Assert.assertTrue(QuerySparkMetrics.getInstance().getQueryExecutionMetrics(queryId2) != null);
+
+ updateSparkMetrics(queryId2);
+
+ SQLResponse sqlResponse2 = new SQLResponse();
+ sqlResponse2.setDuration(10);
+ sqlResponse2.setCube("test_cube");
+ sqlResponse2.setIsException(true);
+
+ QueryMetricsFacade.updateMetrics(queryId2, sqlRequest, sqlResponse2);
+
+ Thread.sleep(5000); // NOTE(review): presumably long enough for query 2 to expire as well - confirm
+
+ Assert.assertEquals(2L, mBeanServer.getAttribute(objectName, "QueryCount"));
+ Assert.assertEquals(1L, mBeanServer.getAttribute(objectName, "QuerySuccessCount"));
+ Assert.assertEquals(1L, mBeanServer.getAttribute(objectName, "QueryFailCount"));
+
+ Assert.assertTrue(QuerySparkMetrics.getInstance().getQueryExecutionMetrics(queryId2) == null);
+ Assert.assertEquals(2, sparkMetricsReportCnt.get()); // the counter is bumped in verifyQuerySparkMetrics, which the test removal listener calls
+ System.clearProperty("kylin.server.query-metrics-enabled");
+ }
+
+ public void generateSparkMetrics(String queryId) { // fires the job-start and stage-start events for a test query
+ Integer id = Integer.valueOf(queryId);
+ long executionStartTime = 0;
+ long jobStartTime = 0;
+ long submitTime = 0;
+ // fixed epoch-millisecond fixtures for query "1" / "2"; any other id keeps the zero defaults
+ if (id == 1) {
+ executionStartTime = 1610609727972L;
+ jobStartTime = 1610609772880L;
+ submitTime = 1610610480942L;
+ } else if (id == 2) {
+ executionStartTime = 1610609876545L;
+ jobStartTime = 1610609901699L;
+ submitTime = 1610610501654L; // 13-digit epoch millis like the other fixtures; lies between this job's start and end
+ }
+ QuerySparkMetrics.getInstance().onJobStart(queryId, "test-sparder_" + id, id,
+ executionStartTime, id, jobStartTime);
+ QuerySparkMetrics.getInstance().onSparkStageStart(queryId, id, id, "stageType_" + id,
+ submitTime);
+ }
+
+ public void updateSparkMetrics(String queryId) { // feeds fixed stage, job and execution end values for a test query into QuerySparkMetrics
+ Integer id = Integer.valueOf(queryId);
+ long jobEndTime = 0;
+ long executionEndTime = 0;
+ boolean jobIsSuccess = true;
+
+ QuerySparkMetrics.SparkStageMetrics queryStageMetrics = new QuerySparkMetrics.SparkStageMetrics();
+ if (id == 1) {
+ jobEndTime = 1610610734401L;
+ executionEndTime = 1610612655793L;
+ jobIsSuccess = true;
+ queryStageMetrics.setMetrics(10000, 10, 10, 100, 10, 1, 10, 1000, 1000, 100);
+ } else if (id == 2) {
+ jobEndTime = 1610610750397L;
+ executionEndTime = 1610612685275L;
+ jobIsSuccess = false; // query 2 is the failed-job fixture; verifyQuerySparkMetrics asserts isSuccess() is false for it
+ queryStageMetrics.setMetrics(20000, 20, 20, 200, 20, 2, 20, 2000, 2000, 200);
+ }
+ QuerySparkMetrics.getInstance().updateSparkStageMetrics(queryId, id, id, true,
+ queryStageMetrics);
+ QuerySparkMetrics.getInstance().updateSparkJobMetrics(queryId, id, jobEndTime, jobIsSuccess);
+ QuerySparkMetrics.getInstance().updateExecutionMetrics(queryId, executionEndTime);
+ }
+
+ public static void verifyQuerySparkMetrics(String queryId,
+ QuerySparkMetrics.QueryExecutionMetrics queryExecutionMetrics) {
+ // checks the metrics against the fixtures of query "1" / "2"; JUnit argument order is (expected, actual)
+ Assert.assertTrue(StringUtils.isNotBlank(queryId));
+ Assert.assertNotNull(queryExecutionMetrics);
+ int id = Integer.parseInt(queryId);
+ Assert.assertNotNull(queryExecutionMetrics.getSparkJobMetricsMap());
+ Assert.assertNotNull(queryExecutionMetrics.getSparkJobMetricsMap().get(id));
+ Assert.assertEquals("test-sparder_" + id, queryExecutionMetrics.getSparderName());
+ Assert.assertEquals(id, queryExecutionMetrics.getSparkJobMetricsMap().get(id).getJobId());
+
+ if (id == 1) {
+ //SqlResponse metrics
+ Assert.assertEquals("ADMIN", queryExecutionMetrics.getUser());
+ Assert.assertEquals("DEFAULT", queryExecutionMetrics.getProject());
+ Assert.assertEquals("CACHE", queryExecutionMetrics.getQueryType());
+ Assert.assertEquals("test_cube", queryExecutionMetrics.getRealization());
+ Assert.assertEquals("4", queryExecutionMetrics.getRealizationTypes());
+ Assert.assertEquals("12345", queryExecutionMetrics.getCuboidIds());
+ Assert.assertEquals(100, queryExecutionMetrics.getTotalScanCount());
+ Assert.assertEquals(2, queryExecutionMetrics.getResultCount());
+ Assert.assertEquals("NULL", queryExecutionMetrics.getException());
+
+ Assert.assertEquals("test-sparder_" + id, queryExecutionMetrics.getSparderName());
+ Assert.assertEquals("NULL", queryExecutionMetrics.getException());
+
+ //SparkExecution metrics
+ Assert.assertEquals(1610609727972L, queryExecutionMetrics.getStartTime());
+ Assert.assertEquals(1610612655793L, queryExecutionMetrics.getEndTime());
+ Assert.assertTrue(queryExecutionMetrics.getSparkJobMetricsMap().get(id).isSuccess());
+ Assert.assertEquals(1610609772880L, queryExecutionMetrics.getSparkJobMetricsMap().get(id).getStartTime());
+ Assert.assertEquals(1610610734401L, queryExecutionMetrics.getSparkJobMetricsMap().get(id).getEndTime());
+
+ //SparkStage metrics
+ Assert.assertEquals(1, queryExecutionMetrics.getSparkJobMetricsMap().get(id)
+ .getSparkStageMetricsMap().size());
+ Assert.assertNotNull(queryExecutionMetrics.getSparkJobMetricsMap().get(id)
+ .getSparkStageMetricsMap().get(id));
+ Assert.assertEquals("stageType_" + id, queryExecutionMetrics.getSparkJobMetricsMap().get(id)
+ .getSparkStageMetricsMap().get(id).getStageType());
+ Assert.assertEquals(id, queryExecutionMetrics.getSparkJobMetricsMap().get(id)
+ .getSparkStageMetricsMap().get(id).getStageId());
+ Assert.assertEquals(10000, queryExecutionMetrics.getSparkJobMetricsMap().get(id)
+ .getSparkStageMetricsMap().get(id).getResultSize());
+ Assert.assertEquals(10, queryExecutionMetrics.getSparkJobMetricsMap().get(id)
+ .getSparkStageMetricsMap().get(id).getExecutorDeserializeCpuTime());
+ Assert.assertEquals(10, queryExecutionMetrics.getSparkJobMetricsMap().get(id)
+ .getSparkStageMetricsMap().get(id).getExecutorDeserializeTime());
+ Assert.assertEquals(100, queryExecutionMetrics.getSparkJobMetricsMap().get(id)
+ .getSparkStageMetricsMap().get(id).getExecutorRunTime());
+ Assert.assertEquals(10, queryExecutionMetrics.getSparkJobMetricsMap().get(id)
+ .getSparkStageMetricsMap().get(id).getExecutorCpuTime());
+ Assert.assertEquals(1, queryExecutionMetrics.getSparkJobMetricsMap().get(id)
+ .getSparkStageMetricsMap().get(id).getJvmGCTime());
+ Assert.assertEquals(10, queryExecutionMetrics.getSparkJobMetricsMap().get(id)
+ .getSparkStageMetricsMap().get(id).getResultSerializationTime());
+ Assert.assertEquals(1000, queryExecutionMetrics.getSparkJobMetricsMap().get(id)
+ .getSparkStageMetricsMap().get(id).getMemoryBytesSpilled());
+ Assert.assertEquals(1000, queryExecutionMetrics.getSparkJobMetricsMap().get(id)
+ .getSparkStageMetricsMap().get(id).getDiskBytesSpilled());
+ Assert.assertEquals(100, queryExecutionMetrics.getSparkJobMetricsMap().get(id)
+ .getSparkStageMetricsMap().get(id).getPeakExecutionMemory());
+ } else if (id == 2) {
+ //SqlResponse metrics
+ Assert.assertEquals("ADMIN", queryExecutionMetrics.getUser());
+ Assert.assertEquals("DEFAULT", queryExecutionMetrics.getProject());
+ Assert.assertEquals("PARQUET", queryExecutionMetrics.getQueryType());
+ Assert.assertEquals("test_cube", queryExecutionMetrics.getRealization());
+ Assert.assertNull(queryExecutionMetrics.getRealizationTypes());
+ Assert.assertNull(queryExecutionMetrics.getCuboidIds());
+ Assert.assertEquals(0, queryExecutionMetrics.getTotalScanCount());
+ Assert.assertEquals(0, queryExecutionMetrics.getResultCount());
+ Assert.assertEquals("NULL", queryExecutionMetrics.getException());
+
+ //SparkExecution metrics
+ Assert.assertEquals(1610609876545L, queryExecutionMetrics.getStartTime());
+ Assert.assertEquals(1610612685275L, queryExecutionMetrics.getEndTime());
+ Assert.assertFalse(queryExecutionMetrics.getSparkJobMetricsMap().get(id).isSuccess());
+ Assert.assertEquals(1610609901699L, queryExecutionMetrics.getSparkJobMetricsMap().get(id).getStartTime());
+ Assert.assertEquals(1610610750397L, queryExecutionMetrics.getSparkJobMetricsMap().get(id).getEndTime());
+
+ //SparkStage metrics
+ Assert.assertEquals(1, queryExecutionMetrics.getSparkJobMetricsMap().get(id)
+ .getSparkStageMetricsMap().size());
+ Assert.assertNotNull(queryExecutionMetrics.getSparkJobMetricsMap().get(id)
+ .getSparkStageMetricsMap().get(id));
+ Assert.assertEquals(id, queryExecutionMetrics.getSparkJobMetricsMap().get(id)
+ .getSparkStageMetricsMap().get(id).getStageId());
+ Assert.assertEquals(20000, queryExecutionMetrics.getSparkJobMetricsMap().get(id)
+ .getSparkStageMetricsMap().get(id).getResultSize());
+ Assert.assertEquals(20, queryExecutionMetrics.getSparkJobMetricsMap().get(id)
+ .getSparkStageMetricsMap().get(id).getExecutorDeserializeCpuTime());
+ Assert.assertEquals(20, queryExecutionMetrics.getSparkJobMetricsMap().get(id)
+ .getSparkStageMetricsMap().get(id).getExecutorDeserializeTime());
+ Assert.assertEquals(200, queryExecutionMetrics.getSparkJobMetricsMap().get(id)
+ .getSparkStageMetricsMap().get(id).getExecutorRunTime());
+ Assert.assertEquals(20, queryExecutionMetrics.getSparkJobMetricsMap().get(id)
+ .getSparkStageMetricsMap().get(id).getExecutorCpuTime());
+ Assert.assertEquals(2, queryExecutionMetrics.getSparkJobMetricsMap().get(id)
+ .getSparkStageMetricsMap().get(id).getJvmGCTime());
+ Assert.assertEquals(20, queryExecutionMetrics.getSparkJobMetricsMap().get(id)
+ .getSparkStageMetricsMap().get(id).getResultSerializationTime());
+ Assert.assertEquals(2000, queryExecutionMetrics.getSparkJobMetricsMap().get(id)
+ .getSparkStageMetricsMap().get(id).getMemoryBytesSpilled());
+ Assert.assertEquals(2000, queryExecutionMetrics.getSparkJobMetricsMap().get(id)
+ .getSparkStageMetricsMap().get(id).getDiskBytesSpilled());
+ Assert.assertEquals(200, queryExecutionMetrics.getSparkJobMetricsMap().get(id)
+ .getSparkStageMetricsMap().get(id).getPeakExecutionMemory());
+ }
+ sparkMetricsReportCnt.getAndIncrement(); // counted last: a failed check above throws first, so the test's report count comes up short
+ }
+
+ private static class QuerySparkMetricsTestRemovalListener implements RemovalListener<String,
+ QuerySparkMetrics.QueryExecutionMetrics> {
+ @Override
+ public void onRemoval(RemovalNotification<String, QuerySparkMetrics.QueryExecutionMetrics> notification) {
+ try { // hands every removed cache entry to the fixture checks
+ verifyQuerySparkMetrics(notification.getKey(), notification.getValue());
+ } catch (Exception e) { // NOTE(review): JUnit Assert failures are AssertionError, not Exception, so this catch never sees them
+ e.printStackTrace();
+ }
+ }
+ }
}
diff --git a/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/CubeDescCreator.java b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/CubeDescCreator.java
index 2d82e02..f3e7168 100644
--- a/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/CubeDescCreator.java
+++ b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/CubeDescCreator.java
@@ -467,7 +467,7 @@ public class CubeDescCreator {
desc.setDimensions(dimensionDescList);
desc.setMeasures(measureDescList);
desc.setRowkey(rowKeyDesc);
- //desc.setHbaseMapping(hBaseMapping);
+ desc.setHbaseMapping(hBaseMapping);
desc.setNotifyList(Lists.<String> newArrayList());
desc.setStatusNeedNotify(Lists.newArrayList(JobStatusEnum.ERROR.toString()));
desc.setAutoMergeTimeRanges(new long[] { 86400000L, 604800000L, 2419200000L });
diff --git a/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/HiveTableCreator.java b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/HiveTableCreator.java
index 73e493e..af40392 100644
--- a/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/HiveTableCreator.java
+++ b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/HiveTableCreator.java
@@ -130,7 +130,7 @@ public class HiveTableCreator {
columns.add(new Pair<>(QuerySparkExecutionEnum.SPARDER_NAME.toString(), HiveTypeEnum.HSTRING.toString()));
columns.add(new Pair<>(QuerySparkExecutionEnum.PROJECT.toString(), HiveTypeEnum.HSTRING.toString()));
columns.add(new Pair<>(QuerySparkExecutionEnum.REALIZATION.toString(), HiveTypeEnum.HSTRING.toString()));
- columns.add(new Pair<>(QuerySparkExecutionEnum.REALIZATION_TYPE.toString(), HiveTypeEnum.HINT.toString()));
+ columns.add(new Pair<>(QuerySparkExecutionEnum.REALIZATION_TYPE.toString(), HiveTypeEnum.HSTRING.toString()));
columns.add(new Pair<>(QuerySparkExecutionEnum.CUBOID_IDS.toString(), HiveTypeEnum.HSTRING.toString()));
columns.add(new Pair<>(QuerySparkExecutionEnum.TYPE.toString(), HiveTypeEnum.HSTRING.toString()));