You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by xx...@apache.org on 2021/11/04 08:15:52 UTC
[kylin] branch main updated: KYLIN-5111 Record the time spent for each stage of query in kylin4's log
This is an automated email from the ASF dual-hosted git repository.
xxyu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/main by this push:
new d9a5c70 KYLIN-5111 Record the time spent for each stage of query in kylin4's log
d9a5c70 is described below
commit d9a5c70136c29b77f981058c7166492ce53ea2a4
Author: yaqian.zhang <59...@qq.com>
AuthorDate: Mon Nov 1 10:55:37 2021 +0800
KYLIN-5111 Record the time spent for each stage of query in kylin4's log
---
.../java/org/apache/kylin/common/QueryContext.java | 10 +-
.../java/org/apache/kylin/common/QueryTrace.java | 141 +++++++++++++++++++++
.../org/apache/kylin/common/QueryTraceTest.java | 47 +++++++
.../apache/kylin/query/runtime/SparkEngine.java | 3 +
.../kylin/query/runtime/plans/ResultPlan.scala | 5 +
.../apache/kylin/query/util/SparkJobTrace.scala | 118 +++++++++++++++++
.../org/apache/spark/sql/metrics/AppStatus.scala | 101 +++++++++++++++
.../query/relnode/OLAPToEnumerableConverter.java | 3 +
.../apache/kylin/rest/response/SQLResponse.java | 10 ++
.../kylin/rest/response/SQLResponseTrace.java | 61 +++++++++
.../apache/kylin/rest/service/QueryService.java | 16 ++-
.../kylin/rest/response/SQLResponseTest.java | 2 +-
12 files changed, 513 insertions(+), 4 deletions(-)
diff --git a/core-common/src/main/java/org/apache/kylin/common/QueryContext.java b/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
index 53f67f9..69e2994 100644
--- a/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
+++ b/core-common/src/main/java/org/apache/kylin/common/QueryContext.java
@@ -69,12 +69,12 @@ public class QueryContext {
private boolean isHighPriorityQuery = false;
private boolean isTableIndex = false;
private boolean withoutSyntaxError;
+ private QueryTrace queryTrace = new QueryTrace();
private AtomicBoolean isRunning = new AtomicBoolean(true);
private AtomicReference<Throwable> throwable = new AtomicReference<>();
private String stopReason;
private List<QueryStopListener> stopListeners = Lists.newCopyOnWriteArrayList();
-
private List<RPCStatistics> rpcStatisticsList = Lists.newCopyOnWriteArrayList();
private Map<Integer, CubeSegmentStatisticsResult> cubeSegmentStatisticsResultMap = Maps.newConcurrentMap();
@@ -205,6 +205,14 @@ public class QueryContext {
return metadataTime.addAndGet(time);
}
+ public QueryTrace getQueryTrace() {
+ return queryTrace;
+ }
+
+ public void setQueryTrace(QueryTrace queryTrace) {
+ this.queryTrace = queryTrace;
+ }
+
//Scaned time with Spark
public long getScanTime() {
return scanTime.get();
diff --git a/core-common/src/main/java/org/apache/kylin/common/QueryTrace.java b/core-common/src/main/java/org/apache/kylin/common/QueryTrace.java
new file mode 100644
index 0000000..4387e0a
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/QueryTrace.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Records the wall-clock time spent in each stage ("span") of a query.
+ *
+ * <p>Spans are kept in insertion order. A span opened with {@link #startSpan(String)} stays open
+ * (duration {@code -1}) until the next span is started or {@link #endLastSpan()} is called. Spans
+ * listed in {@code SPAN_GROUPS} are tagged with a group name, e.g. {@code PREPARATION}.
+ *
+ * <p>NOTE(review): this class is not synchronized; confirm that a trace is only ever updated by
+ * the thread that owns the enclosing query context.
+ */
+public class QueryTrace {
+
+    // span name
+    public static final String SQL_TRANSFORMATION = "SQL_TRANSFORMATION";
+    public static final String SQL_PARSE_AND_OPTIMIZE = "SQL_PARSE_AND_OPTIMIZE";
+    public static final String CUBE_MATCHING = "CUBE_MATCHING";
+    public static final String PREPARE_AND_SUBMIT_JOB = "PREPARE_AND_SUBMIT_JOB";
+    public static final String WAIT_FOR_EXECUTION = "WAIT_FOR_EXECUTION";
+    public static final String EXECUTION = "EXECUTION";
+    public static final String FETCH_RESULT = "FETCH_RESULT";
+
+    // group name
+    static final String PREPARATION = "PREPARATION";
+    static final Map<String, String> SPAN_GROUPS = new HashMap<>();
+    static {
+        SPAN_GROUPS.put(SQL_TRANSFORMATION, PREPARATION);
+        SPAN_GROUPS.put(SQL_PARSE_AND_OPTIMIZE, PREPARATION);
+        SPAN_GROUPS.put(CUBE_MATCHING, PREPARATION);
+    }
+
+    // placeholder duration of a span that has been started but not ended yet
+    private static final long UNFINISHED = -1;
+
+    private final List<Span> spans = new ArrayList<>();
+
+    /**
+     * Returns the most recently added span, or an empty {@link Optional} if there is none.
+     */
+    public Optional<Span> getLastSpan() {
+        return spans.isEmpty() ? Optional.empty() : Optional.of(spans.get(spans.size() - 1));
+    }
+
+    /**
+     * Ends the most recently added span if it is still open, setting its duration to the time
+     * elapsed since it started. A span that already has a duration is left untouched.
+     */
+    public void endLastSpan() {
+        getLastSpan().ifPresent(span -> {
+            if (span.duration == UNFINISHED) {
+                span.duration = System.currentTimeMillis() - span.start;
+            }
+        });
+    }
+
+    /**
+     * Ends the currently open span (if any) and opens a new span that starts now.
+     *
+     * @param name the span name, normally one of the span name constants of this class
+     */
+    public void startSpan(String name) {
+        endLastSpan();
+        spans.add(new Span(name, System.currentTimeMillis()));
+    }
+
+    /**
+     * Appends a span of known duration that starts where the previous span ends, or now when no
+     * span has been recorded yet.
+     *
+     * <p>A still-open previous span is ended first: otherwise its placeholder duration of
+     * {@code -1} would be added to its start time and the new span would be placed before the
+     * previous span actually ended.
+     *
+     * @param name     the span name
+     * @param duration the span duration in milliseconds
+     */
+    public void appendSpan(String name, long duration) {
+        endLastSpan();
+        long start = getLastSpan().map(span -> span.getStart() + span.getDuration())
+                .orElseGet(System::currentTimeMillis);
+        spans.add(new Span(name, start, duration));
+    }
+
+    /**
+     * Sets the duration of the most recent span called {@code name} so that it ends at
+     * {@code endAt}. Does nothing when no span with that name exists.
+     *
+     * @param name  the span name, searched from the most recent span backwards
+     * @param endAt the new end time in epoch milliseconds
+     */
+    public void amendLast(String name, long endAt) {
+        for (int i = spans.size() - 1; i >= 0; i--) {
+            Span span = spans.get(i);
+            if (span.name.equals(name)) {
+                span.duration = endAt - span.start;
+                return;
+            }
+        }
+    }
+
+    /**
+     * Returns the recorded spans in insertion order. This is the live internal list, not a copy.
+     */
+    public List<Span> spans() {
+        return spans;
+    }
+
+    /**
+     * A single named stage of a query trace.
+     */
+    public static class Span {
+        String name;
+
+        // group looked up from SPAN_GROUPS; null when the span name has no group
+        String group;
+
+        // start time in epoch milliseconds
+        long start;
+
+        // duration in milliseconds; UNFINISHED (-1) while the span is still open
+        long duration = UNFINISHED;
+
+        public Span(String name, long start, long duration) {
+            this.name = name;
+            this.start = start;
+            this.duration = duration;
+            this.group = SPAN_GROUPS.get(name);
+        }
+
+        public Span(String name, long start) {
+            this.name = name;
+            this.start = start;
+            this.group = SPAN_GROUPS.get(name);
+        }
+
+        public long getDuration() {
+            return duration;
+        }
+
+        public long getStart() {
+            return start;
+        }
+
+        public String getGroup() {
+            return group;
+        }
+
+        public String getName() {
+            return name;
+        }
+
+        public void setDuration(long duration) {
+            this.duration = duration;
+        }
+
+        public void setGroup(String group) {
+            this.group = group;
+        }
+
+        public void setName(String name) {
+            this.name = name;
+        }
+
+        public void setStart(long start) {
+            this.start = start;
+        }
+    }
+}
+
diff --git a/core-common/src/test/java/org/apache/kylin/common/QueryTraceTest.java b/core-common/src/test/java/org/apache/kylin/common/QueryTraceTest.java
new file mode 100644
index 0000000..5261b73
--- /dev/null
+++ b/core-common/src/test/java/org/apache/kylin/common/QueryTraceTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Unit tests for {@link QueryTrace}.
+ */
+public class QueryTraceTest {
+
+    // generous tolerance so that scheduling jitter on slow machines does not fail the test
+    private static final long TOLERANCE_MS = 1000;
+
+    @Test
+    public void test() throws InterruptedException {
+        QueryTrace queryTrace = new QueryTrace();
+
+        // starting "span 2" ends "span 1"; endLastSpan() then ends "span 2"
+        queryTrace.startSpan("span 1");
+        Thread.sleep(100);
+        queryTrace.startSpan("span 2");
+        Thread.sleep(100);
+        queryTrace.endLastSpan();
+
+        Assert.assertEquals(2, queryTrace.spans().size());
+        Assert.assertTrue(queryTrace.getLastSpan().isPresent());
+        QueryTrace.Span lastSpan = queryTrace.getLastSpan().get();
+        Assert.assertEquals("span 2", lastSpan.name);
+        assertTimeEqual(100, lastSpan.duration);
+
+        // pushing the end of "span 2" 1000 ms later lengthens it to about 1100 ms
+        long newEnd = lastSpan.start + lastSpan.getDuration() + 1000;
+        queryTrace.amendLast("span 2", newEnd);
+        assertTimeEqual(1100, queryTrace.getLastSpan().get().duration);
+    }
+
+    /** Asserts that two durations in milliseconds differ by less than {@code TOLERANCE_MS}. */
+    private void assertTimeEqual(long expected, long actual) {
+        Assert.assertTrue(Math.abs(expected - actual) < TOLERANCE_MS);
+    }
+
+}
diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/SparkEngine.java b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/SparkEngine.java
index c2b0cc4..3a504ae 100644
--- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/SparkEngine.java
+++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/SparkEngine.java
@@ -22,6 +22,8 @@ import org.apache.calcite.DataContext;
import org.apache.calcite.linq4j.Enumerable;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.type.RelDataType;
+import org.apache.kylin.common.QueryContextFacade;
+import org.apache.kylin.common.QueryTrace;
import org.apache.kylin.query.exec.QueryEngine;
import org.apache.kylin.query.runtime.plans.ResultPlan;
import org.apache.kylin.query.runtime.plans.ResultType;
@@ -55,6 +57,7 @@ public class SparkEngine implements QueryEngine {
private Dataset<Row> toSparkPlan(DataContext dataContext, RelNode relNode) {
log.trace("Begin planning spark plan.");
+ QueryContextFacade.current().getQueryTrace().startSpan(QueryTrace.PREPARE_AND_SUBMIT_JOB);
long start = System.currentTimeMillis();
CalciteToSparkPlaner calciteToSparkPlaner = new CalciteToSparkPlaner(dataContext);
calciteToSparkPlaner.go(relNode);
diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/ResultPlan.scala b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/ResultPlan.scala
index 991a7f2..c0558b5 100644
--- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/ResultPlan.scala
+++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/ResultPlan.scala
@@ -27,6 +27,7 @@ import org.apache.kylin.common.{KylinConfig, QueryContext, QueryContextFacade}
import org.apache.kylin.common.util.HadoopUtil
import org.apache.kylin.metadata.project.ProjectManager
import org.apache.kylin.query.runtime.plans.ResultType.ResultType
+import org.apache.kylin.query.util.SparkJobTrace
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{DataFrame, SparderContext}
import org.apache.spark.sql.hive.utils.QueryMetricUtils
@@ -104,8 +105,12 @@ object ResultPlan extends Logging {
sparkContext.setJobGroup(jobGroup,
"Query Id: " + QueryContextFacade.current().getQueryId,
interruptOnCancel = true)
+ val currentTrace = QueryContextFacade.current().getQueryTrace
+ currentTrace.endLastSpan()
+ val jobTrace = new SparkJobTrace(jobGroup, currentTrace, sparkContext)
try {
val rows = df.collect()
+ jobTrace.jobFinished()
val (scanRows, scanFiles, metadataTime, scanTime, scanBytes) = QueryMetricUtils.collectScanMetrics(df.queryExecution.executedPlan)
QueryContextFacade.current().addAndGetScannedRows(scanRows.asScala.map(Long2long(_)).sum)
QueryContextFacade.current().addAndGetScanFiles(scanFiles.asScala.map(Long2long(_)).sum)
diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/util/SparkJobTrace.scala b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/util/SparkJobTrace.scala
new file mode 100644
index 0000000..e6de31a
--- /dev/null
+++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/util/SparkJobTrace.scala
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.query.util
+
+import org.apache.kylin.common.QueryTrace
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.metrics.AppStatus
+import org.apache.spark.utils.LogEx
+
+import scala.util.control.NonFatal
+
+/**
+ * helper class for tracing the spark job execution time during query
+ *
+ * @param jobGroup     Spark job group the query's jobs were submitted under
+ * @param queryTrace   trace that receives the estimated execution spans
+ * @param sparkContext context whose status data is read to estimate the spans
+ * @param startAt      epoch millis used as the execution start when Spark reports no job times
+ */
+class SparkJobTrace(jobGroup: String,
+                    queryTrace: QueryTrace,
+                    sparkContext: SparkContext,
+                    startAt: Long = System.currentTimeMillis()) extends LogEx {
+
+  val appStatus = new AppStatus(sparkContext)
+
+  /**
+   * called right after job execution is done and the helper will calculate and estimate
+   * durations for job execution steps (WAIT_FOR_EXECUTION, EXECUTION, FETCH_RESULT)
+   *
+   * As stages and tasks are executed in parallel, it is hard to have a precise duration
+   * trace for each step.
+   * In this helper, we estimate the duration of WAIT_FOR_EXECUTION, EXECUTION, FETCH_RESULT
+   * as follows
+   * 1. Calculate the median task launch delay, task execution duration and task fetch result time.
+   *    the launch delay = task launch time - stage submission time
+   * 2. Sum the median task launch delay, task execution duration and task fetch result time
+   *    from all stages. And calculate the proportion of each part
+   * 3. Calculate the duration of each step by multiplying the corresponding proportion and the
+   *    total job execution duration
+   *
+   * We use the median of task launch delay as it can give a rough estimation on how much time
+   * the tasks in a stage are spending on waiting for a free executor. And if the delay is
+   * long, it may imply the executor-core config is insufficient for the number of tasks,
+   * or the cluster is under heavy workload.
+   *
+   * Stages whose median launch time is unknown (NaN) are left out of the sum so that they do
+   * not turn every estimate into NaN. When no positive task time is available at all, the whole
+   * job execution duration is reported as EXECUTION.
+   */
+  def jobFinished(): Unit = {
+    try {
+      val jobDataSeq = appStatus.getJobData(jobGroup)
+
+      if (jobDataSeq.isEmpty) {
+        endAbnormalExecutionTrace()
+        return
+      }
+
+      // earliest submission and latest completion over all jobs of the group, in epoch millis
+      val submissionTime = jobDataSeq.flatMap(_.submissionTime).map(_.getTime).reduceOption(_ min _)
+      val completionTime = jobDataSeq.flatMap(_.completionTime).map(_.getTime).reduceOption(_ max _)
+
+      // job preparation ends when Spark registers the first job of the group as submitted
+      submissionTime.foreach(queryTrace.amendLast(QueryTrace.PREPARE_AND_SUBMIT_JOB, _))
+
+      val jobExecutionTime = (submissionTime, completionTime) match {
+        case (Some(submitted), Some(completed)) => completed - submitted
+        case _ => System.currentTimeMillis() - startAt
+      }
+
+      val (computingTimeSum, getResultTimeSum) = jobDataSeq.map(_.jobId)
+        .flatMap(appStatus.getJobStagesSummary(_, 0.5))
+        .foldLeft((0.0, 0.0)) { case ((computing, fetching), taskMetrics) =>
+          (computing + taskMetrics.executorRunTime.head + taskMetrics.executorDeserializeTime.head,
+            fetching + taskMetrics.gettingResultTime.head)
+        }
+      val launchDelayTimeSum = jobDataSeq.flatMap(_.stageIds).flatMap(appStatus.getStage).map { stage =>
+        appStatus.getTaskLaunchTime(stage.stageId(), 0.5) - stage.submissionTime()
+      }.filterNot(_.isNaN).sum
+      val sum = computingTimeSum + getResultTimeSum + launchDelayTimeSum
+
+      if (sum > 0) {
+        queryTrace.appendSpan(QueryTrace.WAIT_FOR_EXECUTION, (launchDelayTimeSum * jobExecutionTime / sum).toLong)
+        queryTrace.appendSpan(QueryTrace.EXECUTION, (computingTimeSum * jobExecutionTime / sum).toLong)
+        queryTrace.appendSpan(QueryTrace.FETCH_RESULT, (getResultTimeSum * jobExecutionTime / sum).toLong)
+      } else {
+        // no usable task metrics: attribute the whole job duration to execution
+        queryTrace.appendSpan(QueryTrace.WAIT_FOR_EXECUTION, 0)
+        queryTrace.appendSpan(QueryTrace.EXECUTION, jobExecutionTime)
+        queryTrace.appendSpan(QueryTrace.FETCH_RESULT, 0)
+      }
+    } catch {
+      case NonFatal(e) =>
+        logWarning(s"Failed trace spark job execution for $jobGroup", e)
+        endAbnormalExecutionTrace()
+    }
+  }
+
+  /**
+   * called right after result transformation is done to count the
+   * transformation time to total result fetch duration
+   */
+  def resultConverted(): Unit = {
+    queryTrace.amendLast(QueryTrace.FETCH_RESULT, System.currentTimeMillis())
+  }
+
+  /**
+   * add dummy spans for abnormal trace anyway
+   */
+  def endAbnormalExecutionTrace(): Unit = {
+    queryTrace.appendSpan(QueryTrace.WAIT_FOR_EXECUTION, 0)
+    queryTrace.appendSpan(QueryTrace.EXECUTION, System.currentTimeMillis() - startAt)
+    queryTrace.appendSpan(QueryTrace.FETCH_RESULT, 0)
+  }
+}
+
diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/metrics/AppStatus.scala b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/metrics/AppStatus.scala
new file mode 100644
index 0000000..e1eb6f8
--- /dev/null
+++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/metrics/AppStatus.scala
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.metrics
+
+import org.apache.spark.{SparkContext, SparkStageInfo}
+import org.apache.spark.status.{TaskDataWrapper, TaskIndexNames}
+import org.apache.spark.util.Utils
+import org.apache.spark.status.api.v1
+
+/**
+ * Reads job, stage and task information of `sparkContext` from Spark's status tracker and
+ * status store.
+ *
+ * NOTE(review): the class presumably lives in an org.apache.spark package so that it can reach
+ * Spark-internal members such as `SparkContext.statusStore`; confirm before moving it.
+ */
+class AppStatus(sparkContext: SparkContext) {
+
+  /**
+   * Returns the launch time of the task at position `quantile` (e.g. 0.5 for the median) among
+   * the tasks of attempt 0 of stage `stageId`, or NaN when no task is counted.
+   */
+  def getTaskLaunchTime(stageId: Int, quantile: Double): Double =
+    scanTasks(stageId, TaskIndexNames.LAUNCH_TIME, quantile)(task => task.launchTime)
+
+  /**
+   * Adapted from org.apache.spark.status.AppStatusStore.taskSummary: orders the tasks of attempt 0
+   * of stage `stageId` by the status-store index `index` and returns `fn` applied to the task at
+   * position `quantile`.
+   *
+   * The tasks are counted on the EXEC_RUN_TIME index starting at 0, as Spark does. NaN is
+   * returned when that count is zero or the scan runs out of tasks before the position.
+   */
+  def scanTasks(stageId: Int, index: String, quantile: Double)(fn: TaskDataWrapper => Long): Double = {
+    val stageKey = Array(stageId, 0)
+
+    val taskCount = Utils.tryWithResource(
+      sparkContext.statusStore.store.view(classOf[TaskDataWrapper])
+        .parent(stageKey)
+        .index(TaskIndexNames.EXEC_RUN_TIME)
+        .first(0L)
+        .closeableIterator()
+    ) { tasks =>
+      var counted = 0L
+      while (tasks.hasNext()) {
+        counted += 1
+        tasks.skip(1)
+      }
+      counted
+    }
+
+    // index of the requested task in the ordered scan; -1 when no task was counted
+    val position = math.min((quantile * taskCount).toLong, taskCount - 1)
+
+    Utils.tryWithResource(
+      sparkContext.statusStore.store.view(classOf[TaskDataWrapper])
+        .parent(stageKey)
+        .index(index)
+        .first(0L)
+        .closeableIterator()
+    ) { tasks =>
+      if (position == -1L) {
+        Double.NaN
+      } else if (tasks.skip(position)) {
+        fn(tasks.next()).toDouble
+      } else {
+        Double.NaN
+      }
+    }
+  }
+
+  /**
+   * Returns the task metric distributions at `quantile` for every stage of job `jobId` that has
+   * a summary, or an empty Seq when the job is unknown.
+   */
+  def getJobStagesSummary(jobId: Int, quantile: Double): Seq[v1.TaskMetricDistributions] = {
+    getJobData(jobId).toSeq
+      .flatMap(_.stageIds)
+      .flatMap(stageId => sparkContext.statusStore.taskSummary(stageId, 0, Array(quantile)))
+  }
+
+  /** Returns the status tracker's info for stage `stageId`, if any. */
+  def getStage(stageId: Int): Option[SparkStageInfo] =
+    sparkContext.statusTracker.getStageInfo(stageId)
+
+  /** Returns the data of every job of `jobGroup` that the status store can provide. */
+  def getJobData(jobGroup: String): Seq[v1.JobData] =
+    sparkContext.statusTracker.getJobIdsForGroup(jobGroup).toSeq.flatMap(jobId => getJobData(jobId))
+
+  /** Returns the data of job `jobId`, or None when the status store has no such job. */
+  def getJobData(jobId: Int): Option[v1.JobData] = {
+    try {
+      Some(sparkContext.statusStore.job(jobId))
+    } catch {
+      case _: NoSuchElementException => None
+    }
+  }
+}
+
diff --git a/query/src/main/java/org/apache/kylin/query/relnode/OLAPToEnumerableConverter.java b/query/src/main/java/org/apache/kylin/query/relnode/OLAPToEnumerableConverter.java
index 20e5f21..5a4094f 100644
--- a/query/src/main/java/org/apache/kylin/query/relnode/OLAPToEnumerableConverter.java
+++ b/query/src/main/java/org/apache/kylin/query/relnode/OLAPToEnumerableConverter.java
@@ -22,6 +22,7 @@ import java.util.List;
import java.util.stream.Collectors;
import com.google.common.collect.Lists;
+import org.apache.kylin.common.QueryTrace;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -107,6 +108,8 @@ public class OLAPToEnumerableConverter extends ConverterImpl implements Enumerab
logger.debug(dumpPlan);
}
+ QueryContextFacade.current().getQueryTrace().startSpan(QueryTrace.CUBE_MATCHING);
+
RealizationChooser.selectRealization(contexts);
QueryInfoCollector.current().setCubeNames(contexts.stream()
diff --git a/server-base/src/main/java/org/apache/kylin/rest/response/SQLResponse.java b/server-base/src/main/java/org/apache/kylin/rest/response/SQLResponse.java
index 37578ec..fcfc8ad 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/response/SQLResponse.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/response/SQLResponse.java
@@ -95,6 +95,8 @@ public class SQLResponse implements Serializable {
// indicating the lazy query start time, -1 indicating not enabled
protected long lazyQueryStartTime = -1L;
+ private List<SQLResponseTrace> traces;
+
public SQLResponse() {
}
@@ -296,6 +298,14 @@ public class SQLResponse implements Serializable {
this.realizationTypes = realizationTypes;
}
+ public void setTraces(List<SQLResponseTrace> traces) {
+ this.traces = traces;
+ }
+
+ public List<SQLResponseTrace> getTraces() {
+ return traces;
+ }
+
@JsonIgnore
public List<QueryContext.CubeSegmentStatisticsResult> getCubeSegmentStatisticsList() {
try {
diff --git a/server-base/src/main/java/org/apache/kylin/rest/response/SQLResponseTrace.java b/server-base/src/main/java/org/apache/kylin/rest/response/SQLResponseTrace.java
new file mode 100644
index 0000000..3145f69
--- /dev/null
+++ b/server-base/src/main/java/org/apache/kylin/rest/response/SQLResponseTrace.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.rest.response;
+
+/**
+ * One stage of a query's time trace as returned in {@link SQLResponse}: the stage name, its group
+ * (may be {@code null}) and its duration in milliseconds.
+ */
+public class SQLResponseTrace {
+
+    private String name;
+
+    private String group;
+
+    private long duration;
+
+    /** Creates an empty entry; the fields are filled in through the setters. */
+    public SQLResponseTrace() {
+    }
+
+    /** Creates an entry from a stage's name, group and duration in milliseconds. */
+    public SQLResponseTrace(String name, String group, long duration) {
+        this.name = name;
+        this.group = group;
+        this.duration = duration;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    public String getGroup() {
+        return group;
+    }
+
+    public void setGroup(String group) {
+        this.group = group;
+    }
+
+    public long getDuration() {
+        return duration;
+    }
+
+    public void setDuration(long duration) {
+        this.duration = duration;
+    }
+}
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
index d1fc461..fcf3be2 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
@@ -43,6 +43,7 @@ import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.NoSuchElementException;
+import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
@@ -66,6 +67,7 @@ import org.apache.kylin.cache.cachemanager.MemcachedCacheManager;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.QueryContext;
import org.apache.kylin.common.QueryContextFacade;
+import org.apache.kylin.common.QueryTrace;
import org.apache.kylin.metrics.QuerySparkMetrics;
import org.apache.kylin.common.debug.BackdoorToggles;
import org.apache.kylin.common.exceptions.ResourceLimitExceededException;
@@ -113,6 +115,7 @@ import org.apache.kylin.rest.msg.MsgPicker;
import org.apache.kylin.rest.request.PrepareSqlRequest;
import org.apache.kylin.rest.request.SQLRequest;
import org.apache.kylin.rest.response.SQLResponse;
+import org.apache.kylin.rest.response.SQLResponseTrace;
import org.apache.kylin.rest.util.AclEvaluate;
import org.apache.kylin.rest.util.AclPermissionUtil;
import org.apache.kylin.rest.util.QueryRequestLimits;
@@ -219,6 +222,9 @@ public class QueryService extends BasicService {
badQueryDetector.queryStart(Thread.currentThread(), sqlRequest, user, queryId);
ret = queryWithSqlMassage(sqlRequest);
+ ret.setTraces(QueryContextFacade.current().getQueryTrace().spans().stream()
+ .map(span -> new SQLResponseTrace(span.getName(), span.getGroup(), span.getDuration()))
+ .collect(Collectors.toList()));
return ret;
} finally {
@@ -386,6 +392,11 @@ public class QueryService extends BasicService {
stringBuilder.append("Used Spark pool: ").append(response.getSparkPool()).append(newLine);
stringBuilder.append("Trace URL: ").append(response.getTraceUrl()).append(newLine);
stringBuilder.append("Message: ").append(response.getExceptionMessage()).append(newLine);
+ if (response.getTraces() != null) {
+ stringBuilder.append("Time consuming for each query stage: -----------------").append(newLine);
+ response.getTraces().forEach(trace -> stringBuilder.append(trace.getName() + " : " + trace.getDuration() + "ms").append(newLine));
+ stringBuilder.append("Time consuming for each query stage: -----------------").append(newLine);
+ }
stringBuilder.append("==========================[QUERY]===============================").append(newLine);
logger.info(stringBuilder.toString());
@@ -408,8 +419,10 @@ public class QueryService extends BasicService {
public SQLResponse doQueryWithCache(SQLRequest sqlRequest, boolean isQueryInspect) {
Message msg = MsgPicker.getMsg();
sqlRequest.setUsername(getUserName());
+ final QueryContext queryContext = QueryContextFacade.current();
KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+ queryContext.getQueryTrace().startSpan(QueryTrace.SQL_TRANSFORMATION);
if (!ServerMode.SERVER_MODE.canServeQuery()) {
throw new BadRequestException(
String.format(Locale.ROOT, msg.getQUERY_NOT_ALLOWED(), kylinConfig.getServerMode()));
@@ -430,8 +443,6 @@ public class QueryService extends BasicService {
if (sqlRequest.getBackdoorToggles() != null)
BackdoorToggles.addToggles(sqlRequest.getBackdoorToggles());
- final QueryContext queryContext = QueryContextFacade.current();
-
try (SetThreadName ignored = new SetThreadName("Query %s", queryContext.getQueryId())) {
// force clear the query context before a new query
OLAPContext.clearThreadLocalContexts();
@@ -1023,6 +1034,7 @@ public class QueryService extends BasicService {
try {
stat = conn.createStatement();
processStatementAttr(stat, sqlRequest);
+ QueryContextFacade.current().getQueryTrace().startSpan(QueryTrace.SQL_PARSE_AND_OPTIMIZE);
resultSet = stat.executeQuery(correctedSql);
r = createResponseFromResultSet(resultSet);
diff --git a/server-base/src/test/java/org/apache/kylin/rest/response/SQLResponseTest.java b/server-base/src/test/java/org/apache/kylin/rest/response/SQLResponseTest.java
index 370b1ea..deeafc9 100644
--- a/server-base/src/test/java/org/apache/kylin/rest/response/SQLResponseTest.java
+++ b/server-base/src/test/java/org/apache/kylin/rest/response/SQLResponseTest.java
@@ -36,7 +36,7 @@ public class SQLResponseTest {
"realizationTypes", "affectedRowCount", "isException",
"exceptionMessage", "duration", "partial", "totalScanCount", "hitExceptionCache",
"storageCacheUsed", "sparkPool", "pushDown", "traceUrl", "totalScanBytes",
- "totalScanFiles", "metadataTime", "totalSparkScanTime" };
+ "totalScanFiles", "metadataTime", "totalSparkScanTime", "traces"};
SQLResponse sqlResponse = new SQLResponse(null, null, "learn_cube", 100, false, null, false, false);
String jsonStr = JsonUtil.writeValueAsString(sqlResponse);