Posted to commits@spark.apache.org by yh...@apache.org on 2015/10/13 18:57:59 UTC

spark git commit: [SPARK-11030] [SQL] share the SQLTab across sessions

Repository: spark
Updated Branches:
  refs/heads/master 1797055db -> d0cc79ccd


[SPARK-11030] [SQL] share the SQLTab across sessions

The SQLTab will be shared by multiple sessions.

If we create multiple independent SQLContexts (not using newSession()), we will still see multiple SQLTabs in the Spark UI.
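
To illustrate the behavior (a minimal sketch, not part of this commit; assumes a 1.6-era build with spark.sql.allowMultipleContexts left at its default):

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext

    val sc = new SparkContext(new SparkConf().setAppName("sqlui-demo").setMaster("local[2]"))
    val root = new SQLContext(sc)     // root context: creates one SQLListener and one SQL tab
    val child = root.newSession()     // shares the root's SQLListener and SQL tab
    val other = new SQLContext(sc)    // independent root context: a second SQL tab appears
    // The Spark UI now shows two SQL tabs: one shared by `root` and `child`,
    // and one owned by `other`.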

Author: Davies Liu <da...@databricks.com>

Closes #9048 from davies/sqlui.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d0cc79cc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d0cc79cc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d0cc79cc

Branch: refs/heads/master
Commit: d0cc79ccd0b4500bd6b18184a723dabc164e8abd
Parents: 1797055
Author: Davies Liu <da...@databricks.com>
Authored: Tue Oct 13 09:57:53 2015 -0700
Committer: Yin Huai <yh...@databricks.com>
Committed: Tue Oct 13 09:57:53 2015 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/sql/SQLContext.scala | 23 ++++++++++++++------
 .../spark/sql/execution/ui/SQLListener.scala    | 10 +++------
 .../apache/spark/sql/execution/ui/SQLTab.scala  |  4 +---
 .../sql/execution/ui/SQLListenerSuite.scala     |  8 +++----
 .../org/apache/spark/sql/hive/HiveContext.scala | 12 +++++++---
 5 files changed, 33 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d0cc79cc/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 1bd2913..cd93725 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -65,12 +65,15 @@ import org.apache.spark.util.Utils
 class SQLContext private[sql](
     @transient val sparkContext: SparkContext,
     @transient protected[sql] val cacheManager: CacheManager,
+    @transient private[sql] val listener: SQLListener,
     val isRootContext: Boolean)
   extends org.apache.spark.Logging with Serializable {
 
   self =>
 
-  def this(sparkContext: SparkContext) = this(sparkContext, new CacheManager, true)
+  def this(sparkContext: SparkContext) = {
+    this(sparkContext, new CacheManager, SQLContext.createListenerAndUI(sparkContext), true)
+  }
   def this(sparkContext: JavaSparkContext) = this(sparkContext.sc)
 
   // If spark.sql.allowMultipleContexts is true, we will throw an exception if a user
@@ -97,7 +100,7 @@ class SQLContext private[sql](
 
   /**
    * Returns a SQLContext as new session, with separated SQL configurations, temporary tables,
-   * registered functions, but sharing the same SparkContext and CacheManager.
+   * registered functions, but sharing the same SparkContext, CacheManager, SQLListener and SQLTab.
    *
    * @since 1.6.0
    */
@@ -105,6 +108,7 @@ class SQLContext private[sql](
     new SQLContext(
       sparkContext = sparkContext,
       cacheManager = cacheManager,
+      listener = listener,
       isRootContext = false)
   }
 
@@ -113,11 +117,6 @@ class SQLContext private[sql](
    */
   protected[sql] lazy val conf = new SQLConf
 
-  // `listener` should be only used in the driver
-  @transient private[sql] val listener = new SQLListener(this)
-  sparkContext.addSparkListener(listener)
-  sparkContext.ui.foreach(new SQLTab(this, _))
-
   /**
    * Set Spark SQL configuration properties.
    *
@@ -1312,4 +1311,14 @@ object SQLContext {
       ): InternalRow
     }
   }
+
+  /**
+   * Create a SQLListener, add it to the SparkContext, and create an SQLTab if there is a SparkUI.
+   */
+  private[sql] def createListenerAndUI(sc: SparkContext): SQLListener = {
+    val listener = new SQLListener(sc.conf)
+    sc.addSparkListener(listener)
+    sc.ui.foreach(new SQLTab(listener, _))
+    listener
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/d0cc79cc/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
index 5779c71..d647240 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
@@ -19,19 +19,15 @@ package org.apache.spark.sql.execution.ui
 
 import scala.collection.mutable
 
-import com.google.common.annotations.VisibleForTesting
-
-import org.apache.spark.{JobExecutionStatus, Logging}
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.scheduler._
-import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.sql.execution.metric.{SQLMetricParam, SQLMetricValue}
+import org.apache.spark.{JobExecutionStatus, Logging, SparkConf}
 
-private[sql] class SQLListener(sqlContext: SQLContext) extends SparkListener with Logging {
+private[sql] class SQLListener(conf: SparkConf) extends SparkListener with Logging {
 
-  private val retainedExecutions =
-    sqlContext.sparkContext.conf.getInt("spark.sql.ui.retainedExecutions", 1000)
+  private val retainedExecutions = conf.getInt("spark.sql.ui.retainedExecutions", 1000)
 
   private val activeExecutions = mutable.HashMap[Long, SQLExecutionUIData]()
 

http://git-wip-us.apache.org/repos/asf/spark/blob/d0cc79cc/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLTab.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLTab.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLTab.scala
index 0b0867f..9c27944 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLTab.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLTab.scala
@@ -20,14 +20,12 @@ package org.apache.spark.sql.execution.ui
 import java.util.concurrent.atomic.AtomicInteger
 
 import org.apache.spark.Logging
-import org.apache.spark.sql.SQLContext
 import org.apache.spark.ui.{SparkUI, SparkUITab}
 
-private[sql] class SQLTab(sqlContext: SQLContext, sparkUI: SparkUI)
+private[sql] class SQLTab(val listener: SQLListener, sparkUI: SparkUI)
   extends SparkUITab(sparkUI, SQLTab.nextTabName) with Logging {
 
   val parent = sparkUI
-  val listener = sqlContext.listener
 
   attachPage(new AllExecutionsPage(this))
   attachPage(new ExecutionPage(this))
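
The tab keeps its per-UI name counter (the AtomicInteger behind SQLTab.nextTabName), so independent root contexts still get distinct tab names. A standalone sketch of that naming pattern, with hypothetical names (not the commit's code):

    import java.util.concurrent.atomic.AtomicInteger

    object TabNames {                        // hypothetical stand-in for SQLTab's companion
      private val nextTabId = new AtomicInteger(0)
      def nextTabName: String = {
        val id = nextTabId.getAndIncrement()
        if (id == 0) "SQL" else s"SQL$id"    // first tab "SQL", then "SQL1", "SQL2", ...
      }
    }

    assert(TabNames.nextTabName == "SQL")
    assert(TabNames.nextTabName == "SQL1")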

http://git-wip-us.apache.org/repos/asf/spark/blob/d0cc79cc/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
index 7a46c69..727cf36 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
@@ -74,7 +74,7 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
   }
 
   test("basic") {
-    val listener = new SQLListener(sqlContext)
+    val listener = new SQLListener(sqlContext.sparkContext.conf)
     val executionId = 0
     val df = createTestDataFrame
     val accumulatorIds =
@@ -212,7 +212,7 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
   }
 
   test("onExecutionEnd happens before onJobEnd(JobSucceeded)") {
-    val listener = new SQLListener(sqlContext)
+    val listener = new SQLListener(sqlContext.sparkContext.conf)
     val executionId = 0
     val df = createTestDataFrame
     listener.onExecutionStart(
@@ -241,7 +241,7 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
   }
 
   test("onExecutionEnd happens before multiple onJobEnd(JobSucceeded)s") {
-    val listener = new SQLListener(sqlContext)
+    val listener = new SQLListener(sqlContext.sparkContext.conf)
     val executionId = 0
     val df = createTestDataFrame
     listener.onExecutionStart(
@@ -281,7 +281,7 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
   }
 
   test("onExecutionEnd happens before onJobEnd(JobFailed)") {
-    val listener = new SQLListener(sqlContext)
+    val listener = new SQLListener(sqlContext.sparkContext.conf)
     val executionId = 0
     val df = createTestDataFrame
     listener.onExecutionStart(

http://git-wip-us.apache.org/repos/asf/spark/blob/d0cc79cc/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index ddeadd3..e620d7f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -40,12 +40,13 @@ import org.apache.spark.api.java.JavaSparkContext
 import org.apache.spark.sql.SQLConf.SQLConfEntry
 import org.apache.spark.sql.SQLConf.SQLConfEntry._
 import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
 import org.apache.spark.sql.catalyst.expressions.{Expression, LeafExpression}
-import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.{InternalRow, ParserDialect, SqlParser}
 import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, PreInsertCastAndRename, PreWriteCheck}
+import org.apache.spark.sql.execution.ui.SQLListener
 import org.apache.spark.sql.execution.{CacheManager, ExecutedCommand, ExtractPythonUDFs, SetCommand}
 import org.apache.spark.sql.hive.client._
 import org.apache.spark.sql.hive.execution.{DescribeHiveTableCommand, HiveNativeCommand}
@@ -88,12 +89,16 @@ private[hive] case class CurrentDatabase(ctx: HiveContext)
 class HiveContext private[hive](
     sc: SparkContext,
     cacheManager: CacheManager,
+    @transient listener: SQLListener,
     @transient execHive: ClientWrapper,
     @transient metaHive: ClientInterface,
-    isRootContext: Boolean) extends SQLContext(sc, cacheManager, isRootContext) with Logging {
+    isRootContext: Boolean)
+  extends SQLContext(sc, cacheManager, listener, isRootContext) with Logging {
   self =>
 
-  def this(sc: SparkContext) = this(sc, new CacheManager, null, null, true)
+  def this(sc: SparkContext) = {
+    this(sc, new CacheManager, SQLContext.createListenerAndUI(sc), null, null, true)
+  }
   def this(sc: JavaSparkContext) = this(sc.sc)
 
   import org.apache.spark.sql.hive.HiveContext._
@@ -109,6 +114,7 @@ class HiveContext private[hive](
     new HiveContext(
       sc = sc,
       cacheManager = cacheManager,
+      listener = listener,
       execHive = executionHive.newSession(),
       metaHive = metadataHive.newSession(),
       isRootContext = false)

