Posted to commits@spark.apache.org by ma...@apache.org on 2014/12/19 05:10:57 UTC

spark git commit: [SPARK-4756][SQL] FIX: sessionToActivePool grows indefinitely, even as sessions expire

Repository: spark
Updated Branches:
  refs/heads/master b68bc6d26 -> 22ddb6e03


[SPARK-4756][SQL] FIX: sessionToActivePool grows indefinitely, even as sessions expire

**sessionToActivePool** in **SparkSQLOperationManager** grows indefinitely, even as sessions expire.
We should remove a session's pool entry when that session is closed, even though not every session has an entry in **sessionToActivePool**.
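
Below is a minimal, self-contained sketch of the leak and of the cleanup pattern the patch applies: key the map by the session handle and drop the entry when the session closes. The `SessionHandle` case class, `SessionManagerSketch`, and `setPool` here are illustrative stand-ins, not the Hive/Thrift-server types touched by the actual change.

```scala
import scala.collection.mutable

// Illustrative stand-in for Hive's SessionHandle (assumption, not Spark's API).
final case class SessionHandle(id: Long)

// Illustrative stand-in for the session/operation manager pair.
class SessionManagerSketch {
  // Keyed by the session handle so the entry can be dropped when the session closes.
  val sessionToActivePool = mutable.Map[SessionHandle, String]()

  def setPool(session: SessionHandle, pool: String): Unit =
    sessionToActivePool(session) = pool

  def closeSession(session: SessionHandle): Unit = {
    // Without this removal the map keeps growing for the lifetime of the server,
    // which is the leak described above.
    sessionToActivePool -= session
  }
}

object SessionManagerSketchDemo extends App {
  val mgr = new SessionManagerSketch
  val handle = SessionHandle(1L)
  mgr.setPool(handle, "fair-pool")
  mgr.closeSession(handle)
  assert(mgr.sessionToActivePool.isEmpty) // entry is gone once the session is closed
}
```

The actual patch additionally switches the map key from `HiveSession` to `SessionHandle` (see the diff below), so the map no longer retains a reference to the session object itself.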

Author: guowei2 <gu...@asiainfo.com>

Closes #3617 from guowei2/SPARK-4756 and squashes the following commits:

e9b97b8 [guowei2] fix compile bug with Shim12
cf0f521 [guowei2] Merge remote-tracking branch 'apache/master' into SPARK-4756
e070998 [guowei2] fix: remove active pool of the session when it expired


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/22ddb6e0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/22ddb6e0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/22ddb6e0

Branch: refs/heads/master
Commit: 22ddb6e0338f4d101389a0655424a8fde6c4cff4
Parents: b68bc6d
Author: guowei2 <gu...@asiainfo.com>
Authored: Thu Dec 18 20:10:23 2014 -0800
Committer: Michael Armbrust <mi...@databricks.com>
Committed: Thu Dec 18 20:10:23 2014 -0800

----------------------------------------------------------------------
 .../sql/hive/thriftserver/SparkSQLSessionManager.scala      | 9 ++++++++-
 .../hive/thriftserver/server/SparkSQLOperationManager.scala | 3 +--
 .../org/apache/spark/sql/hive/thriftserver/Shim12.scala     | 6 +++---
 .../org/apache/spark/sql/hive/thriftserver/Shim13.scala     | 6 +++---
 4 files changed, 15 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/22ddb6e0/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala
----------------------------------------------------------------------
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala
index 6b3275b..89e9ede 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala
@@ -27,11 +27,14 @@ import org.apache.hive.service.cli.session.SessionManager
 import org.apache.spark.sql.hive.HiveContext
 import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._
 import org.apache.spark.sql.hive.thriftserver.server.SparkSQLOperationManager
+import org.apache.hive.service.cli.SessionHandle
 
 private[hive] class SparkSQLSessionManager(hiveContext: HiveContext)
   extends SessionManager
   with ReflectedCompositeService {
 
+  private lazy val sparkSqlOperationManager = new SparkSQLOperationManager(hiveContext)
+
   override def init(hiveConf: HiveConf) {
     setSuperField(this, "hiveConf", hiveConf)
 
@@ -40,10 +43,14 @@ private[hive] class SparkSQLSessionManager(hiveContext: HiveContext)
     getAncestorField[Log](this, 3, "LOG").info(
       s"HiveServer2: Async execution pool size $backgroundPoolSize")
 
-    val sparkSqlOperationManager = new SparkSQLOperationManager(hiveContext)
     setSuperField(this, "operationManager", sparkSqlOperationManager)
     addService(sparkSqlOperationManager)
 
     initCompositeService(hiveConf)
   }
+
+  override def closeSession(sessionHandle: SessionHandle) {
+    super.closeSession(sessionHandle)
+    sparkSqlOperationManager.sessionToActivePool -= sessionHandle
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/22ddb6e0/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala
----------------------------------------------------------------------
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala
index 99c4f46..9c0bf02 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/server/SparkSQLOperationManager.scala
@@ -36,8 +36,7 @@ private[thriftserver] class SparkSQLOperationManager(hiveContext: HiveContext)
   val handleToOperation = ReflectionUtils
     .getSuperField[JMap[OperationHandle, Operation]](this, "handleToOperation")
 
-  // TODO: Currenlty this will grow infinitely, even as sessions expire
-  val sessionToActivePool = Map[HiveSession, String]()
+  val sessionToActivePool = Map[SessionHandle, String]()
 
   override def newExecuteStatementOperation(
       parentSession: HiveSession,

http://git-wip-us.apache.org/repos/asf/spark/blob/22ddb6e0/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala
----------------------------------------------------------------------
diff --git a/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala b/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala
index 9258ad0..5550183 100644
--- a/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala
+++ b/sql/hive-thriftserver/v0.12.0/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim12.scala
@@ -69,7 +69,7 @@ private[hive] class SparkExecuteStatementOperation(
     statement: String,
     confOverlay: JMap[String, String])(
     hiveContext: HiveContext,
-    sessionToActivePool: SMap[HiveSession, String])
+    sessionToActivePool: SMap[SessionHandle, String])
   extends ExecuteStatementOperation(parentSession, statement, confOverlay) with Logging {
 
   private var result: SchemaRDD = _
@@ -191,14 +191,14 @@ private[hive] class SparkExecuteStatementOperation(
       logDebug(result.queryExecution.toString())
       result.queryExecution.logical match {
         case SetCommand(Some((SQLConf.THRIFTSERVER_POOL, Some(value)))) =>
-          sessionToActivePool(parentSession) = value
+          sessionToActivePool(parentSession.getSessionHandle) = value
           logInfo(s"Setting spark.scheduler.pool=$value for future statements in this session.")
         case _ =>
       }
 
       val groupId = round(random * 1000000).toString
       hiveContext.sparkContext.setJobGroup(groupId, statement)
-      sessionToActivePool.get(parentSession).foreach { pool =>
+      sessionToActivePool.get(parentSession.getSessionHandle).foreach { pool =>
         hiveContext.sparkContext.setLocalProperty("spark.scheduler.pool", pool)
       }
       iter = {

http://git-wip-us.apache.org/repos/asf/spark/blob/22ddb6e0/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala
----------------------------------------------------------------------
diff --git a/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala b/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala
index 17f1ad3..798a690 100644
--- a/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala
+++ b/sql/hive-thriftserver/v0.13.1/src/main/scala/org/apache/spark/sql/hive/thriftserver/Shim13.scala
@@ -69,7 +69,7 @@ private[hive] class SparkExecuteStatementOperation(
     confOverlay: JMap[String, String],
     runInBackground: Boolean = true)(
     hiveContext: HiveContext,
-    sessionToActivePool: SMap[HiveSession, String])
+    sessionToActivePool: SMap[SessionHandle, String])
   // NOTE: `runInBackground` is set to `false` intentionally to disable asynchronous execution
   extends ExecuteStatementOperation(parentSession, statement, confOverlay, false) with Logging {
 
@@ -162,14 +162,14 @@ private[hive] class SparkExecuteStatementOperation(
       logDebug(result.queryExecution.toString())
       result.queryExecution.logical match {
         case SetCommand(Some((SQLConf.THRIFTSERVER_POOL, Some(value)))) =>
-          sessionToActivePool(parentSession) = value
+          sessionToActivePool(parentSession.getSessionHandle) = value
           logInfo(s"Setting spark.scheduler.pool=$value for future statements in this session.")
         case _ =>
       }
 
       val groupId = round(random * 1000000).toString
       hiveContext.sparkContext.setJobGroup(groupId, statement)
-      sessionToActivePool.get(parentSession).foreach { pool =>
+      sessionToActivePool.get(parentSession.getSessionHandle).foreach { pool =>
         hiveContext.sparkContext.setLocalProperty("spark.scheduler.pool", pool)
       }
       iter = {

