You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by xx...@apache.org on 2020/12/23 09:08:43 UTC

[kylin] branch kylin-on-parquet-v2 updated: KYLIN-4846 Set the related query id to sparder job description

This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/kylin-on-parquet-v2 by this push:
     new 39086e3  KYLIN-4846 Set the related query id to sparder job description
39086e3 is described below

commit 39086e3a279cd92447c9f919147edec6db058685
Author: Zhichao Zhang <44...@qq.com>
AuthorDate: Tue Dec 22 08:45:47 2020 +0800

    KYLIN-4846 Set the related query id to sparder job description
---
 .../org/apache/kylin/query/pushdown/SparkSubmitter.java   |  6 ++----
 .../org/apache/kylin/query/pushdown/SparkSqlClient.scala  | 15 ++++++---------
 .../org/apache/kylin/query/runtime/plans/ResultPlan.scala |  3 +--
 3 files changed, 9 insertions(+), 15 deletions(-)

diff --git a/kylin-spark-project/kylin-spark-query/src/main/java/org/apache/kylin/query/pushdown/SparkSubmitter.java b/kylin-spark-project/kylin-spark-query/src/main/java/org/apache/kylin/query/pushdown/SparkSubmitter.java
index 2d31822..29259aa 100644
--- a/kylin-spark-project/kylin-spark-query/src/main/java/org/apache/kylin/query/pushdown/SparkSubmitter.java
+++ b/kylin-spark-project/kylin-spark-query/src/main/java/org/apache/kylin/query/pushdown/SparkSubmitter.java
@@ -21,19 +21,17 @@ package org.apache.kylin.query.pushdown;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.engine.spark.metadata.cube.StructField;
 import org.apache.spark.sql.SparderContext;
-import org.apache.spark.sql.SparkSession;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.List;
-import java.util.UUID;
 
 public class SparkSubmitter {
     public static final Logger logger = LoggerFactory.getLogger(SparkSubmitter.class);
 
     public static PushdownResponse submitPushDownTask(String sql) {
-        SparkSession ss = SparderContext.getSparkSession();
-        Pair<List<List<String>>, List<StructField>> pair = SparkSqlClient.executeSql(ss, sql, UUID.randomUUID());
+        Pair<List<List<String>>, List<StructField>> pair =
+                SparkSqlClient.executeSql(SparderContext.getSparkSession(), sql);
         SparderContext.closeThreadSparkSession();
         return new PushdownResponse(pair.getSecond(), pair.getFirst());
     }
diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/pushdown/SparkSqlClient.scala b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/pushdown/SparkSqlClient.scala
index 0d8b769..a4064fe 100644
--- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/pushdown/SparkSqlClient.scala
+++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/pushdown/SparkSqlClient.scala
@@ -38,22 +38,18 @@ import scala.collection.JavaConverters._
 object SparkSqlClient {
 	val logger: Logger = LoggerFactory.getLogger(classOf[SparkSqlClient])
 
-	def executeSql(ss: SparkSession, sql: String, uuid: UUID): Pair[JList[JList[String]], JList[StructField]] = {
+	def executeSql(ss: SparkSession, sql: String): Pair[JList[JList[String]], JList[StructField]] = {
 		ss.sparkContext.setLocalProperty("spark.scheduler.pool", "query_pushdown")
 		HadoopUtil.setCurrentConfiguration(ss.sparkContext.hadoopConfiguration)
-		val s = "Start to run sql with SparkSQL..."
 		val queryId = QueryContextFacade.current().getQueryId
 		ss.sparkContext.setLocalProperty(QueryToExecutionIDCache.KYLIN_QUERY_ID_KEY, queryId)
-		logger.info(s)
+		logger.info("Start to run sql with SparkSQL...")
 
 		val df = ss.sql(sql)
 
 		autoSetShufflePartitions(ss, df)
 
-		val msg = "SparkSQL returned result DataFrame"
-		logger.info(msg)
-
-		DFToList(ss, sql, uuid, df)
+		DFToList(ss, sql, df)
 	}
 
 	private def autoSetShufflePartitions(ss: SparkSession, df: DataFrame) = {
@@ -74,9 +70,10 @@ object SparkSqlClient {
 		}
 	}
 
-	private def DFToList(ss: SparkSession, sql: String, uuid: UUID, df: DataFrame): Pair[JList[JList[String]], JList[StructField]] = {
+	private def DFToList(ss: SparkSession, sql: String, df: DataFrame): Pair[JList[JList[String]], JList[StructField]] = {
 		val jobGroup = Thread.currentThread.getName
-		ss.sparkContext.setJobGroup(jobGroup, s"Push down: $sql", interruptOnCancel = true)
+		ss.sparkContext.setJobGroup(jobGroup,
+			"Pushdown Query Id: " + QueryContextFacade.current().getQueryId, interruptOnCancel = true)
 		try {
 			val temporarySchema = df.schema.fields.zipWithIndex.map {
 				case (_, index) => s"temporary_$index"
diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/ResultPlan.scala b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/ResultPlan.scala
index d840207..991a7f2 100644
--- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/ResultPlan.scala
+++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/ResultPlan.scala
@@ -102,8 +102,7 @@ object ResultPlan extends Logging {
     QueryContextFacade.current().setDataset(df)
 
     sparkContext.setJobGroup(jobGroup,
-      //      QueryContextFacade.current().getSql,
-      "sparder",
+      "Query Id: " + QueryContextFacade.current().getQueryId,
       interruptOnCancel = true)
     try {
       val rows = df.collect()