You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@kylin.apache.org by "Xiaoxiang Yu (Jira)" <ji...@apache.org> on 2022/10/08 07:04:00 UTC

[jira] [Updated] (KYLIN-5271) Query memory leaks

     [ https://issues.apache.org/jira/browse/KYLIN-5271?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Xiaoxiang Yu updated KYLIN-5271:
--------------------------------
    Fix Version/s: v4.1.0

> Query memory leaks
> ------------------
>
>                 Key: KYLIN-5271
>                 URL: https://issues.apache.org/jira/browse/KYLIN-5271
>             Project: Kylin
>          Issue Type: Bug
>          Components: Query Engine
>    Affects Versions: v4.0.1
>            Reporter: Liu Zhao
>            Priority: Major
>             Fix For: v4.1.0
>
>
> The query thread will clone a SparkSession and put it into ThreadLocal. However, if an exception occurs in the Calcite To SparkPlan, the SparkSession in ThreadLocal will not be removed. More importantly, if the Spark restarts later, the SparkSession left in ThreadLocal will be unavailable, and the query on this thread will fail, throwing an exception: Caused by: java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext
> This stopped SparkContext was created at:
> org.apache.spark.sql.SparderContext$$anon$4.run(SparderContext.scala:150)
> java.lang.Thread.run(Thread.java:748)
> // put SparkSession toThreadLocal
> {code:java}
> object SparderContextFacade extends Logging {
>   final val CURRENT_SPARKSESSION: InternalThreadLocal[Pair[SparkSession, UdfManager]] =
>     new InternalThreadLocal[Pair[SparkSession, UdfManager]]()
>   def current(): Pair[SparkSession, UdfManager] = {
>     if (CURRENT_SPARKSESSION.get() == null) {
>       val spark = SparderContext.getOriginalSparkSession.cloneSession()
>       CURRENT_SPARKSESSION.set(new Pair[SparkSession, UdfManager](spark,
>         UdfManager.createWithoutBuildInFunc(spark)))
>     }
>     CURRENT_SPARKSESSION.get()
>   }
>   def remove(): Unit = {
>     CURRENT_SPARKSESSION.remove()
>   }
> }
> {code}
> // remove SparkSession from ThreadLocal
> // org.apache.kylin.query.runtime.plans.ResultPlan
> {code:java}
>     def getResult(df: DataFrame, rowType: RelDataType, resultType: ResultType)
>   : Either[Enumerable[Array[Any]], Enumerable[Any]] = withScope(df) {
>     val result: Either[Enumerable[Array[Any]], Enumerable[Any]] =
>       resultType match {
>         case ResultType.NORMAL =>
>           if (SparderContext.needCompute()) {
>             Left(ResultPlan.collectEnumerable(df, rowType))
>           } else {
>             Left(Linq4j.asEnumerable(Array.empty[Array[Any]]))
>           }
>         case ResultType.SCALA =>
>           if (SparderContext.needCompute()) {
>             Right(ResultPlan.collectScalarEnumerable(df, rowType))
>           } else {
>             Right(Linq4j.asEnumerable(Lists.newArrayList[Any]()))
>           }
>       }
>     SparderContext.cleanQueryInfo()
>     SparderContext.closeThreadSparkSession()
>     result
>   }
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)