You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by xx...@apache.org on 2020/12/23 09:08:43 UTC
[kylin] branch kylin-on-parquet-v2 updated: KYLIN-4846 Set the related query id to sparder job description
This is an automated email from the ASF dual-hosted git repository.
xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/kylin-on-parquet-v2 by this push:
new 39086e3 KYLIN-4846 Set the related query id to sparder job description
39086e3 is described below
commit 39086e3a279cd92447c9f919147edec6db058685
Author: Zhichao Zhang <44...@qq.com>
AuthorDate: Tue Dec 22 08:45:47 2020 +0800
KYLIN-4846 Set the related query id to sparder job description
---
.../org/apache/kylin/query/pushdown/SparkSubmitter.java | 6 ++----
.../org/apache/kylin/query/pushdown/SparkSqlClient.scala | 15 ++++++---------
.../org/apache/kylin/query/runtime/plans/ResultPlan.scala | 3 +--
3 files changed, 9 insertions(+), 15 deletions(-)
diff --git a/kylin-spark-project/kylin-spark-query/src/main/java/org/apache/kylin/query/pushdown/SparkSubmitter.java b/kylin-spark-project/kylin-spark-query/src/main/java/org/apache/kylin/query/pushdown/SparkSubmitter.java
index 2d31822..29259aa 100644
--- a/kylin-spark-project/kylin-spark-query/src/main/java/org/apache/kylin/query/pushdown/SparkSubmitter.java
+++ b/kylin-spark-project/kylin-spark-query/src/main/java/org/apache/kylin/query/pushdown/SparkSubmitter.java
@@ -21,19 +21,17 @@ package org.apache.kylin.query.pushdown;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.engine.spark.metadata.cube.StructField;
import org.apache.spark.sql.SparderContext;
-import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
-import java.util.UUID;
public class SparkSubmitter {
public static final Logger logger = LoggerFactory.getLogger(SparkSubmitter.class);
public static PushdownResponse submitPushDownTask(String sql) {
- SparkSession ss = SparderContext.getSparkSession();
- Pair<List<List<String>>, List<StructField>> pair = SparkSqlClient.executeSql(ss, sql, UUID.randomUUID());
+ Pair<List<List<String>>, List<StructField>> pair =
+ SparkSqlClient.executeSql(SparderContext.getSparkSession(), sql);
SparderContext.closeThreadSparkSession();
return new PushdownResponse(pair.getSecond(), pair.getFirst());
}
diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/pushdown/SparkSqlClient.scala b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/pushdown/SparkSqlClient.scala
index 0d8b769..a4064fe 100644
--- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/pushdown/SparkSqlClient.scala
+++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/pushdown/SparkSqlClient.scala
@@ -38,22 +38,18 @@ import scala.collection.JavaConverters._
object SparkSqlClient {
val logger: Logger = LoggerFactory.getLogger(classOf[SparkSqlClient])
- def executeSql(ss: SparkSession, sql: String, uuid: UUID): Pair[JList[JList[String]], JList[StructField]] = {
+ def executeSql(ss: SparkSession, sql: String): Pair[JList[JList[String]], JList[StructField]] = {
ss.sparkContext.setLocalProperty("spark.scheduler.pool", "query_pushdown")
HadoopUtil.setCurrentConfiguration(ss.sparkContext.hadoopConfiguration)
- val s = "Start to run sql with SparkSQL..."
val queryId = QueryContextFacade.current().getQueryId
ss.sparkContext.setLocalProperty(QueryToExecutionIDCache.KYLIN_QUERY_ID_KEY, queryId)
- logger.info(s)
+ logger.info("Start to run sql with SparkSQL...")
val df = ss.sql(sql)
autoSetShufflePartitions(ss, df)
- val msg = "SparkSQL returned result DataFrame"
- logger.info(msg)
-
- DFToList(ss, sql, uuid, df)
+ DFToList(ss, sql, df)
}
private def autoSetShufflePartitions(ss: SparkSession, df: DataFrame) = {
@@ -74,9 +70,10 @@ object SparkSqlClient {
}
}
- private def DFToList(ss: SparkSession, sql: String, uuid: UUID, df: DataFrame): Pair[JList[JList[String]], JList[StructField]] = {
+ private def DFToList(ss: SparkSession, sql: String, df: DataFrame): Pair[JList[JList[String]], JList[StructField]] = {
val jobGroup = Thread.currentThread.getName
- ss.sparkContext.setJobGroup(jobGroup, s"Push down: $sql", interruptOnCancel = true)
+ ss.sparkContext.setJobGroup(jobGroup,
+ "Pushdown Query Id: " + QueryContextFacade.current().getQueryId, interruptOnCancel = true)
try {
val temporarySchema = df.schema.fields.zipWithIndex.map {
case (_, index) => s"temporary_$index"
diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/ResultPlan.scala b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/ResultPlan.scala
index d840207..991a7f2 100644
--- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/ResultPlan.scala
+++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/ResultPlan.scala
@@ -102,8 +102,7 @@ object ResultPlan extends Logging {
QueryContextFacade.current().setDataset(df)
sparkContext.setJobGroup(jobGroup,
- // QueryContextFacade.current().getSql,
- "sparder",
+ "Query Id: " + QueryContextFacade.current().getQueryId,
interruptOnCancel = true)
try {
val rows = df.collect()