Posted to commits@kylin.apache.org by xx...@apache.org on 2022/10/08 07:51:12 UTC

[kylin] branch main updated: KYLIN-5271 Query memory leaks

This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/main by this push:
     new 0c3ad9f42a KYLIN-5271 Query memory leaks
0c3ad9f42a is described below

commit 0c3ad9f42a81a15bf115a8d2cadfe58deba116a4
Author: zhaoliu4 <zh...@iflytek.com>
AuthorDate: Fri Sep 23 00:05:53 2022 +0800

    KYLIN-5271 Query memory leaks
---
 .../kylin/query/pushdown/SparkSubmitter.java       |  9 +++++---
 .../apache/kylin/query/runtime/SparkEngine.java    | 26 ++++++++++++++--------
 .../kylin/query/runtime/plans/ResultPlan.scala     |  1 -
 .../org/apache/spark/sql/SparderContext.scala      |  1 -
 .../apache/spark/sql/SparderContextFacade.scala    |  1 +
 5 files changed, 24 insertions(+), 14 deletions(-)
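
Context for the change, as visible in the diff below: SparderContextFacade keeps one cloned SparkSession (plus its UdfManager) per query thread in a ThreadLocal, and SparderContext.closeThreadSparkSession() is what releases it. Before this commit, the close call sat at the end of ResultPlan.getResult and after SparkSqlClient.executeSql, so an exception raised while planning or executing a query skipped the cleanup and the cloned session stayed referenced by its thread, consistent with the "Query memory leaks" title. The commit moves the cleanup into finally blocks at the query entry points. The sketch below shows the general pattern with invented names (ThreadSessionCache, Session); it is an illustration, not Kylin code.

import java.util.function.Supplier;

// Minimal sketch of the leak-and-fix pattern. ThreadSessionCache and
// Session are hypothetical names, not Kylin classes.
public class ThreadSessionCache {

    // One lazily created session per query thread, as in SparderContextFacade.
    private static final ThreadLocal<Session> CURRENT = new ThreadLocal<>();

    static Session current() {
        if (CURRENT.get() == null) {
            CURRENT.set(new Session());
        }
        return CURRENT.get();
    }

    static void closeThreadSession() {
        Session s = CURRENT.get();
        if (s != null) {
            s.close();
            CURRENT.remove(); // drop the reference so the clone can be collected
        }
    }

    // The fixed call sites follow this shape: cleanup runs on the normal
    // return path and on the exception path alike.
    static <T> T runQuery(Supplier<T> query) {
        try {
            return query.get();
        } finally {
            closeThreadSession();
        }
    }

    static final class Session {
        void close() { /* release the session's resources */ }
    }
}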

diff --git a/kylin-spark-project/kylin-spark-query/src/main/java/org/apache/kylin/query/pushdown/SparkSubmitter.java b/kylin-spark-project/kylin-spark-query/src/main/java/org/apache/kylin/query/pushdown/SparkSubmitter.java
index 29259aa4bd..0379a1727c 100644
--- a/kylin-spark-project/kylin-spark-query/src/main/java/org/apache/kylin/query/pushdown/SparkSubmitter.java
+++ b/kylin-spark-project/kylin-spark-query/src/main/java/org/apache/kylin/query/pushdown/SparkSubmitter.java
@@ -30,9 +30,12 @@ public class SparkSubmitter {
     public static final Logger logger = LoggerFactory.getLogger(SparkSubmitter.class);
 
     public static PushdownResponse submitPushDownTask(String sql) {
-        Pair<List<List<String>>, List<StructField>> pair =
-                SparkSqlClient.executeSql(SparderContext.getSparkSession(), sql);
-        SparderContext.closeThreadSparkSession();
+        Pair<List<List<String>>, List<StructField>> pair = null;
+        try {
+            pair = SparkSqlClient.executeSql(SparderContext.getSparkSession(), sql);
+        } finally {
+            SparderContext.closeThreadSparkSession();
+        }
         return new PushdownResponse(pair.getSecond(), pair.getFirst());
     }
 
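In the rewritten submitPushDownTask above, the finally block guarantees the thread session is closed even when SparkSqlClient.executeSql throws; in that case pair is still null, but the exception propagates out of the try/finally before the trailing return dereferences it, so no NullPointerException can occur. A minimal, self-contained demonstration of that control flow (plain Java, invented names):

// The finally block runs even when the body throws, and the exception
// leaves the method before the trailing dereference, so the null value
// is never touched.
public class FinallyOnThrow {

    public static void main(String[] args) {
        try {
            submit();
        } catch (IllegalStateException e) {
            System.out.println("caller saw: " + e.getMessage());
        }
    }

    static int submit() {
        String result = null;
        try {
            result = riskyCall(); // throws before assigning
        } finally {
            System.out.println("cleanup ran"); // always printed
        }
        return result.length();   // never reached when riskyCall() throws
    }

    static String riskyCall() {
        throw new IllegalStateException("query failed");
    }
}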
diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/SparkEngine.java b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/SparkEngine.java
index 3a504ae073..05d2888e94 100644
--- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/SparkEngine.java
+++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/SparkEngine.java
@@ -29,6 +29,7 @@ import org.apache.kylin.query.runtime.plans.ResultPlan;
 import org.apache.kylin.query.runtime.plans.ResultType;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparderContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -38,21 +39,28 @@ public class SparkEngine implements QueryEngine {
 
     @Override
     public Enumerable<Object> computeSCALA(DataContext dataContext, RelNode relNode, RelDataType resultType) {
-        Dataset<Row> sparkPlan = toSparkPlan(dataContext, relNode);
-        if (System.getProperty("calcite.debug") != null) {
-            log.debug("SPARK LOGICAL PLAN {}", sparkPlan.queryExecution());
+        try {
+            Dataset<Row> sparkPlan = toSparkPlan(dataContext, relNode);
+            if (System.getProperty("calcite.debug") != null) {
+                log.debug("SPARK LOGICAL PLAN {}", sparkPlan.queryExecution());
+            }
+            return ResultPlan.getResult(sparkPlan, resultType, ResultType.SCALA()).right().get();
+        } finally {
+            SparderContext.closeThreadSparkSession();
         }
-        return ResultPlan.getResult(sparkPlan, resultType, ResultType.SCALA()).right().get();
-
     }
 
     @Override
     public Enumerable<Object[]> compute(DataContext dataContext, RelNode relNode, RelDataType resultType) {
-        Dataset<Row> sparkPlan = toSparkPlan(dataContext, relNode);
-        if (System.getProperty("calcite.debug") != null) {
-            log.info("SPARK LOGICAL PLAN {}", sparkPlan.queryExecution());
+        try {
+            Dataset<Row> sparkPlan = toSparkPlan(dataContext, relNode);
+            if (System.getProperty("calcite.debug") != null) {
+                log.info("SPARK LOGICAL PLAN {}", sparkPlan.queryExecution());
+            }
+            return ResultPlan.getResult(sparkPlan, resultType, ResultType.NORMAL()).left().get();
+        } finally {
+            SparderContext.closeThreadSparkSession();
         }
-        return ResultPlan.getResult(sparkPlan, resultType, ResultType.NORMAL()).left().get();
     }
 
     private Dataset<Row> toSparkPlan(DataContext dataContext, RelNode relNode) {
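After this hunk, computeSCALA and compute repeat the same try/finally shape. One possible follow-up, not part of this commit, would be to hoist the pattern into a small helper so any future query entry point gets the same cleanup guarantee automatically. A hypothetical sketch (QuerySessions and withThreadSession are invented names):

import java.util.function.Supplier;

// Hypothetical follow-up refactor, not part of this commit: hoist the
// repeated try/finally into one helper shared by all query entry points.
public final class QuerySessions {

    private QuerySessions() {}

    public static <T> T withThreadSession(Supplier<T> body, Runnable cleanup) {
        try {
            return body.get();
        } finally {
            cleanup.run(); // e.g. SparderContext::closeThreadSparkSession
        }
    }
}

With such a helper, each engine method would reduce to a single withThreadSession(...) call, passing SparderContext::closeThreadSparkSession as the cleanup action.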
diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/ResultPlan.scala b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/ResultPlan.scala
index 7c95636e31..3f98f47f39 100644
--- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/ResultPlan.scala
+++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/ResultPlan.scala
@@ -190,7 +190,6 @@ object ResultPlan extends Logging {
           }
       }
     SparderContext.cleanQueryInfo()
-    SparderContext.closeThreadSparkSession()
     result
   }
 }
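Removing the close call here shifts cleanup responsibility from the callee (ResultPlan.getResult) to the callers' finally blocks in SparkEngine and SparkSubmitter. That matters for two reasons visible in this diff: a trailing statement in the callee never runs if anything before it throws, and it could never cover failures in toSparkPlan, which executes before ResultPlan is entered at all. A runnable toy comparison of the two shapes (all names invented; the counter stands in for un-closed thread sessions):

public class CalleeVsCallerCleanup {

    static int leaked = 0;

    // Old shape (ResultPlan before this commit): cleanup is the last
    // statement of the callee, so it is skipped when execute() throws.
    static String calleeCleansUp(boolean fail) {
        String rows = execute(fail);
        cleanup();
        return rows;
    }

    // New shape (SparkEngine after this commit): the caller wraps the
    // whole call in try/finally, covering the exception path too.
    static String callerCleansUp(boolean fail) {
        try {
            return execute(fail);
        } finally {
            cleanup();
        }
    }

    static String execute(boolean fail) {
        leaked++; // stands in for cloning a per-thread session
        if (fail) {
            throw new RuntimeException("query error");
        }
        return "rows";
    }

    static void cleanup() {
        leaked--;
    }

    public static void main(String[] args) {
        try { calleeCleansUp(true); } catch (RuntimeException ignored) { }
        System.out.println("old shape leaked: " + leaked);  // prints 1
        leaked = 0;
        try { callerCleansUp(true); } catch (RuntimeException ignored) { }
        System.out.println("new shape leaked: " + leaked);  // prints 0
    }
}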
diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparderContext.scala b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparderContext.scala
index e6b73d66cc..6be695112d 100644
--- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparderContext.scala
+++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparderContext.scala
@@ -61,7 +61,6 @@ object SparderContext extends Logging {
   }
 
   def getSparkSession: SparkSession = {
-    logInfo(s"Current thread ${Thread.currentThread().getId} create a SparkSession.")
     SparderContextFacade.current().getFirst
   }
 
diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparderContextFacade.scala b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparderContextFacade.scala
index 386e9fcfc0..ede8607607 100644
--- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparderContextFacade.scala
+++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparderContextFacade.scala
@@ -32,6 +32,7 @@ object SparderContextFacade extends Logging {
   def current(): Pair[SparkSession, UdfManager] = {
     if (CURRENT_SPARKSESSION.get() == null) {
       val spark = SparderContext.getOriginalSparkSession.cloneSession()
+      logInfo(s"Current thread ${Thread.currentThread().getId} create a SparkSession.")
       CURRENT_SPARKSESSION.set(new Pair[SparkSession, UdfManager](spark,
         UdfManager.createWithoutBuildInFunc(spark)))
     }