Posted to reviews@spark.apache.org by cloud-fan <gi...@git.apache.org> on 2017/12/14 17:37:56 UTC

[GitHub] spark pull request #19981: [SPARK-22786][SQL] only use AppStatusPlugin in hi...

GitHub user cloud-fan opened a pull request:

    https://github.com/apache/spark/pull/19981

    [SPARK-22786][SQL] only use AppStatusPlugin in history server

    ## What changes were proposed in this pull request?
    
    In https://github.com/apache/spark/pull/19681 we introduced a new interface called `AppStatusPlugin` to register listeners and set up the UI for both the live UI and the history UI.
    
    However, I think it's overkill for the live UI. For example, we should not register `SQLListener` if users are not using SQL functions. Previously we registered the `SQLListener` and set up the SQL tab when `SparkSession` was first created, which indicates users are going to use SQL functions. But in #19681, we register the SQL functions during `SparkContext` creation. The same thing should apply to streaming too.
    
    I think we should keep the previous behavior and only use this new interface for the history server.
    
    To reflect this change, I also rename the new interface to `SparkHistoryUIPlugin`.
    
    ## How was this patch tested?
    
    existing tests
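
The registration behavior argued for in the description above can be illustrated with a small, Spark-free sketch. Everything here (`Context`, `SharedSqlState`, `Listener`) is a hypothetical stand-in and not the classes touched by this pull request; the only point is that a `lazy val` defers listener registration until SQL state is first used, rather than doing it when the context is created.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-ins for illustration only; these are not Spark classes.
object LazyRegistrationSketch {
  trait Listener { def name: String }

  // Plays the role of the context that owns the listener bus.
  final class Context {
    private val registered = ArrayBuffer.empty[Listener]
    def addListener(l: Listener): Unit = registered += l
    def listenerNames: Seq[String] = registered.map(_.name).toList
  }

  // Plays the role of SQL state shared by all sessions on one context.
  // The `lazy val` body runs once, on first access, and registers the listener.
  final class SharedSqlState(ctx: Context) {
    lazy val sqlListener: Listener = {
      val l = new Listener { val name = "sql" }
      ctx.addListener(l)
      l
    }
  }

  def main(args: Array[String]): Unit = {
    val ctx = new Context
    val shared = new SharedSqlState(ctx)
    println(ctx.listenerNames) // no SQL listener registered yet

    val first = shared.sqlListener  // first SQL use triggers registration
    val second = shared.sqlListener // later accesses reuse the same listener
    println(ctx.listenerNames)
    println(first eq second)
  }
}
```

With this shape, an application that never touches SQL state never pays for the SQL listener, which is the trade-off the description is asking for.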

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/cloud-fan/spark listener

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/19981.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #19981
    
----
commit ba38723826c36ce99a61fbd37bd779a90c44d0a4
Author: Wenchen Fan <we...@databricks.com>
Date:   2017-12-14T17:31:21Z

    only use AppStatusPlugin in history server

----


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19981: [SPARK-22786][SQL] only use AppStatusPlugin in history s...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19981
  
    **[Test build #84922 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84922/testReport)** for PR 19981 at commit [`ba38723`](https://github.com/apache/spark/commit/ba38723826c36ce99a61fbd37bd779a90c44d0a4).
     * This patch **fails to build**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #19981: [SPARK-22786][SQL] only use AppStatusPlugin in history s...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19981
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/85091/
    Test PASSed.


---



[GitHub] spark issue #19981: [SPARK-22786][SQL] only use AppStatusPlugin in history s...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19981
  
    **[Test build #85238 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85238/testReport)** for PR 19981 at commit [`94ee3c7`](https://github.com/apache/spark/commit/94ee3c7ddbe776783dcb880d9daa05f598a418a9).


---



[GitHub] spark issue #19981: [SPARK-22786][SQL] only use AppStatusPlugin in history s...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19981
  
    Merged build finished. Test FAILed.


---



[GitHub] spark issue #19981: [SPARK-22786][SQL] only use AppStatusPlugin in history s...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19981
  
    **[Test build #84967 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84967/testReport)** for PR 19981 at commit [`bc300f9`](https://github.com/apache/spark/commit/bc300f9a31a351f8630c9b9b189f5b499fd858a1).
     * This patch **fails from timeout after a configured wait of `250m`**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #19981: [SPARK-22786][SQL] only use AppStatusPlugin in history s...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19981
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/85041/
    Test FAILed.


---



[GitHub] spark pull request #19981: [SPARK-22786][SQL] only use AppStatusPlugin in hi...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19981#discussion_r157135896
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala ---
    @@ -142,286 +163,277 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext with JsonTest
           (id, accumulatorValue)
         }.toMap
     
    -    bus.postToAll(SparkListenerSQLExecutionStart(
    +    listener.onOtherEvent(SparkListenerSQLExecutionStart(
           executionId,
           "test",
           "test",
           df.queryExecution.toString,
           SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
           System.currentTimeMillis()))
     
    -    bus.postToAll(SparkListenerJobStart(
    +    listener.onJobStart(SparkListenerJobStart(
           jobId = 0,
           time = System.currentTimeMillis(),
           stageInfos = Seq(
             createStageInfo(0, 0),
             createStageInfo(1, 0)
           ),
           createProperties(executionId)))
    -    bus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 0)))
    +    listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(0, 0)))
     
    -    assert(store.executionMetrics(0).isEmpty)
    +    assert(statusStore.executionMetrics(executionId).isEmpty)
     
    -    bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq(
    +    listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq(
           // (task id, stage id, stage attempt, accum updates)
           (0L, 0, 0, createAccumulatorInfos(accumulatorUpdates)),
           (1L, 0, 0, createAccumulatorInfos(accumulatorUpdates))
         )))
     
    -    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
    +    checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 2))
     
         // Driver accumulator updates don't belong to this execution should be filtered and no
         // exception will be thrown.
    -    bus.postToAll(SparkListenerDriverAccumUpdates(0, Seq((999L, 2L))))
    +    listener.onOtherEvent(SparkListenerDriverAccumUpdates(0, Seq((999L, 2L))))
     
    -    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
    +    checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 2))
     
    -    bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq(
    +    listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq(
           // (task id, stage id, stage attempt, accum updates)
           (0L, 0, 0, createAccumulatorInfos(accumulatorUpdates)),
           (1L, 0, 0, createAccumulatorInfos(accumulatorUpdates.mapValues(_ * 2)))
         )))
     
    -    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 3))
    +    checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 3))
     
         // Retrying a stage should reset the metrics
    -    bus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 1)))
    +    listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(0, 1)))
     
    -    bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq(
    +    listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq(
           // (task id, stage id, stage attempt, accum updates)
           (0L, 0, 1, createAccumulatorInfos(accumulatorUpdates)),
           (1L, 0, 1, createAccumulatorInfos(accumulatorUpdates))
         )))
     
    -    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
    +    checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 2))
     
         // Ignore the task end for the first attempt
    -    bus.postToAll(SparkListenerTaskEnd(
    +    listener.onTaskEnd(SparkListenerTaskEnd(
           stageId = 0,
           stageAttemptId = 0,
           taskType = "",
           reason = null,
           createTaskInfo(0, 0, accums = accumulatorUpdates.mapValues(_ * 100)),
           null))
     
    -    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
    +    checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 2))
     
         // Finish two tasks
    -    bus.postToAll(SparkListenerTaskEnd(
    +    listener.onTaskEnd(SparkListenerTaskEnd(
           stageId = 0,
           stageAttemptId = 1,
           taskType = "",
           reason = null,
           createTaskInfo(0, 0, accums = accumulatorUpdates.mapValues(_ * 2)),
           null))
    -    bus.postToAll(SparkListenerTaskEnd(
    +    listener.onTaskEnd(SparkListenerTaskEnd(
           stageId = 0,
           stageAttemptId = 1,
           taskType = "",
           reason = null,
           createTaskInfo(1, 0, accums = accumulatorUpdates.mapValues(_ * 3)),
           null))
     
    -    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 5))
    +    checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 5))
     
         // Summit a new stage
    -    bus.postToAll(SparkListenerStageSubmitted(createStageInfo(1, 0)))
    +    listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(1, 0)))
     
    -    bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq(
    +    listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq(
           // (task id, stage id, stage attempt, accum updates)
           (0L, 1, 0, createAccumulatorInfos(accumulatorUpdates)),
           (1L, 1, 0, createAccumulatorInfos(accumulatorUpdates))
         )))
     
    -    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 7))
    +    checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 7))
     
         // Finish two tasks
    -    bus.postToAll(SparkListenerTaskEnd(
    +    listener.onTaskEnd(SparkListenerTaskEnd(
           stageId = 1,
           stageAttemptId = 0,
           taskType = "",
           reason = null,
           createTaskInfo(0, 0, accums = accumulatorUpdates.mapValues(_ * 3)),
           null))
    -    bus.postToAll(SparkListenerTaskEnd(
    +    listener.onTaskEnd(SparkListenerTaskEnd(
           stageId = 1,
           stageAttemptId = 0,
           taskType = "",
           reason = null,
           createTaskInfo(1, 0, accums = accumulatorUpdates.mapValues(_ * 3)),
           null))
     
    -    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 11))
    +    checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 11))
     
    -    assertJobs(store.execution(0), running = Seq(0))
    +    assertJobs(statusStore.execution(executionId), running = Seq(0))
     
    -    bus.postToAll(SparkListenerJobEnd(
    +    listener.onJobEnd(SparkListenerJobEnd(
           jobId = 0,
           time = System.currentTimeMillis(),
           JobSucceeded
         ))
    -    bus.postToAll(SparkListenerSQLExecutionEnd(
    +    listener.onOtherEvent(SparkListenerSQLExecutionEnd(
           executionId, System.currentTimeMillis()))
     
    -    assertJobs(store.execution(0), completed = Seq(0))
    -    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 11))
    +    assertJobs(statusStore.execution(executionId), completed = Seq(0))
    +
    +    checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 11))
       }
     
    -  sqlStoreTest("onExecutionEnd happens before onJobEnd(JobSucceeded)") { (store, bus) =>
    +  test("onExecutionEnd happens before onJobEnd(JobSucceeded)") {
    +    val statusStore = createStatusStore()
    +    val listener = statusStore.listener.get
    +
         val executionId = 0
         val df = createTestDataFrame
    -    bus.postToAll(SparkListenerSQLExecutionStart(
    +    listener.onOtherEvent(SparkListenerSQLExecutionStart(
           executionId,
           "test",
           "test",
           df.queryExecution.toString,
           SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
           System.currentTimeMillis()))
    -    bus.postToAll(SparkListenerJobStart(
    +    listener.onJobStart(SparkListenerJobStart(
           jobId = 0,
           time = System.currentTimeMillis(),
           stageInfos = Nil,
           createProperties(executionId)))
    -    bus.postToAll(SparkListenerSQLExecutionEnd(
    +    listener.onOtherEvent(SparkListenerSQLExecutionEnd(
           executionId, System.currentTimeMillis()))
    -    bus.postToAll(SparkListenerJobEnd(
    +    listener.onJobEnd(SparkListenerJobEnd(
           jobId = 0,
           time = System.currentTimeMillis(),
           JobSucceeded
         ))
     
    -    assertJobs(store.execution(0), completed = Seq(0))
    +    assertJobs(statusStore.execution(executionId), completed = Seq(0))
       }
     
    -  sqlStoreTest("onExecutionEnd happens before multiple onJobEnd(JobSucceeded)s") { (store, bus) =>
    +  test("onExecutionEnd happens before multiple onJobEnd(JobSucceeded)s") {
    +    val statusStore = createStatusStore()
    +    val listener = statusStore.listener.get
    +
         val executionId = 0
         val df = createTestDataFrame
    -    bus.postToAll(SparkListenerSQLExecutionStart(
    +    listener.onOtherEvent(SparkListenerSQLExecutionStart(
           executionId,
           "test",
           "test",
           df.queryExecution.toString,
           SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
           System.currentTimeMillis()))
    -    bus.postToAll(SparkListenerJobStart(
    +    listener.onJobStart(SparkListenerJobStart(
           jobId = 0,
           time = System.currentTimeMillis(),
           stageInfos = Nil,
           createProperties(executionId)))
    -    bus.postToAll(SparkListenerJobEnd(
    +    listener.onJobEnd(SparkListenerJobEnd(
             jobId = 0,
             time = System.currentTimeMillis(),
             JobSucceeded
         ))
     
    -    bus.postToAll(SparkListenerJobStart(
    +    listener.onJobStart(SparkListenerJobStart(
           jobId = 1,
           time = System.currentTimeMillis(),
           stageInfos = Nil,
           createProperties(executionId)))
    -    bus.postToAll(SparkListenerSQLExecutionEnd(
    +    listener.onOtherEvent(SparkListenerSQLExecutionEnd(
           executionId, System.currentTimeMillis()))
    -    bus.postToAll(SparkListenerJobEnd(
    +    listener.onJobEnd(SparkListenerJobEnd(
           jobId = 1,
           time = System.currentTimeMillis(),
           JobSucceeded
         ))
     
    -    assertJobs(store.execution(0), completed = Seq(0, 1))
    +    assertJobs(statusStore.execution(executionId), completed = Seq(0, 1))
       }
     
    -  sqlStoreTest("onExecutionEnd happens before onJobEnd(JobFailed)") { (store, bus) =>
    +  test("onExecutionEnd happens before onJobEnd(JobFailed)") {
    +    val statusStore = createStatusStore()
    +    val listener = statusStore.listener.get
    +
         val executionId = 0
         val df = createTestDataFrame
    -    bus.postToAll(SparkListenerSQLExecutionStart(
    +    listener.onOtherEvent(SparkListenerSQLExecutionStart(
           executionId,
           "test",
           "test",
           df.queryExecution.toString,
           SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
           System.currentTimeMillis()))
    -    bus.postToAll(SparkListenerJobStart(
    +    listener.onJobStart(SparkListenerJobStart(
           jobId = 0,
           time = System.currentTimeMillis(),
           stageInfos = Seq.empty,
           createProperties(executionId)))
    -    bus.postToAll(SparkListenerSQLExecutionEnd(
    +    listener.onOtherEvent(SparkListenerSQLExecutionEnd(
           executionId, System.currentTimeMillis()))
    -    bus.postToAll(SparkListenerJobEnd(
    +    listener.onJobEnd(SparkListenerJobEnd(
           jobId = 0,
           time = System.currentTimeMillis(),
           JobFailed(new RuntimeException("Oops"))
         ))
     
    -    assertJobs(store.execution(0), failed = Seq(0))
    +    assertJobs(statusStore.execution(executionId), failed = Seq(0))
       }
     
       test("SPARK-11126: no memory leak when running non SQL jobs") {
    -    val previousStageNumber = statusStore.executionsList().size
    -    spark.sparkContext.parallelize(1 to 10).foreach(i => ())
    -    spark.sparkContext.listenerBus.waitUntilEmpty(10000)
    -    // listener should ignore the non SQL stage
    -    assert(statusStore.executionsList().size == previousStageNumber)
    -
    -    spark.sparkContext.parallelize(1 to 10).toDF().foreach(i => ())
    -    spark.sparkContext.listenerBus.waitUntilEmpty(10000)
    -    // listener should save the SQL stage
    -    assert(statusStore.executionsList().size == previousStageNumber + 1)
    -  }
    -
    -  test("driver side SQL metrics") {
    -    val oldCount = statusStore.executionsList().size
    -    val expectedAccumValue = 12345L
    -    val physicalPlan = MyPlan(sqlContext.sparkContext, expectedAccumValue)
    -    val dummyQueryExecution = new QueryExecution(spark, LocalRelation()) {
    -      override lazy val sparkPlan = physicalPlan
    -      override lazy val executedPlan = physicalPlan
    -    }
    -
    -    SQLExecution.withNewExecutionId(spark, dummyQueryExecution) {
    -      physicalPlan.execute().collect()
    -    }
    -
    -    while (statusStore.executionsList().size < oldCount) {
    -      Thread.sleep(100)
    -    }
    -
    -    // Wait for listener to finish computing the metrics for the execution.
    -    while (statusStore.executionsList().last.metricValues == null) {
    -      Thread.sleep(100)
    +    val statusStore = createStatusStore()
    +    val listener = statusStore.listener.get
    +    try {
    +      sparkContext.addSparkListener(listener)
    +      spark.sparkContext.parallelize(1 to 10).foreach(i => ())
    +      spark.sparkContext.listenerBus.waitUntilEmpty(10000)
    +      // Listener should ignore the non-SQL stages, as the stage data are only removed when SQL
    +      // execution ends, which will not be triggered for non-SQL jobs.
    +      assert(listener.stageMetrics.isEmpty)
    +    } finally {
    +      sparkContext.removeSparkListener(listener)
         }
    -
    -    val execId = statusStore.executionsList().last.executionId
    -    val metrics = statusStore.executionMetrics(execId)
    -    val driverMetric = physicalPlan.metrics("dummy")
    -    val expectedValue = SQLMetrics.stringValue(driverMetric.metricType, Seq(expectedAccumValue))
    -
    -    assert(metrics.contains(driverMetric.id))
    -    assert(metrics(driverMetric.id) === expectedValue)
    --- End diff --
    
    ditto, but I don't know why this passed before...


---



[GitHub] spark issue #19981: [SPARK-22786][SQL] only use AppStatusPlugin in history s...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19981
  
    **[Test build #84943 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84943/testReport)** for PR 19981 at commit [`88fdff2`](https://github.com/apache/spark/commit/88fdff29cc106fd583640f8026dafd278acc9ec9).


---



[GitHub] spark pull request #19981: [SPARK-22786][SQL] only use AppStatusPlugin in hi...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19981#discussion_r157662992
  
    --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
    @@ -322,15 +321,18 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
             (new InMemoryStore(), true)
         }
     
    +    val plugins = ServiceLoader.load(
    +      classOf[AppHistoryServerPlugin], Utils.getContextOrSparkClassLoader).asScala
         val trackingStore = new ElementTrackingStore(kvstore, conf)
    --- End diff --
    
    do we really need to limit the UI data for history server? cc @vanzin 
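
For readers unfamiliar with the `ServiceLoader.load(...)` call in the diff above, the following is a minimal sketch of the JDK service-loading mechanism it relies on. `HistoryPlugin` and `discover` are hypothetical names for illustration, not the interface or code in this pull request. Implementations are found through `META-INF/services/<binary name of the interface>` files on the class path, so with no such file present the result is simply empty.

```scala
import java.util.ServiceLoader
import scala.collection.mutable.ListBuffer

object PluginDiscoverySketch {
  // Hypothetical plugin contract, standing in for the interface loaded in the diff.
  trait HistoryPlugin { def displayName: String }

  // Instantiates every implementation declared in a META-INF/services file
  // that is visible to the given class loader.
  def discover(loader: ClassLoader): List[HistoryPlugin] = {
    val found = ListBuffer.empty[HistoryPlugin]
    val it = ServiceLoader.load(classOf[HistoryPlugin], loader).iterator()
    while (it.hasNext) found += it.next()
    found.toList
  }

  def main(args: Array[String]): Unit = {
    val plugins = discover(Thread.currentThread().getContextClassLoader)
    println(s"discovered ${plugins.size} plugin(s)")
  }
}
```

The explicit iterator loop avoids depending on a particular Scala collection-converter import, which differs between Scala versions.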


---



[GitHub] spark issue #19981: [SPARK-22786][SQL] only use AppStatusPlugin in history s...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19981
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/85064/
    Test FAILed.


---



[GitHub] spark issue #19981: [SPARK-22786][SQL] only use AppStatusPlugin in history s...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19981
  
    Merged build finished. Test FAILed.


---



[GitHub] spark issue #19981: [SPARK-22786][SQL] only use AppStatusPlugin in history s...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19981
  
    Merged build finished. Test FAILed.


---



[GitHub] spark issue #19981: [SPARK-22786][SQL] only use AppStatusPlugin in history s...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19981
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84943/
    Test FAILed.


---



[GitHub] spark issue #19981: [SPARK-22786][SQL] only use AppStatusPlugin in history s...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19981
  
    **[Test build #85263 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85263/testReport)** for PR 19981 at commit [`94ee3c7`](https://github.com/apache/spark/commit/94ee3c7ddbe776783dcb880d9daa05f598a418a9).


---



[GitHub] spark issue #19981: [SPARK-22786][SQL] only use AppStatusPlugin in history s...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on the issue:

    https://github.com/apache/spark/pull/19981
  
    > We are going to, right? Otherwise creating the interface just for SQL doesn't align with your goal to centralize this part.
    
    At some point, probably, but supporting streaming UIs in the SHS requires a lot more work before we even reach that point. There are fundamental problems with event logs and streaming apps that need to be resolved first.
    
    >  not catching event if users are not using SQL functions
    
    Not sure why that would be an issue - or rather, why that's different from what's always been the case. Even if you have a SparkSession today you can run non-SQL jobs using the underlying context, and those will generate events that will be delivered to the SQL listener, and it has to deal with them (as it does).
    
    > I do think the previous code is cleaner than the current one.
    
    What do you think about either of my suggestions to simplify the code?
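
The point above, that a SQL listener already receives events from non-SQL jobs and has to cope with them, can be sketched without Spark. The event and listener types below are simplified, hypothetical stand-ins, and the property key is assumed to be the one Spark uses to tag a job with its SQL execution; the actual listener logic in the code base is more involved.

```scala
import scala.collection.mutable

object SqlEventFilterSketch {
  // Simplified job-start event; real events carry far more data.
  final case class JobStart(jobId: Int, properties: Map[String, String])

  // Assumed property key that tags a job with its SQL execution.
  val ExecutionIdKey = "spark.sql.execution.id"

  final class SqlListener {
    private val jobsByExecution = mutable.Map.empty[Long, List[Int]]

    def onJobStart(event: JobStart): Unit =
      event.properties.get(ExecutionIdKey) match {
        case Some(id) =>
          val executionId = id.toLong
          val jobs = jobsByExecution.getOrElse(executionId, Nil)
          jobsByExecution(executionId) = jobs :+ event.jobId
        case None =>
          () // non-SQL job: nothing to track, so no state accumulates
      }

    def trackedJobs: Map[Long, List[Int]] = jobsByExecution.toMap
  }

  def main(args: Array[String]): Unit = {
    val listener = new SqlListener
    listener.onJobStart(JobStart(0, Map.empty))                 // plain job
    listener.onJobStart(JobStart(1, Map(ExecutionIdKey -> "7"))) // SQL job
    println(listener.trackedJobs)
  }
}
```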


---



[GitHub] spark issue #19981: [SPARK-22786][SQL] only use AppStatusPlugin in history s...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19981
  
    Merged build finished. Test FAILed.


---



[GitHub] spark pull request #19981: [SPARK-22786][SQL] only use AppStatusPlugin in hi...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19981#discussion_r157580676
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala ---
    @@ -82,6 +82,19 @@ private[sql] class SharedState(val sparkContext: SparkContext) extends Logging {
        */
       val cacheManager: CacheManager = new CacheManager
     
    +  /**
    +   * A status store to query SQL status/metrics of this Spark application, based on SQL-specific
    +   * [[org.apache.spark.scheduler.SparkListenerEvent]]s.
    +   */
    +  val statusStore: SQLAppStatusStore = {
    --- End diff --
    
    I'd use `@Private` instead of `@Unstable` for the SparkSession method, that's all I'm saying. It more clearly maps to what you and the code are saying.


---



[GitHub] spark issue #19981: [SPARK-22786][SQL] only use AppStatusPlugin in history s...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19981
  
    **[Test build #84967 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84967/testReport)** for PR 19981 at commit [`bc300f9`](https://github.com/apache/spark/commit/bc300f9a31a351f8630c9b9b189f5b499fd858a1).


---



[GitHub] spark issue #19981: [SPARK-22786][SQL] only use AppStatusPlugin in history s...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19981
  
    **[Test build #85047 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85047/testReport)** for PR 19981 at commit [`9f5d21f`](https://github.com/apache/spark/commit/9f5d21f1aa71b1e2ee0a820a1430f453ecf66f0f).


---



[GitHub] spark pull request #19981: [SPARK-22786][SQL] only use AppStatusPlugin in hi...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19981#discussion_r157135495
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala ---
    @@ -142,286 +163,277 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext with JsonTest
           (id, accumulatorValue)
         }.toMap
     
    -    bus.postToAll(SparkListenerSQLExecutionStart(
    +    listener.onOtherEvent(SparkListenerSQLExecutionStart(
    --- End diff --
    
    I prefer the testing style from before #19681, which just calls the event-handling methods of the listener directly, instead of going indirectly through an intermediate replay bus.
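
The two testing styles contrasted in this comment can be shown side by side with a toy listener. All types below (`ExecutionListener`, `Bus`, the events) are hypothetical and only mirror the shape of the `listener.onOtherEvent(...)` and `bus.postToAll(...)` calls in the diff; they are not the Spark classes.

```scala
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

object ListenerTestStyleSketch {
  sealed trait Event
  final case class ExecutionStart(executionId: Long) extends Event
  final case class ExecutionEnd(executionId: Long) extends Event

  // Toy listener that tracks which executions are currently running.
  final class ExecutionListener {
    private val running = mutable.Set.empty[Long]

    def onOtherEvent(event: Event): Unit = event match {
      case ExecutionStart(id) => running += id
      case ExecutionEnd(id) => running -= id
    }

    def runningExecutions: Set[Long] = running.toSet
  }

  // Minimal bus: forwards each posted event to every registered listener.
  final class Bus {
    private val listeners = ArrayBuffer.empty[ExecutionListener]
    def addListener(l: ExecutionListener): Unit = listeners += l
    def postToAll(event: Event): Unit = listeners.foreach(_.onOtherEvent(event))
  }

  def main(args: Array[String]): Unit = {
    // Style 1: the test calls the handler directly.
    val direct = new ExecutionListener
    direct.onOtherEvent(ExecutionStart(0L))

    // Style 2: the test posts to a bus, which dispatches to the listener.
    val viaBus = new ExecutionListener
    val bus = new Bus
    bus.addListener(viaBus)
    bus.postToAll(ExecutionStart(0L))

    println(direct.runningExecutions == viaBus.runningExecutions)
  }
}
```

Both styles leave the listener in the same state here; the direct style has one fewer moving part in the test, while the bus style also exercises dispatch.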


---



[GitHub] spark pull request #19981: [SPARK-22786][SQL] only use AppStatusPlugin in hi...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19981#discussion_r158183772
  
    --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
    @@ -48,7 +48,7 @@ private[spark] class AppStatusListener(
     
       import config._
     
    -  private var sparkVersion = SPARK_VERSION
    +  private val sparkVersion = SPARK_VERSION
    --- End diff --
    
    Filed https://issues.apache.org/jira/browse/SPARK-22854; I'll do it later.


---



[GitHub] spark pull request #19981: [SPARK-22786][SQL] only use AppStatusPlugin in hi...

Posted by gengliangwang <gi...@git.apache.org>.
Github user gengliangwang commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19981#discussion_r157136615
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala ---
    @@ -82,6 +82,19 @@ private[sql] class SharedState(val sparkContext: SparkContext) extends Logging {
        */
       val cacheManager: CacheManager = new CacheManager
     
    +  /**
    +   * A status store to query SQL status/metrics of this Spark application, based on SQL-specific
    +   * [[org.apache.spark.scheduler.SparkListenerEvent]]s.
    +   */
    +  val statusStore: SQLAppStatusStore = {
    --- End diff --
    
    +1, this is a behavior change. It was good to have `SharedState.listener` and check metrics from it.


---



[GitHub] spark pull request #19981: [SPARK-22786][SQL] only use AppStatusPlugin in hi...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19981#discussion_r157154044
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala ---
    @@ -82,6 +82,19 @@ private[sql] class SharedState(val sparkContext: SparkContext) extends Logging {
        */
       val cacheManager: CacheManager = new CacheManager
     
    +  /**
    +   * A status store to query SQL status/metrics of this Spark application, based on SQL-specific
    +   * [[org.apache.spark.scheduler.SparkListenerEvent]]s.
    +   */
    +  val statusStore: SQLAppStatusStore = {
    --- End diff --
    
    Makes sense to me.


---



[GitHub] spark issue #19981: [SPARK-22786][SQL] only use AppStatusPlugin in history s...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19981
  
    **[Test build #85227 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85227/testReport)** for PR 19981 at commit [`94ee3c7`](https://github.com/apache/spark/commit/94ee3c7ddbe776783dcb880d9daa05f598a418a9).


---



[GitHub] spark issue #19981: [SPARK-22786][SQL] only use AppStatusPlugin in history s...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19981
  
    **[Test build #85091 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85091/testReport)** for PR 19981 at commit [`60421ac`](https://github.com/apache/spark/commit/60421acf5c652731d64ffad013bd4c179d6401b5).


---



[GitHub] spark issue #19981: [SPARK-22786][SQL] only use AppStatusPlugin in history s...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19981
  
    **[Test build #85047 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85047/testReport)** for PR 19981 at commit [`9f5d21f`](https://github.com/apache/spark/commit/9f5d21f1aa71b1e2ee0a820a1430f453ecf66f0f).
     * This patch **fails from timeout after a configured wait of `250m`**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #19981: [SPARK-22786][SQL] only use AppStatusPlugin in history s...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19981
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/85263/
    Test PASSed.


---



[GitHub] spark issue #19981: [SPARK-22786][SQL] only use AppStatusPlugin in history s...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19981
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #19981: [SPARK-22786][SQL] only use AppStatusPlugin in hi...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19981#discussion_r157331435
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala ---
    @@ -82,6 +82,19 @@ private[sql] class SharedState(val sparkContext: SparkContext) extends Logging {
        */
       val cacheManager: CacheManager = new CacheManager
     
    +  /**
    +   * A status store to query SQL status/metrics of this Spark application, based on SQL-specific
    +   * [[org.apache.spark.scheduler.SparkListenerEvent]]s.
    +   */
    +  val statusStore: SQLAppStatusStore = {
    --- End diff --
    
    At least it's developer-facing. As a developer, I don't care about naming or API changes; I just want the same functionality.


---



[GitHub] spark issue #19981: [SPARK-22786][SQL] only use AppStatusPlugin in history s...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19981
  
    **[Test build #85093 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85093/testReport)** for PR 19981 at commit [`5b64f88`](https://github.com/apache/spark/commit/5b64f881adcf34b958aa659e62a9ce93171cf109).


---



[GitHub] spark issue #19981: [SPARK-22786][SQL] only use AppStatusPlugin in history s...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19981
  
    **[Test build #85042 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85042/testReport)** for PR 19981 at commit [`9f5d21f`](https://github.com/apache/spark/commit/9f5d21f1aa71b1e2ee0a820a1430f453ecf66f0f).


---



[GitHub] spark issue #19981: [SPARK-22786][SQL] only use AppStatusPlugin in history s...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19981
  
    **[Test build #85091 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85091/testReport)** for PR 19981 at commit [`60421ac`](https://github.com/apache/spark/commit/60421acf5c652731d64ffad013bd4c179d6401b5).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #19981: [SPARK-22786][SQL] only use AppStatusPlugin in history s...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/19981
  
    retest this please


---



[GitHub] spark issue #19981: [SPARK-22786][SQL] only use AppStatusPlugin in history s...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19981
  
    **[Test build #85042 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85042/testReport)** for PR 19981 at commit [`9f5d21f`](https://github.com/apache/spark/commit/9f5d21f1aa71b1e2ee0a820a1430f453ecf66f0f).
     * This patch **fails due to an unknown error code, -9**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #19981: [SPARK-22786][SQL] only use AppStatusPlugin in history s...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19981
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #19981: [SPARK-22786][SQL] only use AppStatusPlugin in hi...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19981#discussion_r157333324
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala ---
    @@ -82,6 +82,19 @@ private[sql] class SharedState(val sparkContext: SparkContext) extends Logging {
        */
       val cacheManager: CacheManager = new CacheManager
     
    +  /**
    +   * A status store to query SQL status/metrics of this Spark application, based on SQL-specific
    +   * [[org.apache.spark.scheduler.SparkListenerEvent]]s.
    +   */
    +  val statusStore: SQLAppStatusStore = {
    --- End diff --
    
    Sure, it's fine if you want to expose it. But I'm pointing out that it's pretty weird to expose a class in a ".internal" package through the API. Those classes are neither documented nor covered by MiMa checks, so there are absolutely zero guarantees about them.


---



[GitHub] spark pull request #19981: [SPARK-22786][SQL] only use AppStatusPlugin in hi...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19981#discussion_r157315586
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala ---
    @@ -36,14 +36,23 @@ import org.apache.spark.sql.catalyst.util.quietly
     import org.apache.spark.sql.execution.{LeafExecNode, QueryExecution, SparkPlanInfo, SQLExecution}
     import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
     import org.apache.spark.sql.test.SharedSQLContext
    -import org.apache.spark.status.config._
    +import org.apache.spark.status.config.LIVE_ENTITY_UPDATE_PERIOD
     import org.apache.spark.util.{AccumulatorMetadata, JsonProtocol, LongAccumulator}
     import org.apache.spark.util.kvstore.InMemoryStore
     
    -class SQLListenerSuite extends SparkFunSuite with SharedSQLContext with JsonTestUtils {
    +
    +class SQLAppStatusListenerSuite extends SparkFunSuite with SharedSQLContext with JsonTestUtils {
       import testImplicits._
     
    -  override protected def sparkConf = super.sparkConf.set(LIVE_ENTITY_UPDATE_PERIOD, 0L)
    +  override def beforeAll(): Unit = {
    +    super.beforeAll()
    +    sparkContext.conf.set(LIVE_ENTITY_UPDATE_PERIOD, 0L)
    --- End diff --
    
    I commented on the other PR where you mentioned this, but I still don't get what this is changing. I don't see any global state that is being overridden by `override protected def sparkConf`. This test suite only extends traits (e.g. `SharedSQLContext`, which extends `SharedSparkSession`), and those only keep suite-level state, not global state.
    
    There are other tests that do the same thing.
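
The point about suite-level state can be illustrated with a small sketch. The types below (`Conf`, `SharedSession`, `FastUpdateSuite`, `DefaultSuite`) and the keys they use are hypothetical stand-ins, not Spark's test traits or configuration entries; the sketch only assumes the general pattern in which a shared trait builds its session from an overridable configuration hook.

```scala
// Stand-in types for illustration only; these are NOT Spark's test traits.
final case class Conf(settings: Map[String, String]) {
  def set(key: String, value: String): Conf = copy(settings = settings + (key -> value))
  def get(key: String): Option[String] = settings.get(key)
}

// Plays the role of a shared-session trait: it builds its session configuration
// from `conf`, which concrete suites may override.
trait SharedSession {
  protected def conf: Conf = Conf(Map("app.name" -> "test"))

  // Evaluated once per suite instance, so this is suite-level state.
  lazy val sessionConf: Conf = conf
}

// A suite that customises the configuration by overriding the hook.
class FastUpdateSuite extends SharedSession {
  override protected def conf: Conf = super.conf.set("update.period", "0")
}

// A suite that keeps the defaults.
class DefaultSuite extends SharedSession

object SuiteLevelConfExample {
  def main(args: Array[String]): Unit = {
    val fast = new FastUpdateSuite
    val plain = new DefaultSuite
    // The override is visible only in the suite that declares it.
    assert(fast.sessionConf.get("update.period").contains("0"))
    assert(plain.sessionConf.get("update.period").isEmpty)
  }
}
```

Under these assumptions, overriding the hook in one suite does not leak into another, because nothing is stored outside the suite instance.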


---



[GitHub] spark issue #19981: [SPARK-22786][SQL] only use AppStatusPlugin in history s...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19981
  
    **[Test build #85064 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85064/testReport)** for PR 19981 at commit [`9f5d21f`](https://github.com/apache/spark/commit/9f5d21f1aa71b1e2ee0a820a1430f453ecf66f0f).
     * This patch **fails from timeout after a configured wait of `250m`**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #19981: [SPARK-22786][SQL] only use AppStatusPlugin in history s...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/19981
  
    cc @vanzin @gengliangwang @gatorsmile 


---



[GitHub] spark pull request #19981: [SPARK-22786][SQL] only use AppStatusPlugin in hi...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19981#discussion_r157154532
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala ---
    @@ -82,6 +82,19 @@ private[sql] class SharedState(val sparkContext: SparkContext) extends Logging {
        */
       val cacheManager: CacheManager = new CacheManager
     
    +  /**
    +   * A status store to query SQL status/metrics of this Spark application, based on SQL-specific
    +   * [[org.apache.spark.scheduler.SparkListenerEvent]]s.
    +   */
    +  val statusStore: SQLAppStatusStore = {
    +    val kvStore = sparkContext.statusStore.store
    +    val listener = new SQLAppStatusListener(sparkContext.conf, kvStore, live = true)
    +    sparkContext.listenerBus.addToStatusQueue(listener)
    --- End diff --
    
    `SharedState` is kind of a singleton in Spark SQL. If you look at the old code, the global listener in `SparkSession` is initialized by `SharedState`.
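
A rough sketch of the "kind of a singleton" behaviour, using stand-in types only (`Bus`, `Context`, `SharedSqlState`, and `Session` are hypothetical and much simpler than Spark's classes): the shared state is created on first use, registers its listener once, and is then reused by sessions derived from the first one.

```scala
// Stand-in types for illustration only; these are NOT Spark's classes.
final class Bus {
  private var registered = Vector.empty[String]
  def add(name: String): Unit = registered :+= name
  def listeners: Vector[String] = registered
}

final class Context {
  val bus = new Bus
}

// Plays the role of per-context shared state: constructing it registers the SQL listener.
final class SharedSqlState(context: Context) {
  context.bus.add("sql-listener")
}

// Plays the role of a session: the shared state is created lazily on first access
// and handed on to any session derived from this one.
final class Session(context: Context, existing: Option[SharedSqlState] = None) {
  lazy val sharedState: SharedSqlState = existing.getOrElse(new SharedSqlState(context))
  def newSession(): Session = new Session(context, Some(sharedState))
}

object LazyRegistrationExample {
  def main(args: Array[String]): Unit = {
    val context = new Context
    // Creating the context alone registers nothing.
    assert(context.bus.listeners.isEmpty)

    val session = new Session(context)
    val first = session.sharedState
    val second = session.newSession().sharedState

    // Both sessions see the same state, and the listener was registered exactly once.
    assert(first eq second)
    assert(context.bus.listeners == Vector("sql-listener"))
  }
}
```

In this sketch an application that never touches the session never pays for the listener.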


---



[GitHub] spark issue #19981: [SPARK-22786][SQL] only use AppStatusPlugin in history s...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19981
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84947/
    Test FAILed.


---



[GitHub] spark issue #19981: [SPARK-22786][SQL] only use AppStatusPlugin in history s...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19981
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #19981: [SPARK-22786][SQL] only use AppStatusPlugin in history s...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/19981
  
    retest this please


---



[GitHub] spark issue #19981: [SPARK-22786][SQL] only use AppStatusPlugin in history s...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/19981
  
    cc @vanzin any more comments?


---



[GitHub] spark pull request #19981: [SPARK-22786][SQL] only use AppStatusPlugin in hi...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/19981


---



[GitHub] spark pull request #19981: [SPARK-22786][SQL] only use AppStatusPlugin in hi...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19981#discussion_r157312759
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala ---
    @@ -489,16 +501,17 @@ private case class MyPlan(sc: SparkContext, expectedValue: Long) extends LeafExe
     }
     
     
    -class SQLListenerMemoryLeakSuite extends SparkFunSuite {
    +class SQLAppStatusListenerMemoryLeakSuite extends SparkFunSuite {
     
    -  // TODO: this feature is not yet available in SQLAppStatusStore.
    -  ignore("no memory leak") {
    +  test("no memory leak") {
         quietly {
           val conf = new SparkConf()
             .setMaster("local")
             .setAppName("test")
    +        .set(LIVE_ENTITY_UPDATE_PERIOD, 0L) // Update the UI data immediately
             .set(config.MAX_TASK_FAILURES, 1) // Don't retry the tasks to run this test quickly
    -        .set("spark.sql.ui.retainedExecutions", "50") // Set it to 50 to run this test quickly
    +        // TODO: this feature is not yet available in SQLAppStatusStore.
    +        // .set("spark.sql.ui.retainedExecutions", "50") // Set it to 50 to run this test quickly
    --- End diff --
    
    This test is re-enabled in #19751.


---



[GitHub] spark pull request #19981: [SPARK-22786][SQL] only use AppStatusPlugin in hi...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19981#discussion_r158171335
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLHistoryServerPlugin.scala ---
    @@ -0,0 +1,34 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.ui
    +
    +import org.apache.spark.SparkConf
    +import org.apache.spark.scheduler.SparkListener
    +import org.apache.spark.status.{AppHistoryServerPlugin, ElementTrackingStore}
    +import org.apache.spark.ui.SparkUI
    +
    +class SQLHistoryServerPlugin extends AppHistoryServerPlugin {
    +  override def createListeners(conf: SparkConf, store: ElementTrackingStore): Seq[SparkListener] = {
    +    Seq(new SQLAppStatusListener(conf, store, live = false))
    +  }
    +
    +  override def setupUI(ui: SparkUI): Unit = {
    +    val kvStore = ui.store.store
    +    new SQLTab(new SQLAppStatusStore(kvStore), ui)
    --- End diff --
    
    You shouldn't be adding the UI if there is no SQL-related data in the store.
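
One way to act on this suggestion is to guard the tab creation with a check on the store. The sketch below uses stand-in types (`Ui`, `SqlStore`, and the `executionsCount` accessor are assumptions made for illustration, not the API of `SparkUI` or `SQLAppStatusStore` as quoted in the diff above).

```scala
// Stand-in types for illustration only; these are NOT Spark's classes.
final class Ui {
  private var tabNames = Vector.empty[String]
  def attachTab(name: String): Unit = tabNames :+= name
  def tabs: Vector[String] = tabNames
}

// Hypothetical store exposing how many SQL executions were recorded.
final class SqlStore(executionIds: Seq[Long]) {
  def executionsCount: Long = executionIds.size.toLong
}

object ConditionalTabExample {
  // Attach the SQL tab only when the replayed application recorded SQL executions.
  def setupUI(ui: Ui, store: SqlStore): Unit = {
    if (store.executionsCount > 0) {
      ui.attachTab("SQL")
    }
  }

  def main(args: Array[String]): Unit = {
    val withoutSql = new Ui
    setupUI(withoutSql, new SqlStore(Seq.empty))
    assert(withoutSql.tabs.isEmpty)

    val withSql = new Ui
    setupUI(withSql, new SqlStore(Seq(0L, 1L)))
    assert(withSql.tabs == Vector("SQL"))
  }
}
```

With such a guard, the history UI for an application that never ran SQL would not show an empty SQL tab.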


---



[GitHub] spark issue #19981: [SPARK-22786][SQL] only use AppStatusPlugin in history s...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19981
  
    **[Test build #84947 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84947/testReport)** for PR 19981 at commit [`88fdff2`](https://github.com/apache/spark/commit/88fdff29cc106fd583640f8026dafd278acc9ec9).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #19981: [SPARK-22786][SQL] only use AppStatusPlugin in hi...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19981#discussion_r157135580
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala ---
    @@ -142,286 +163,277 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext with JsonTest
           (id, accumulatorValue)
         }.toMap
     
    -    bus.postToAll(SparkListenerSQLExecutionStart(
    +    listener.onOtherEvent(SparkListenerSQLExecutionStart(
           executionId,
           "test",
           "test",
           df.queryExecution.toString,
           SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
           System.currentTimeMillis()))
     
    -    bus.postToAll(SparkListenerJobStart(
    +    listener.onJobStart(SparkListenerJobStart(
           jobId = 0,
           time = System.currentTimeMillis(),
           stageInfos = Seq(
             createStageInfo(0, 0),
             createStageInfo(1, 0)
           ),
           createProperties(executionId)))
    -    bus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 0)))
    +    listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(0, 0)))
     
    -    assert(store.executionMetrics(0).isEmpty)
    +    assert(statusStore.executionMetrics(executionId).isEmpty)
     
    -    bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq(
    +    listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq(
           // (task id, stage id, stage attempt, accum updates)
           (0L, 0, 0, createAccumulatorInfos(accumulatorUpdates)),
           (1L, 0, 0, createAccumulatorInfos(accumulatorUpdates))
         )))
     
    -    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
    +    checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 2))
     
         // Driver accumulator updates don't belong to this execution should be filtered and no
         // exception will be thrown.
    -    bus.postToAll(SparkListenerDriverAccumUpdates(0, Seq((999L, 2L))))
    +    listener.onOtherEvent(SparkListenerDriverAccumUpdates(0, Seq((999L, 2L))))
     
    -    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
    +    checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 2))
     
    -    bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq(
    +    listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq(
           // (task id, stage id, stage attempt, accum updates)
           (0L, 0, 0, createAccumulatorInfos(accumulatorUpdates)),
           (1L, 0, 0, createAccumulatorInfos(accumulatorUpdates.mapValues(_ * 2)))
         )))
     
    -    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 3))
    +    checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 3))
     
         // Retrying a stage should reset the metrics
    -    bus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 1)))
    +    listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(0, 1)))
     
    -    bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq(
    +    listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq(
           // (task id, stage id, stage attempt, accum updates)
           (0L, 0, 1, createAccumulatorInfos(accumulatorUpdates)),
           (1L, 0, 1, createAccumulatorInfos(accumulatorUpdates))
         )))
     
    -    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
    +    checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 2))
     
         // Ignore the task end for the first attempt
    -    bus.postToAll(SparkListenerTaskEnd(
    +    listener.onTaskEnd(SparkListenerTaskEnd(
           stageId = 0,
           stageAttemptId = 0,
           taskType = "",
           reason = null,
           createTaskInfo(0, 0, accums = accumulatorUpdates.mapValues(_ * 100)),
           null))
     
    -    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
    +    checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 2))
     
         // Finish two tasks
    -    bus.postToAll(SparkListenerTaskEnd(
    +    listener.onTaskEnd(SparkListenerTaskEnd(
           stageId = 0,
           stageAttemptId = 1,
           taskType = "",
           reason = null,
           createTaskInfo(0, 0, accums = accumulatorUpdates.mapValues(_ * 2)),
           null))
    -    bus.postToAll(SparkListenerTaskEnd(
    +    listener.onTaskEnd(SparkListenerTaskEnd(
           stageId = 0,
           stageAttemptId = 1,
           taskType = "",
           reason = null,
           createTaskInfo(1, 0, accums = accumulatorUpdates.mapValues(_ * 3)),
           null))
     
    -    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 5))
    +    checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 5))
     
         // Summit a new stage
    -    bus.postToAll(SparkListenerStageSubmitted(createStageInfo(1, 0)))
    +    listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(1, 0)))
     
    -    bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq(
    +    listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq(
           // (task id, stage id, stage attempt, accum updates)
           (0L, 1, 0, createAccumulatorInfos(accumulatorUpdates)),
           (1L, 1, 0, createAccumulatorInfos(accumulatorUpdates))
         )))
     
    -    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 7))
    +    checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 7))
     
         // Finish two tasks
    -    bus.postToAll(SparkListenerTaskEnd(
    +    listener.onTaskEnd(SparkListenerTaskEnd(
           stageId = 1,
           stageAttemptId = 0,
           taskType = "",
           reason = null,
           createTaskInfo(0, 0, accums = accumulatorUpdates.mapValues(_ * 3)),
           null))
    -    bus.postToAll(SparkListenerTaskEnd(
    +    listener.onTaskEnd(SparkListenerTaskEnd(
           stageId = 1,
           stageAttemptId = 0,
           taskType = "",
           reason = null,
           createTaskInfo(1, 0, accums = accumulatorUpdates.mapValues(_ * 3)),
           null))
     
    -    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 11))
    +    checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 11))
     
    -    assertJobs(store.execution(0), running = Seq(0))
    +    assertJobs(statusStore.execution(executionId), running = Seq(0))
     
    -    bus.postToAll(SparkListenerJobEnd(
    +    listener.onJobEnd(SparkListenerJobEnd(
           jobId = 0,
           time = System.currentTimeMillis(),
           JobSucceeded
         ))
    -    bus.postToAll(SparkListenerSQLExecutionEnd(
    +    listener.onOtherEvent(SparkListenerSQLExecutionEnd(
           executionId, System.currentTimeMillis()))
     
    -    assertJobs(store.execution(0), completed = Seq(0))
    -    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 11))
    +    assertJobs(statusStore.execution(executionId), completed = Seq(0))
    +
    +    checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 11))
       }
     
    -  sqlStoreTest("onExecutionEnd happens before onJobEnd(JobSucceeded)") { (store, bus) =>
    +  test("onExecutionEnd happens before onJobEnd(JobSucceeded)") {
    +    val statusStore = createStatusStore()
    +    val listener = statusStore.listener.get
    +
         val executionId = 0
         val df = createTestDataFrame
    -    bus.postToAll(SparkListenerSQLExecutionStart(
    +    listener.onOtherEvent(SparkListenerSQLExecutionStart(
           executionId,
           "test",
           "test",
           df.queryExecution.toString,
           SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
           System.currentTimeMillis()))
    -    bus.postToAll(SparkListenerJobStart(
    +    listener.onJobStart(SparkListenerJobStart(
           jobId = 0,
           time = System.currentTimeMillis(),
           stageInfos = Nil,
           createProperties(executionId)))
    -    bus.postToAll(SparkListenerSQLExecutionEnd(
    +    listener.onOtherEvent(SparkListenerSQLExecutionEnd(
           executionId, System.currentTimeMillis()))
    -    bus.postToAll(SparkListenerJobEnd(
    +    listener.onJobEnd(SparkListenerJobEnd(
           jobId = 0,
           time = System.currentTimeMillis(),
           JobSucceeded
         ))
     
    -    assertJobs(store.execution(0), completed = Seq(0))
    +    assertJobs(statusStore.execution(executionId), completed = Seq(0))
       }
     
    -  sqlStoreTest("onExecutionEnd happens before multiple onJobEnd(JobSucceeded)s") { (store, bus) =>
    +  test("onExecutionEnd happens before multiple onJobEnd(JobSucceeded)s") {
    +    val statusStore = createStatusStore()
    +    val listener = statusStore.listener.get
    +
         val executionId = 0
         val df = createTestDataFrame
    -    bus.postToAll(SparkListenerSQLExecutionStart(
    +    listener.onOtherEvent(SparkListenerSQLExecutionStart(
           executionId,
           "test",
           "test",
           df.queryExecution.toString,
           SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
           System.currentTimeMillis()))
    -    bus.postToAll(SparkListenerJobStart(
    +    listener.onJobStart(SparkListenerJobStart(
           jobId = 0,
           time = System.currentTimeMillis(),
           stageInfos = Nil,
           createProperties(executionId)))
    -    bus.postToAll(SparkListenerJobEnd(
    +    listener.onJobEnd(SparkListenerJobEnd(
             jobId = 0,
             time = System.currentTimeMillis(),
             JobSucceeded
         ))
     
    -    bus.postToAll(SparkListenerJobStart(
    +    listener.onJobStart(SparkListenerJobStart(
           jobId = 1,
           time = System.currentTimeMillis(),
           stageInfos = Nil,
           createProperties(executionId)))
    -    bus.postToAll(SparkListenerSQLExecutionEnd(
    +    listener.onOtherEvent(SparkListenerSQLExecutionEnd(
           executionId, System.currentTimeMillis()))
    -    bus.postToAll(SparkListenerJobEnd(
    +    listener.onJobEnd(SparkListenerJobEnd(
           jobId = 1,
           time = System.currentTimeMillis(),
           JobSucceeded
         ))
     
    -    assertJobs(store.execution(0), completed = Seq(0, 1))
    +    assertJobs(statusStore.execution(executionId), completed = Seq(0, 1))
       }
     
    -  sqlStoreTest("onExecutionEnd happens before onJobEnd(JobFailed)") { (store, bus) =>
    +  test("onExecutionEnd happens before onJobEnd(JobFailed)") {
    +    val statusStore = createStatusStore()
    +    val listener = statusStore.listener.get
    +
         val executionId = 0
         val df = createTestDataFrame
    -    bus.postToAll(SparkListenerSQLExecutionStart(
    +    listener.onOtherEvent(SparkListenerSQLExecutionStart(
           executionId,
           "test",
           "test",
           df.queryExecution.toString,
           SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
           System.currentTimeMillis()))
    -    bus.postToAll(SparkListenerJobStart(
    +    listener.onJobStart(SparkListenerJobStart(
           jobId = 0,
           time = System.currentTimeMillis(),
           stageInfos = Seq.empty,
           createProperties(executionId)))
    -    bus.postToAll(SparkListenerSQLExecutionEnd(
    +    listener.onOtherEvent(SparkListenerSQLExecutionEnd(
           executionId, System.currentTimeMillis()))
    -    bus.postToAll(SparkListenerJobEnd(
    +    listener.onJobEnd(SparkListenerJobEnd(
           jobId = 0,
           time = System.currentTimeMillis(),
           JobFailed(new RuntimeException("Oops"))
         ))
     
    -    assertJobs(store.execution(0), failed = Seq(0))
    +    assertJobs(statusStore.execution(executionId), failed = Seq(0))
       }
     
       test("SPARK-11126: no memory leak when running non SQL jobs") {
    -    val previousStageNumber = statusStore.executionsList().size
    -    spark.sparkContext.parallelize(1 to 10).foreach(i => ())
    -    spark.sparkContext.listenerBus.waitUntilEmpty(10000)
    --- End diff --
    
    Previously we did not attach the testing listener to the Spark event bus; this is fixed now.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19981: [SPARK-22786][SQL] only use AppStatusPlugin in history s...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on the issue:

    https://github.com/apache/spark/pull/19981
  
    It's less about cleanliness and more about discoverability IMO. Answer the question quickly: where is the SQL UI initialized?
    
    - my code: in the `AppStatusPlugin` implementation
    - your code: it depends. And the two pieces of code don't even live in the same file or share any code.
    
    Again, I'll make the same suggestion as before: if you move the SQL tab visibility calculation to a new method in the SQL tab itself (and add a filter in `WebUI.getTabs` to only show visible tabs), the code will be simpler than either your version or mine, and the user-visible behavior will remain the same.
    
    This would be the plugin code for both live and SHS:
    
    ```
      override def setupListeners(
          conf: SparkConf,
          store: ElementTrackingStore,
          addListenerFn: SparkListener => Unit,
          live: Boolean): Unit = {
        addListenerFn(new SQLAppStatusListener(conf, store, live, None))
      }
    
      override def setupUI(ui: SparkUI): Unit = {
        val listener = ui.sc.map { /* call LiveListenerBus.findListenersByClass */ }
        new SQLTab(new SQLAppStatusStore(kvstore, Some(listener)), ui)
      }
    ```
    
    Plus you can delete a bunch of code from `SQLAppStatusListener`.
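
    A minimal sketch of the tab-visibility idea described above, not Spark's actual `WebUI` code. The names `Tab`, `SqlTab`, `JobsTab`, `WebUi` and `isVisible` are hypothetical stand-ins, and the rule "visible once at least one execution exists" is an assumption made for illustration:

    ```scala
    // Hypothetical, simplified stand-ins for a web UI and its tabs;
    // none of these names or signatures are taken from the Spark codebase.
    trait Tab {
      def name: String
      // Each tab decides for itself whether it should be shown.
      def isVisible: Boolean = true
    }

    // Assumed rule: the SQL tab is visible only once an execution has been recorded.
    class SqlTab(executionCount: () => Int) extends Tab {
      val name = "SQL"
      override def isVisible: Boolean = executionCount() > 0
    }

    class JobsTab extends Tab {
      val name = "Jobs"
    }

    class WebUi {
      private var tabs = Vector.empty[Tab]
      def attachTab(tab: Tab): Unit = tabs :+= tab
      // The filter lives in one place, so tabs can always be attached eagerly.
      def getTabs: Seq[Tab] = tabs.filter(_.isVisible)
    }

    object VisibleTabsDemo {
      def main(args: Array[String]): Unit = {
        var executions = 0
        val ui = new WebUi
        ui.attachTab(new JobsTab)
        ui.attachTab(new SqlTab(() => executions))

        println(ui.getTabs.map(_.name)) // SQL tab is hidden before any execution
        executions += 1
        println(ui.getTabs.map(_.name)) // SQL tab is listed after the first execution
      }
    }
    ```

    With this shape the plugin can attach the tab unconditionally, and the decision about when it appears stays inside the tab.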


---



[GitHub] spark pull request #19981: [SPARK-22786][SQL] only use AppStatusPlugin in hi...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19981#discussion_r157153841
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala ---
    @@ -82,6 +82,19 @@ private[sql] class SharedState(val sparkContext: SparkContext) extends Logging {
        */
       val cacheManager: CacheManager = new CacheManager
     
    +  /**
    +   * A status store to query SQL status/metrics of this Spark application, based on SQL-specific
    +   * [[org.apache.spark.scheduler.SparkListenerEvent]]s.
    +   */
    +  val statusStore: SQLAppStatusStore = {
    +    val kvStore = sparkContext.statusStore.store
    +    val listener = new SQLAppStatusListener(sparkContext.conf, kvStore, live = true)
    +    sparkContext.listenerBus.addToStatusQueue(listener)
    --- End diff --
    
    There seems to have been a global SQL listener in `SparkSession` before. Now we create a new SQL listener for each `SharedState`? I think `SharedState` should be reused, but I'm not sure whether there is any case where we create more than one `SharedState`.
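
    To make the concern concrete, here is a small sketch using hypothetical stand-ins (`ListenerBus`, `Context`, `EagerSharedState`, `GuardedSharedState`), not Spark's real classes. It assumes that more than one shared state can be built for the same context, which the comment above leaves open, and shows one possible guard that reuses a listener already on the bus:

    ```scala
    import scala.collection.mutable

    // Hypothetical stand-ins; not Spark's LiveListenerBus, SparkContext or SharedState.
    trait Listener
    class SqlStatusListener extends Listener

    class ListenerBus {
      private val listeners = mutable.ArrayBuffer.empty[Listener]
      def add(l: Listener): Unit = synchronized { listeners += l }
      def count: Int = synchronized { listeners.size }
      // Returns the already registered listener of the given class, if any.
      def find[T <: Listener](cls: Class[T]): Option[T] = synchronized {
        listeners.collectFirst { case l if cls.isInstance(l) => cls.cast(l) }
      }
    }

    class Context { val bus = new ListenerBus }

    // Every instance registers its own listener.
    class EagerSharedState(ctx: Context) {
      val listener = new SqlStatusListener
      ctx.bus.add(listener)
    }

    // Variant that reuses a listener that is already present on the bus.
    class GuardedSharedState(ctx: Context) {
      val listener: SqlStatusListener = ctx.bus.synchronized {
        ctx.bus.find(classOf[SqlStatusListener]).getOrElse {
          val l = new SqlStatusListener
          ctx.bus.add(l)
          l
        }
      }
    }

    object SharedStateDemo {
      def main(args: Array[String]): Unit = {
        val eagerCtx = new Context
        new EagerSharedState(eagerCtx)
        new EagerSharedState(eagerCtx)
        println(eagerCtx.bus.count) // one listener per shared state

        val guardedCtx = new Context
        val a = new GuardedSharedState(guardedCtx)
        val b = new GuardedSharedState(guardedCtx)
        println(guardedCtx.bus.count) // a single listener is shared
        println(a.listener eq b.listener)
      }
    }
    ```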


---



[GitHub] spark pull request #19981: [SPARK-22786][SQL] only use AppStatusPlugin in hi...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19981#discussion_r157135252
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala ---
    @@ -36,14 +36,23 @@ import org.apache.spark.sql.catalyst.util.quietly
     import org.apache.spark.sql.execution.{LeafExecNode, QueryExecution, SparkPlanInfo, SQLExecution}
     import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
     import org.apache.spark.sql.test.SharedSQLContext
    -import org.apache.spark.status.config._
    +import org.apache.spark.status.config.LIVE_ENTITY_UPDATE_PERIOD
     import org.apache.spark.util.{AccumulatorMetadata, JsonProtocol, LongAccumulator}
     import org.apache.spark.util.kvstore.InMemoryStore
     
    -class SQLListenerSuite extends SparkFunSuite with SharedSQLContext with JsonTestUtils {
    +
    +class SQLAppStatusListenerSuite extends SparkFunSuite with SharedSQLContext with JsonTestUtils {
       import testImplicits._
     
    -  override protected def sparkConf = super.sparkConf.set(LIVE_ENTITY_UPDATE_PERIOD, 0L)
    +  override def beforeAll(): Unit = {
    +    super.beforeAll()
    +    sparkContext.conf.set(LIVE_ENTITY_UPDATE_PERIOD, 0L)
    --- End diff --
    
    Only set this config for this suite.


---



[GitHub] spark issue #19981: [SPARK-22786][SQL] only use AppStatusPlugin in history s...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19981
  
    Merged build finished. Test FAILed.


---



[GitHub] spark issue #19981: [SPARK-22786][SQL] only use AppStatusPlugin in history s...

Posted by gengliangwang <gi...@git.apache.org>.
Github user gengliangwang commented on the issue:

    https://github.com/apache/spark/pull/19981
  
    retest this please


---



[GitHub] spark issue #19981: [SPARK-22786][SQL] only use AppStatusPlugin in history s...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19981
  
    **[Test build #85041 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85041/testReport)** for PR 19981 at commit [`5737705`](https://github.com/apache/spark/commit/57377055a3db6f12f67c55524917e4da1b49df94).


---



[GitHub] spark issue #19981: [SPARK-22786][SQL] only use AppStatusPlugin in history s...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/19981
  
    retest this please


---



[GitHub] spark issue #19981: [SPARK-22786][SQL] only use AppStatusPlugin in history s...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19981
  
    **[Test build #84922 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84922/testReport)** for PR 19981 at commit [`ba38723`](https://github.com/apache/spark/commit/ba38723826c36ce99a61fbd37bd779a90c44d0a4).


---



[GitHub] spark issue #19981: [SPARK-22786][SQL] only use AppStatusPlugin in history s...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19981
  
    Merged build finished. Test FAILed.


---



[GitHub] spark issue #19981: [SPARK-22786][SQL] only use AppStatusPlugin in history s...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19981
  
    **[Test build #84947 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84947/testReport)** for PR 19981 at commit [`88fdff2`](https://github.com/apache/spark/commit/88fdff29cc106fd583640f8026dafd278acc9ec9).


---



[GitHub] spark issue #19981: [SPARK-22786][SQL] only use AppStatusPlugin in history s...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19981
  
    Merged build finished. Test FAILed.


---



[GitHub] spark issue #19981: [SPARK-22786][SQL] only use AppStatusPlugin in history s...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19981
  
    **[Test build #85040 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85040/testReport)** for PR 19981 at commit [`2e66722`](https://github.com/apache/spark/commit/2e667222690cad5dcd839b220ae9f938203813db).


---



[GitHub] spark issue #19981: [SPARK-22786][SQL] only use AppStatusPlugin in history s...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19981
  
    **[Test build #85065 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85065/testReport)** for PR 19981 at commit [`a81917c`](https://github.com/apache/spark/commit/a81917c8e9e41d7b7e91e0469249904c675bac17).


---



[GitHub] spark pull request #19981: [SPARK-22786][SQL] only use AppStatusPlugin in hi...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19981#discussion_r157497701
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala ---
    @@ -82,6 +82,19 @@ private[sql] class SharedState(val sparkContext: SparkContext) extends Logging {
        */
       val cacheManager: CacheManager = new CacheManager
     
    +  /**
    +   * A status store to query SQL status/metrics of this Spark application, based on SQL-specific
    +   * [[org.apache.spark.scheduler.SparkListenerEvent]]s.
    +   */
    +  val statusStore: SQLAppStatusStore = {
    --- End diff --
    
    Also, this is consistent with `SparkContext.statusStore`; we need central places to query the core/sql/streaming status.


---



[GitHub] spark issue #19981: [SPARK-22786][SQL] only use AppStatusPlugin in history s...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19981
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/85047/
    Test FAILed.


---



[GitHub] spark pull request #19981: [SPARK-22786][SQL] only use AppStatusPlugin in hi...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19981#discussion_r157496956
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala ---
    @@ -82,6 +82,19 @@ private[sql] class SharedState(val sparkContext: SparkContext) extends Logging {
        */
       val cacheManager: CacheManager = new CacheManager
     
    +  /**
    +   * A status store to query SQL status/metrics of this Spark application, based on SQL-specific
    +   * [[org.apache.spark.scheduler.SparkListenerEvent]]s.
    +   */
    +  val statusStore: SQLAppStatusStore = {
    --- End diff --
    
    That's how it works: those things are not critical to end users, and it's OK to break them if no one objects.


---



[GitHub] spark issue #19981: [SPARK-22786][SQL] only use AppStatusPlugin in history s...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19981
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84994/
    Test FAILed.


---



[GitHub] spark issue #19981: [SPARK-22786][SQL] only use AppStatusPlugin in history s...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/19981
  
    > Not sure why that would be an issue - or rather, why that's different from what's always been the case.
    
    It's possible that people write Spark applications with a Spark SQL dependency but do not use SQL functions (they just create a `SparkContext`). This can happen if someone builds a library based on Spark and Spark SQL, but its users only use the non-SQL APIs.
    
    > What do you think about either of my suggestions to simplify the code?
    
    I do think we should only create the SQL tab when users actually use SQL functions. And the same thing should apply to the SQL listener.
    Previously they were consistent: we registered the listener and set up the UI when creating `SparkSession`. But now they are not: we register the listener during `SparkContext` creation, and set up the UI after the first SQL execution starts.
    
    No offense, but I would reject that PR if I were reviewing it, because of this behavior change.
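
    The consistency argument above can be sketched as follows. `CoreContext` and `SqlEntryPoint` are hypothetical stand-ins rather than Spark's `SparkContext` and `SparkSession`, and the listener and tab names are placeholders. The point illustrated is only that the listener and the tab are wired up at the same moment, and not at all for a core-only application:

    ```scala
    import scala.collection.mutable

    // Hypothetical stand-ins; these are not Spark's SparkContext or SparkSession.
    class CoreContext {
      private val listeners = mutable.ArrayBuffer.empty[String]
      private val tabs = mutable.ArrayBuffer.empty[String]
      def addListener(name: String): Unit = synchronized { listeners += name }
      def addTab(name: String): Unit = synchronized { tabs += name }
      def listenerNames: List[String] = synchronized { listeners.toList }
      def tabNames: List[String] = synchronized { tabs.toList }
    }

    // Listener and tab are set up together, and only when the SQL entry point is built.
    class SqlEntryPoint(ctx: CoreContext) {
      ctx.addListener("sql-status-listener")
      ctx.addTab("SQL")
    }

    object LazySqlSetupDemo {
      def main(args: Array[String]): Unit = {
        val ctx = new CoreContext
        ctx.addTab("Jobs")
        println(ctx.listenerNames) // no SQL listener for a core-only application

        new SqlEntryPoint(ctx)
        println(ctx.listenerNames) // the SQL listener is now registered
        println(ctx.tabNames)      // the SQL tab appears together with the listener
      }
    }
    ```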


---



[GitHub] spark issue #19981: [SPARK-22786][SQL] only use AppStatusPlugin in history s...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19981
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/85251/
    Test FAILed.


---



[GitHub] spark issue #19981: [SPARK-22786][SQL] only use AppStatusPlugin in history s...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19981
  
    Merged build finished. Test FAILed.


---



[GitHub] spark pull request #19981: [SPARK-22786][SQL] only use AppStatusPlugin in hi...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19981#discussion_r157663383
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala ---
    @@ -147,236 +159,246 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext with JsonTest
           (id, accumulatorValue)
         }.toMap
     
    -    bus.postToAll(SparkListenerSQLExecutionStart(
    +    listener.onOtherEvent(SparkListenerSQLExecutionStart(
           executionId,
           "test",
           "test",
           df.queryExecution.toString,
           SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
           System.currentTimeMillis()))
     
    -    bus.postToAll(SparkListenerJobStart(
    +    listener.onJobStart(SparkListenerJobStart(
           jobId = 0,
           time = System.currentTimeMillis(),
           stageInfos = Seq(
             createStageInfo(0, 0),
             createStageInfo(1, 0)
           ),
           createProperties(executionId)))
    -    bus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 0)))
    +    listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(0, 0)))
     
    -    assert(store.executionMetrics(0).isEmpty)
    +    assert(statusStore.executionMetrics(executionId).isEmpty)
     
    -    bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq(
    +    listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq(
           // (task id, stage id, stage attempt, accum updates)
           (0L, 0, 0, createAccumulatorInfos(accumulatorUpdates)),
           (1L, 0, 0, createAccumulatorInfos(accumulatorUpdates))
         )))
     
    -    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
    +    checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 2))
     
         // Driver accumulator updates don't belong to this execution should be filtered and no
         // exception will be thrown.
    -    bus.postToAll(SparkListenerDriverAccumUpdates(0, Seq((999L, 2L))))
    +    listener.onOtherEvent(SparkListenerDriverAccumUpdates(0, Seq((999L, 2L))))
     
    -    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
    +    checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 2))
     
    -    bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq(
    +    listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq(
           // (task id, stage id, stage attempt, accum updates)
           (0L, 0, 0, createAccumulatorInfos(accumulatorUpdates)),
           (1L, 0, 0, createAccumulatorInfos(accumulatorUpdates.mapValues(_ * 2)))
         )))
     
    -    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 3))
    +    checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 3))
     
         // Retrying a stage should reset the metrics
    -    bus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 1)))
    +    listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(0, 1)))
     
    -    bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq(
    +    listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq(
           // (task id, stage id, stage attempt, accum updates)
           (0L, 0, 1, createAccumulatorInfos(accumulatorUpdates)),
           (1L, 0, 1, createAccumulatorInfos(accumulatorUpdates))
         )))
     
    -    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
    +    checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 2))
     
         // Ignore the task end for the first attempt
    -    bus.postToAll(SparkListenerTaskEnd(
    +    listener.onTaskEnd(SparkListenerTaskEnd(
           stageId = 0,
           stageAttemptId = 0,
           taskType = "",
           reason = null,
           createTaskInfo(0, 0, accums = accumulatorUpdates.mapValues(_ * 100)),
           null))
     
    -    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
    +    checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 2))
     
         // Finish two tasks
    -    bus.postToAll(SparkListenerTaskEnd(
    +    listener.onTaskEnd(SparkListenerTaskEnd(
           stageId = 0,
           stageAttemptId = 1,
           taskType = "",
           reason = null,
           createTaskInfo(0, 0, accums = accumulatorUpdates.mapValues(_ * 2)),
           null))
    -    bus.postToAll(SparkListenerTaskEnd(
    +    listener.onTaskEnd(SparkListenerTaskEnd(
           stageId = 0,
           stageAttemptId = 1,
           taskType = "",
           reason = null,
           createTaskInfo(1, 0, accums = accumulatorUpdates.mapValues(_ * 3)),
           null))
     
    -    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 5))
    +    checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 5))
     
         // Summit a new stage
    -    bus.postToAll(SparkListenerStageSubmitted(createStageInfo(1, 0)))
    +    listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(1, 0)))
     
    -    bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq(
    +    listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq(
           // (task id, stage id, stage attempt, accum updates)
           (0L, 1, 0, createAccumulatorInfos(accumulatorUpdates)),
           (1L, 1, 0, createAccumulatorInfos(accumulatorUpdates))
         )))
     
    -    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 7))
    +    checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 7))
     
         // Finish two tasks
    -    bus.postToAll(SparkListenerTaskEnd(
    +    listener.onTaskEnd(SparkListenerTaskEnd(
           stageId = 1,
           stageAttemptId = 0,
           taskType = "",
           reason = null,
           createTaskInfo(0, 0, accums = accumulatorUpdates.mapValues(_ * 3)),
           null))
    -    bus.postToAll(SparkListenerTaskEnd(
    +    listener.onTaskEnd(SparkListenerTaskEnd(
           stageId = 1,
           stageAttemptId = 0,
           taskType = "",
           reason = null,
           createTaskInfo(1, 0, accums = accumulatorUpdates.mapValues(_ * 3)),
           null))
     
    -    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 11))
    +    checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 11))
     
    -    assertJobs(store.execution(0), running = Seq(0))
    +    assertJobs(statusStore.execution(executionId), running = Seq(0))
     
    -    bus.postToAll(SparkListenerJobEnd(
    +    listener.onJobEnd(SparkListenerJobEnd(
           jobId = 0,
           time = System.currentTimeMillis(),
           JobSucceeded
         ))
    -    bus.postToAll(SparkListenerSQLExecutionEnd(
    +    listener.onOtherEvent(SparkListenerSQLExecutionEnd(
           executionId, System.currentTimeMillis()))
     
    -    assertJobs(store.execution(0), completed = Seq(0))
    -    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 11))
    +    assertJobs(statusStore.execution(executionId), completed = Seq(0))
    +
    +    checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 11))
       }
     
    -  sqlStoreTest("onExecutionEnd happens before onJobEnd(JobSucceeded)") { (store, bus) =>
    +  test("onExecutionEnd happens before onJobEnd(JobSucceeded)") {
    +    val statusStore = createStatusStore()
    +    val listener = statusStore.listener.get
    +
         val executionId = 0
         val df = createTestDataFrame
    -    bus.postToAll(SparkListenerSQLExecutionStart(
    +    listener.onOtherEvent(SparkListenerSQLExecutionStart(
           executionId,
           "test",
           "test",
           df.queryExecution.toString,
           SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
           System.currentTimeMillis()))
    -    bus.postToAll(SparkListenerJobStart(
    +    listener.onJobStart(SparkListenerJobStart(
           jobId = 0,
           time = System.currentTimeMillis(),
           stageInfos = Nil,
           createProperties(executionId)))
    -    bus.postToAll(SparkListenerSQLExecutionEnd(
    +    listener.onOtherEvent(SparkListenerSQLExecutionEnd(
           executionId, System.currentTimeMillis()))
    -    bus.postToAll(SparkListenerJobEnd(
    +    listener.onJobEnd(SparkListenerJobEnd(
           jobId = 0,
           time = System.currentTimeMillis(),
           JobSucceeded
         ))
     
    -    assertJobs(store.execution(0), completed = Seq(0))
    +    assertJobs(statusStore.execution(executionId), completed = Seq(0))
       }
     
    -  sqlStoreTest("onExecutionEnd happens before multiple onJobEnd(JobSucceeded)s") { (store, bus) =>
    +  test("onExecutionEnd happens before multiple onJobEnd(JobSucceeded)s") {
    +    val statusStore = createStatusStore()
    +    val listener = statusStore.listener.get
    +
         val executionId = 0
         val df = createTestDataFrame
    -    bus.postToAll(SparkListenerSQLExecutionStart(
    +    listener.onOtherEvent(SparkListenerSQLExecutionStart(
           executionId,
           "test",
           "test",
           df.queryExecution.toString,
           SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
           System.currentTimeMillis()))
    -    bus.postToAll(SparkListenerJobStart(
    +    listener.onJobStart(SparkListenerJobStart(
           jobId = 0,
           time = System.currentTimeMillis(),
           stageInfos = Nil,
           createProperties(executionId)))
    -    bus.postToAll(SparkListenerJobEnd(
    +    listener.onJobEnd(SparkListenerJobEnd(
             jobId = 0,
             time = System.currentTimeMillis(),
             JobSucceeded
         ))
     
    -    bus.postToAll(SparkListenerJobStart(
    +    listener.onJobStart(SparkListenerJobStart(
           jobId = 1,
           time = System.currentTimeMillis(),
           stageInfos = Nil,
           createProperties(executionId)))
    -    bus.postToAll(SparkListenerSQLExecutionEnd(
    +    listener.onOtherEvent(SparkListenerSQLExecutionEnd(
           executionId, System.currentTimeMillis()))
    -    bus.postToAll(SparkListenerJobEnd(
    +    listener.onJobEnd(SparkListenerJobEnd(
           jobId = 1,
           time = System.currentTimeMillis(),
           JobSucceeded
         ))
     
    -    assertJobs(store.execution(0), completed = Seq(0, 1))
    +    assertJobs(statusStore.execution(executionId), completed = Seq(0, 1))
       }
     
    -  sqlStoreTest("onExecutionEnd happens before onJobEnd(JobFailed)") { (store, bus) =>
    +  test("onExecutionEnd happens before onJobEnd(JobFailed)") {
    +    val statusStore = createStatusStore()
    +    val listener = statusStore.listener.get
    +
         val executionId = 0
         val df = createTestDataFrame
    -    bus.postToAll(SparkListenerSQLExecutionStart(
    +    listener.onOtherEvent(SparkListenerSQLExecutionStart(
           executionId,
           "test",
           "test",
           df.queryExecution.toString,
           SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
           System.currentTimeMillis()))
    -    bus.postToAll(SparkListenerJobStart(
    +    listener.onJobStart(SparkListenerJobStart(
           jobId = 0,
           time = System.currentTimeMillis(),
           stageInfos = Seq.empty,
           createProperties(executionId)))
    -    bus.postToAll(SparkListenerSQLExecutionEnd(
    +    listener.onOtherEvent(SparkListenerSQLExecutionEnd(
           executionId, System.currentTimeMillis()))
    -    bus.postToAll(SparkListenerJobEnd(
    +    listener.onJobEnd(SparkListenerJobEnd(
           jobId = 0,
           time = System.currentTimeMillis(),
           JobFailed(new RuntimeException("Oops"))
         ))
     
    -    assertJobs(store.execution(0), failed = Seq(0))
    +    assertJobs(statusStore.execution(executionId), failed = Seq(0))
       }
     
       test("SPARK-11126: no memory leak when running non SQL jobs") {
    -    val previousStageNumber = statusStore.executionsList().size
    --- End diff --
    
    Previously we didn't test the live data for stages.


---



[GitHub] spark issue #19981: [SPARK-22786][SQL] only use AppStatusPlugin in history s...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19981
  
    **[Test build #85251 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85251/testReport)** for PR 19981 at commit [`94ee3c7`](https://github.com/apache/spark/commit/94ee3c7ddbe776783dcb880d9daa05f598a418a9).
     * This patch **fails PySpark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #19981: [SPARK-22786][SQL] only use AppStatusPlugin in history s...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/19981
  
    retest this please


---



[GitHub] spark pull request #19981: [SPARK-22786][SQL] only use AppStatusPlugin in hi...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19981#discussion_r157397655
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala ---
    @@ -489,16 +501,17 @@ private case class MyPlan(sc: SparkContext, expectedValue: Long) extends LeafExe
     }
     
     
    -class SQLListenerMemoryLeakSuite extends SparkFunSuite {
    +class SQLAppStatusListenerMemoryLeakSuite extends SparkFunSuite {
     
    -  // TODO: this feature is not yet available in SQLAppStatusStore.
    -  ignore("no memory leak") {
    +  test("no memory leak") {
         quietly {
           val conf = new SparkConf()
             .setMaster("local")
             .setAppName("test")
    +        .set(LIVE_ENTITY_UPDATE_PERIOD, 0L) // Update the UI data immediately
             .set(config.MAX_TASK_FAILURES, 1) // Don't retry the tasks to run this test quickly
    -        .set("spark.sql.ui.retainedExecutions", "50") // Set it to 50 to run this test quickly
    +        // TODO: this feature is not yet available in SQLAppStatusStore.
    +        // .set("spark.sql.ui.retainedExecutions", "50") // Set it to 50 to run this test quickly
    --- End diff --
    
    ok let me revert this part


---



[GitHub] spark issue #19981: [SPARK-22786][SQL] only use AppStatusPlugin in history s...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19981
  
    **[Test build #85263 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85263/testReport)** for PR 19981 at commit [`94ee3c7`](https://github.com/apache/spark/commit/94ee3c7ddbe776783dcb880d9daa05f598a418a9).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #19981: [SPARK-22786][SQL] only use AppStatusPlugin in hi...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19981#discussion_r157135091
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala ---
    @@ -82,6 +82,19 @@ private[sql] class SharedState(val sparkContext: SparkContext) extends Logging {
        */
       val cacheManager: CacheManager = new CacheManager
     
    +  /**
    +   * A status store to query SQL status/metrics of this Spark application, based on SQL-specific
    +   * [[org.apache.spark.scheduler.SparkListenerEvent]]s.
    +   */
    +  val statusStore: SQLAppStatusStore = {
    --- End diff --
    
    I just realized this is important. Previously we could use `SharedState.listener` to query the SQL status. `SharedState` is actually a semi-public interface, as it's marked as `Unstable` in `SparkSession.sharedState`.
    
    Sometimes for debugging I just type `spark.sharedState.listener.xxx` in Spark Shell and check some status, but that is impossible now after #19681.
    
    There might be other people like me who love this ability. We should not remove it just for future UI discoverability (I think it only covers SQL and streaming, so it's really not a big gain).
    
    cc @hvanhovell @viirya @gatorsmile 


---



[GitHub] spark pull request #19981: [SPARK-22786][SQL] only use AppStatusPlugin in hi...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19981#discussion_r157582718
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala ---
    @@ -142,288 +155,274 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext with JsonTest
           (id, accumulatorValue)
         }.toMap
     
    -    bus.postToAll(SparkListenerSQLExecutionStart(
    +    listener.onOtherEvent(SparkListenerSQLExecutionStart(
           executionId,
           "test",
           "test",
           df.queryExecution.toString,
           SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
           System.currentTimeMillis()))
     
    -    bus.postToAll(SparkListenerJobStart(
    +    listener.onJobStart(SparkListenerJobStart(
           jobId = 0,
           time = System.currentTimeMillis(),
           stageInfos = Seq(
             createStageInfo(0, 0),
             createStageInfo(1, 0)
           ),
           createProperties(executionId)))
    -    bus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 0)))
    +    listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(0, 0)))
     
    -    assert(store.executionMetrics(0).isEmpty)
    +    assert(statusStore.executionMetrics(executionId).isEmpty)
     
    -    bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq(
    +    listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq(
           // (task id, stage id, stage attempt, accum updates)
           (0L, 0, 0, createAccumulatorInfos(accumulatorUpdates)),
           (1L, 0, 0, createAccumulatorInfos(accumulatorUpdates))
         )))
     
    -    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
    +    checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 2))
     
         // Driver accumulator updates don't belong to this execution should be filtered and no
         // exception will be thrown.
    -    bus.postToAll(SparkListenerDriverAccumUpdates(0, Seq((999L, 2L))))
    +    listener.onOtherEvent(SparkListenerDriverAccumUpdates(0, Seq((999L, 2L))))
     
    -    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
    +    checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 2))
     
    -    bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq(
    +    listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq(
           // (task id, stage id, stage attempt, accum updates)
           (0L, 0, 0, createAccumulatorInfos(accumulatorUpdates)),
           (1L, 0, 0, createAccumulatorInfos(accumulatorUpdates.mapValues(_ * 2)))
         )))
     
    -    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 3))
    +    checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 3))
     
         // Retrying a stage should reset the metrics
    -    bus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 1)))
    +    listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(0, 1)))
     
    -    bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq(
    +    listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq(
           // (task id, stage id, stage attempt, accum updates)
           (0L, 0, 1, createAccumulatorInfos(accumulatorUpdates)),
           (1L, 0, 1, createAccumulatorInfos(accumulatorUpdates))
         )))
     
    -    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
    +    checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 2))
     
         // Ignore the task end for the first attempt
    -    bus.postToAll(SparkListenerTaskEnd(
    +    listener.onTaskEnd(SparkListenerTaskEnd(
           stageId = 0,
           stageAttemptId = 0,
           taskType = "",
           reason = null,
           createTaskInfo(0, 0, accums = accumulatorUpdates.mapValues(_ * 100)),
           null))
     
    -    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
    +    checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 2))
     
         // Finish two tasks
    -    bus.postToAll(SparkListenerTaskEnd(
    +    listener.onTaskEnd(SparkListenerTaskEnd(
           stageId = 0,
           stageAttemptId = 1,
           taskType = "",
           reason = null,
           createTaskInfo(0, 0, accums = accumulatorUpdates.mapValues(_ * 2)),
           null))
    -    bus.postToAll(SparkListenerTaskEnd(
    +    listener.onTaskEnd(SparkListenerTaskEnd(
           stageId = 0,
           stageAttemptId = 1,
           taskType = "",
           reason = null,
           createTaskInfo(1, 0, accums = accumulatorUpdates.mapValues(_ * 3)),
           null))
     
    -    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 5))
    +    checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 5))
     
         // Summit a new stage
    -    bus.postToAll(SparkListenerStageSubmitted(createStageInfo(1, 0)))
    +    listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(1, 0)))
     
    -    bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq(
    +    listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq(
           // (task id, stage id, stage attempt, accum updates)
           (0L, 1, 0, createAccumulatorInfos(accumulatorUpdates)),
           (1L, 1, 0, createAccumulatorInfos(accumulatorUpdates))
         )))
     
    -    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 7))
    +    checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 7))
     
         // Finish two tasks
    -    bus.postToAll(SparkListenerTaskEnd(
    +    listener.onTaskEnd(SparkListenerTaskEnd(
           stageId = 1,
           stageAttemptId = 0,
           taskType = "",
           reason = null,
           createTaskInfo(0, 0, accums = accumulatorUpdates.mapValues(_ * 3)),
           null))
    -    bus.postToAll(SparkListenerTaskEnd(
    +    listener.onTaskEnd(SparkListenerTaskEnd(
           stageId = 1,
           stageAttemptId = 0,
           taskType = "",
           reason = null,
           createTaskInfo(1, 0, accums = accumulatorUpdates.mapValues(_ * 3)),
           null))
     
    -    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 11))
    +    checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 11))
     
    -    assertJobs(store.execution(0), running = Seq(0))
    +    assertJobs(statusStore.execution(executionId), running = Seq(0))
     
    -    bus.postToAll(SparkListenerJobEnd(
    +    listener.onJobEnd(SparkListenerJobEnd(
           jobId = 0,
           time = System.currentTimeMillis(),
           JobSucceeded
         ))
    -    bus.postToAll(SparkListenerSQLExecutionEnd(
    +    listener.onOtherEvent(SparkListenerSQLExecutionEnd(
           executionId, System.currentTimeMillis()))
     
    -    assertJobs(store.execution(0), completed = Seq(0))
    -    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 11))
    +    assertJobs(statusStore.execution(executionId), completed = Seq(0))
    +
    +    checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 11))
       }
     
    -  sqlStoreTest("onExecutionEnd happens before onJobEnd(JobSucceeded)") { (store, bus) =>
    +  test("onExecutionEnd happens before onJobEnd(JobSucceeded)") {
    +    val statusStore = createStatusStore()
    +    val listener = statusStore.listener.get
    +
         val executionId = 0
         val df = createTestDataFrame
    -    bus.postToAll(SparkListenerSQLExecutionStart(
    +    listener.onOtherEvent(SparkListenerSQLExecutionStart(
           executionId,
           "test",
           "test",
           df.queryExecution.toString,
           SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
           System.currentTimeMillis()))
    -    bus.postToAll(SparkListenerJobStart(
    +    listener.onJobStart(SparkListenerJobStart(
           jobId = 0,
           time = System.currentTimeMillis(),
           stageInfos = Nil,
           createProperties(executionId)))
    -    bus.postToAll(SparkListenerSQLExecutionEnd(
    +    listener.onOtherEvent(SparkListenerSQLExecutionEnd(
           executionId, System.currentTimeMillis()))
    -    bus.postToAll(SparkListenerJobEnd(
    +    listener.onJobEnd(SparkListenerJobEnd(
           jobId = 0,
           time = System.currentTimeMillis(),
           JobSucceeded
         ))
     
    -    assertJobs(store.execution(0), completed = Seq(0))
    +    assertJobs(statusStore.execution(executionId), completed = Seq(0))
       }
     
    -  sqlStoreTest("onExecutionEnd happens before multiple onJobEnd(JobSucceeded)s") { (store, bus) =>
    +  test("onExecutionEnd happens before multiple onJobEnd(JobSucceeded)s") {
    +    val statusStore = createStatusStore()
    +    val listener = statusStore.listener.get
    +
         val executionId = 0
         val df = createTestDataFrame
    -    bus.postToAll(SparkListenerSQLExecutionStart(
    +    listener.onOtherEvent(SparkListenerSQLExecutionStart(
           executionId,
           "test",
           "test",
           df.queryExecution.toString,
           SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
           System.currentTimeMillis()))
    -    bus.postToAll(SparkListenerJobStart(
    +    listener.onJobStart(SparkListenerJobStart(
           jobId = 0,
           time = System.currentTimeMillis(),
           stageInfos = Nil,
           createProperties(executionId)))
    -    bus.postToAll(SparkListenerJobEnd(
    +    listener.onJobEnd(SparkListenerJobEnd(
             jobId = 0,
             time = System.currentTimeMillis(),
             JobSucceeded
         ))
     
    -    bus.postToAll(SparkListenerJobStart(
    +    listener.onJobStart(SparkListenerJobStart(
           jobId = 1,
           time = System.currentTimeMillis(),
           stageInfos = Nil,
           createProperties(executionId)))
    -    bus.postToAll(SparkListenerSQLExecutionEnd(
    +    listener.onOtherEvent(SparkListenerSQLExecutionEnd(
           executionId, System.currentTimeMillis()))
    -    bus.postToAll(SparkListenerJobEnd(
    +    listener.onJobEnd(SparkListenerJobEnd(
           jobId = 1,
           time = System.currentTimeMillis(),
           JobSucceeded
         ))
     
    -    assertJobs(store.execution(0), completed = Seq(0, 1))
    +    assertJobs(statusStore.execution(executionId), completed = Seq(0, 1))
       }
     
    -  sqlStoreTest("onExecutionEnd happens before onJobEnd(JobFailed)") { (store, bus) =>
    +  test("onExecutionEnd happens before onJobEnd(JobFailed)") {
    +    val statusStore = createStatusStore()
    +    val listener = statusStore.listener.get
    +
         val executionId = 0
         val df = createTestDataFrame
    -    bus.postToAll(SparkListenerSQLExecutionStart(
    +    listener.onOtherEvent(SparkListenerSQLExecutionStart(
           executionId,
           "test",
           "test",
           df.queryExecution.toString,
           SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
           System.currentTimeMillis()))
    -    bus.postToAll(SparkListenerJobStart(
    +    listener.onJobStart(SparkListenerJobStart(
           jobId = 0,
           time = System.currentTimeMillis(),
           stageInfos = Seq.empty,
           createProperties(executionId)))
    -    bus.postToAll(SparkListenerSQLExecutionEnd(
    +    listener.onOtherEvent(SparkListenerSQLExecutionEnd(
           executionId, System.currentTimeMillis()))
    -    bus.postToAll(SparkListenerJobEnd(
    +    listener.onJobEnd(SparkListenerJobEnd(
           jobId = 0,
           time = System.currentTimeMillis(),
           JobFailed(new RuntimeException("Oops"))
         ))
     
    -    assertJobs(store.execution(0), failed = Seq(0))
    +    assertJobs(statusStore.execution(executionId), failed = Seq(0))
       }
     
       test("SPARK-11126: no memory leak when running non SQL jobs") {
    -    val previousStageNumber = statusStore.executionsList().size
    +    val statusStore = createStatusStore()
    +    val listener = statusStore.listener.get
    +    // No need to remove the listener as the SparkContext is dedicated to this test suite.
    +    sparkContext.addSparkListener(listener)
         spark.sparkContext.parallelize(1 to 10).foreach(i => ())
         spark.sparkContext.listenerBus.waitUntilEmpty(10000)
    -    // listener should ignore the non SQL stage
    -    assert(statusStore.executionsList().size == previousStageNumber)
    -
    -    spark.sparkContext.parallelize(1 to 10).toDF().foreach(i => ())
    -    spark.sparkContext.listenerBus.waitUntilEmpty(10000)
    -    // listener should save the SQL stage
    -    assert(statusStore.executionsList().size == previousStageNumber + 1)
    +    // Listener should ignore the non-SQL stages, as the stage data are only removed when SQL
    +    // execution ends, which will not be triggered for non-SQL jobs.
    +    assert(listener.stageMetrics.isEmpty)
       }
     
       test("driver side SQL metrics") {
    -    val oldCount = statusStore.executionsList().size
    -    val expectedAccumValue = 12345L
    +    val statusStore = createStatusStore()
    +    val listener = statusStore.listener.get
    +    // No need to remove the listener as the SparkContext is dedicated to this test suite.
    +    sparkContext.addSparkListener(listener)
    +    val expectedAccumValue = 12345
         val physicalPlan = MyPlan(sqlContext.sparkContext, expectedAccumValue)
         val dummyQueryExecution = new QueryExecution(spark, LocalRelation()) {
           override lazy val sparkPlan = physicalPlan
           override lazy val executedPlan = physicalPlan
         }
    -
         SQLExecution.withNewExecutionId(spark, dummyQueryExecution) {
           physicalPlan.execute().collect()
         }
     
    -    while (statusStore.executionsList().size < oldCount) {
    -      Thread.sleep(100)
    -    }
    -
    -    // Wait for listener to finish computing the metrics for the execution.
    -    while (statusStore.executionsList().last.metricValues == null) {
    +    // Wait until execution finished
    +    var finished = false
    +    while (!finished) {
           Thread.sleep(100)
    +      val executionData = statusStore.executionsList().headOption
    --- End diff --
    
    For consistency with the checks below you should be checking `.lastOption`.
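
To illustrate the `.headOption` versus `.lastOption` point, here is a minimal sketch. It assumes the executions list is ordered oldest first (the thread does not state the ordering) and uses a hypothetical `Execution` case class rather than Spark's real execution data type. With a single execution, as in a freshly created store, both calls return the same entry, so the difference only shows once more than one execution is present.

```scala
object LastOptionSketch {
  // Hypothetical stand-in for an entry in the executions list; not Spark's real class.
  final case class Execution(id: Long, completed: Boolean)

  // Assumption: entries are ordered oldest first, so the newest one is last.
  def latestFinished(executions: Seq[Execution]): Boolean =
    executions.lastOption.exists(_.completed)

  // The headOption variant inspects the oldest entry instead.
  def oldestFinished(executions: Seq[Execution]): Boolean =
    executions.headOption.exists(_.completed)

  def main(args: Array[String]): Unit = {
    val executions = Seq(
      Execution(0, completed = true),  // older execution, already finished
      Execution(1, completed = false)  // newest execution, still running
    )
    println(oldestFinished(executions)) // true: looks at execution 0
    println(latestFinished(executions)) // false: looks at execution 1
  }
}
```

A wait loop built on the `headOption` variant could exit early in this situation, because it would observe the older, already completed execution.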


---



[GitHub] spark issue #19981: [SPARK-22786][SQL] only use AppStatusPlugin in history s...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19981
  
    **[Test build #85238 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85238/testReport)** for PR 19981 at commit [`94ee3c7`](https://github.com/apache/spark/commit/94ee3c7ddbe776783dcb880d9daa05f598a418a9).
     * This patch **fails due to an unknown error code, -9**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #19981: [SPARK-22786][SQL] only use AppStatusPlugin in history s...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/19981
  
    thanks, merging to master!


---



[GitHub] spark issue #19981: [SPARK-22786][SQL] only use AppStatusPlugin in history s...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19981
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/85238/


---



[GitHub] spark issue #19981: [SPARK-22786][SQL] only use AppStatusPlugin in history s...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on the issue:

    https://github.com/apache/spark/pull/19981
  
    > It's possible that people writing Spark applications with Spark SQL dependency, but not using SQL 
    
    Yes, and the SQL listener will ignore all the events, as it should even in the case that SQL was used, because users are allowed to run SQL and non-SQL workloads in the same application.
    
    > I do think we should only create the SQL tab when users actually use SQL functions. 
    
    Create or show? If it's not shown the user-visible behavior is the same, isn't it? That was the second of my suggestions.
    
    > because of this behavior change.
    
    The behavior change is all internal and not visible to users, and that was the goal of the change.


---



[GitHub] spark issue #19981: [SPARK-22786][SQL] only use AppStatusPlugin in history s...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/19981
  
    So the problem is that it took me a while to understand the new code because of the weird design of `AppStatusPlugin`. At the very least we should have a separate plugin interface for the live UI. Code readability is important to Spark, and we should not sacrifice it without real benefits.
    
    On the other hand, I think it's better not to register a listener when it is unnecessary. `SQLListener` is fine because it's a no-op for non-SQL events, but can we guarantee that in all other places, like streaming? (This needs confirmation from @zsxwing @marmbrus.)
    
    This is also an internal behavior change (the timing of registering SQL listeners), which may stop us from catching Spark events in the SQL listener in the future. Do we have a good reason to change it? Having a central place for live UI setup does not seem like a strong reason to me.
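
The registration-timing concern above can be sketched with a toy model. Everything here is hypothetical (`ToyContext`, `ToySharedState`, `ToySession` are illustrative stand-ins, not Spark classes); the only thing it borrows from the thread is the idea that the SQL listener is attached when the SQL shared state is first created, rather than when the context is created.

```scala
import scala.collection.mutable.ArrayBuffer

object RegistrationTimingSketch {
  // Hypothetical stand-in for a context with a listener bus:
  // it only records listener names in registration order.
  final class ToyContext {
    val listeners: ArrayBuffer[String] = ArrayBuffer.empty[String]
  }

  // Hypothetical stand-in for the SQL shared state: constructing it
  // registers the SQL listener on the context.
  final class ToySharedState(context: ToyContext) {
    context.listeners += "sql"
  }

  // Hypothetical session: the shared state is created lazily, so the
  // listener is registered only once SQL state is first touched.
  final class ToySession(context: ToyContext) {
    lazy val sharedState: ToySharedState = new ToySharedState(context)
  }

  def main(args: Array[String]): Unit = {
    val context = new ToyContext
    val session = new ToySession(context)
    println(context.listeners.isEmpty) // true: nothing registered yet
    session.sharedState                // first use registers the listener
    session.sharedState                // later uses do not register again
    println(context.listeners.toList)  // List(sql)
  }
}
```

In this model an application that never touches `sharedState` never registers the listener, which is the earlier behavior the comment argues for keeping.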


---



[GitHub] spark issue #19981: [SPARK-22786][SQL] only use AppStatusPlugin in history s...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on the issue:

    https://github.com/apache/spark/pull/19981
  
    I intentionally created that interface to be used both by live applications and the SHS. What actual problem are you running into?
    
    > we should not register SQLListener if users are not using SQL functions
    
    Why not? That listener is basically a no-op if you're not running any SQL.
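
As a rough illustration of the "no-op" argument, the sketch below models a listener that tracks only jobs tagged with a SQL execution id. The `JobStart` event and `ToySqlListener` class are hypothetical simplifications, not Spark's actual classes; the property key `spark.sql.execution.id` is the one Spark SQL uses to tag jobs, and the assumption here is that a listener can decide relevance from that tag alone.

```scala
import java.util.Properties
import scala.collection.mutable

object NoOpListenerSketch {
  // Property key Spark SQL uses to tag jobs with their execution id.
  val ExecutionIdKey = "spark.sql.execution.id"

  // Hypothetical, simplified event; the real job-start event carries more fields.
  final case class JobStart(jobId: Int, properties: Properties)

  // Toy listener: records only jobs that carry an execution id.
  final class ToySqlListener {
    private val jobsByExecution = mutable.Map.empty[Long, List[Int]]

    def onJobStart(event: JobStart): Unit = {
      val executionId = Option(event.properties)
        .flatMap(p => Option(p.getProperty(ExecutionIdKey)))
      // Jobs without the tag fall through here and leave no state behind.
      executionId.foreach { id =>
        val key = id.toLong
        jobsByExecution(key) = event.jobId :: jobsByExecution.getOrElse(key, Nil)
      }
    }

    def trackedJobs: Map[Long, List[Int]] = jobsByExecution.toMap
  }

  // Helper building job properties, optionally tagged with an execution id.
  def props(executionId: Option[Long]): Properties = {
    val p = new Properties()
    executionId.foreach(id => p.setProperty(ExecutionIdKey, id.toString))
    p
  }
}
```

Feeding this listener a job without the tag leaves `trackedJobs` empty, while a tagged job is recorded under its execution id. Whether every other module's listener behaves the same way is exactly the open question raised elsewhere in this thread.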


---



[GitHub] spark pull request #19981: [SPARK-22786][SQL] only use AppStatusPlugin in hi...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19981#discussion_r157581053
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala ---
    @@ -167,6 +168,24 @@ case class SparkListenerApplicationEnd(time: Long) extends SparkListenerEvent
     @DeveloperApi
     case class SparkListenerLogStart(sparkVersion: String) extends SparkListenerEvent
     
    +/**
    + * An interface for creating history listeners(to replay event logs) defined in other modules like
    + * SQL, and setup the UI of the plugin to rebuild the history UI.
    + */
    +private[spark] trait SparkHistoryUIPlugin {
    --- End diff --
    
    This is not a UI plugin. It's also only marginally related to this source file.
    
    It should remain in the `.status` package. If you really feel strongly about the existing name, you can use a different name (e.g. "AppHistoryServerPlugin" or something that doesn't explicitly say "UI" or "Listener").


---



[GitHub] spark pull request #19981: [SPARK-22786][SQL] only use AppStatusPlugin in hi...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19981#discussion_r157323132
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala ---
    @@ -142,286 +163,277 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext with JsonTest
           (id, accumulatorValue)
         }.toMap
     
    -    bus.postToAll(SparkListenerSQLExecutionStart(
    +    listener.onOtherEvent(SparkListenerSQLExecutionStart(
           executionId,
           "test",
           "test",
           df.queryExecution.toString,
           SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
           System.currentTimeMillis()))
     
    -    bus.postToAll(SparkListenerJobStart(
    +    listener.onJobStart(SparkListenerJobStart(
           jobId = 0,
           time = System.currentTimeMillis(),
           stageInfos = Seq(
             createStageInfo(0, 0),
             createStageInfo(1, 0)
           ),
           createProperties(executionId)))
    -    bus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 0)))
    +    listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(0, 0)))
     
    -    assert(store.executionMetrics(0).isEmpty)
    +    assert(statusStore.executionMetrics(executionId).isEmpty)
     
    -    bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq(
    +    listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq(
           // (task id, stage id, stage attempt, accum updates)
           (0L, 0, 0, createAccumulatorInfos(accumulatorUpdates)),
           (1L, 0, 0, createAccumulatorInfos(accumulatorUpdates))
         )))
     
    -    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
    +    checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 2))
     
         // Driver accumulator updates don't belong to this execution should be filtered and no
         // exception will be thrown.
    -    bus.postToAll(SparkListenerDriverAccumUpdates(0, Seq((999L, 2L))))
    +    listener.onOtherEvent(SparkListenerDriverAccumUpdates(0, Seq((999L, 2L))))
     
    -    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
    +    checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 2))
     
    -    bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq(
    +    listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq(
           // (task id, stage id, stage attempt, accum updates)
           (0L, 0, 0, createAccumulatorInfos(accumulatorUpdates)),
           (1L, 0, 0, createAccumulatorInfos(accumulatorUpdates.mapValues(_ * 2)))
         )))
     
    -    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 3))
    +    checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 3))
     
         // Retrying a stage should reset the metrics
    -    bus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 1)))
    +    listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(0, 1)))
     
    -    bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq(
    +    listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq(
           // (task id, stage id, stage attempt, accum updates)
           (0L, 0, 1, createAccumulatorInfos(accumulatorUpdates)),
           (1L, 0, 1, createAccumulatorInfos(accumulatorUpdates))
         )))
     
    -    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
    +    checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 2))
     
         // Ignore the task end for the first attempt
    -    bus.postToAll(SparkListenerTaskEnd(
    +    listener.onTaskEnd(SparkListenerTaskEnd(
           stageId = 0,
           stageAttemptId = 0,
           taskType = "",
           reason = null,
           createTaskInfo(0, 0, accums = accumulatorUpdates.mapValues(_ * 100)),
           null))
     
    -    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
    +    checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 2))
     
         // Finish two tasks
    -    bus.postToAll(SparkListenerTaskEnd(
    +    listener.onTaskEnd(SparkListenerTaskEnd(
           stageId = 0,
           stageAttemptId = 1,
           taskType = "",
           reason = null,
           createTaskInfo(0, 0, accums = accumulatorUpdates.mapValues(_ * 2)),
           null))
    -    bus.postToAll(SparkListenerTaskEnd(
    +    listener.onTaskEnd(SparkListenerTaskEnd(
           stageId = 0,
           stageAttemptId = 1,
           taskType = "",
           reason = null,
           createTaskInfo(1, 0, accums = accumulatorUpdates.mapValues(_ * 3)),
           null))
     
    -    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 5))
    +    checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 5))
     
         // Summit a new stage
    -    bus.postToAll(SparkListenerStageSubmitted(createStageInfo(1, 0)))
    +    listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(1, 0)))
     
    -    bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq(
    +    listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq(
           // (task id, stage id, stage attempt, accum updates)
           (0L, 1, 0, createAccumulatorInfos(accumulatorUpdates)),
           (1L, 1, 0, createAccumulatorInfos(accumulatorUpdates))
         )))
     
    -    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 7))
    +    checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 7))
     
         // Finish two tasks
    -    bus.postToAll(SparkListenerTaskEnd(
    +    listener.onTaskEnd(SparkListenerTaskEnd(
           stageId = 1,
           stageAttemptId = 0,
           taskType = "",
           reason = null,
           createTaskInfo(0, 0, accums = accumulatorUpdates.mapValues(_ * 3)),
           null))
    -    bus.postToAll(SparkListenerTaskEnd(
    +    listener.onTaskEnd(SparkListenerTaskEnd(
           stageId = 1,
           stageAttemptId = 0,
           taskType = "",
           reason = null,
           createTaskInfo(1, 0, accums = accumulatorUpdates.mapValues(_ * 3)),
           null))
     
    -    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 11))
    +    checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 11))
     
    -    assertJobs(store.execution(0), running = Seq(0))
    +    assertJobs(statusStore.execution(executionId), running = Seq(0))
     
    -    bus.postToAll(SparkListenerJobEnd(
    +    listener.onJobEnd(SparkListenerJobEnd(
           jobId = 0,
           time = System.currentTimeMillis(),
           JobSucceeded
         ))
    -    bus.postToAll(SparkListenerSQLExecutionEnd(
    +    listener.onOtherEvent(SparkListenerSQLExecutionEnd(
           executionId, System.currentTimeMillis()))
     
    -    assertJobs(store.execution(0), completed = Seq(0))
    -    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 11))
    +    assertJobs(statusStore.execution(executionId), completed = Seq(0))
    +
    +    checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 11))
       }
     
    -  sqlStoreTest("onExecutionEnd happens before onJobEnd(JobSucceeded)") { (store, bus) =>
    +  test("onExecutionEnd happens before onJobEnd(JobSucceeded)") {
    +    val statusStore = createStatusStore()
    +    val listener = statusStore.listener.get
    +
         val executionId = 0
         val df = createTestDataFrame
    -    bus.postToAll(SparkListenerSQLExecutionStart(
    +    listener.onOtherEvent(SparkListenerSQLExecutionStart(
           executionId,
           "test",
           "test",
           df.queryExecution.toString,
           SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
           System.currentTimeMillis()))
    -    bus.postToAll(SparkListenerJobStart(
    +    listener.onJobStart(SparkListenerJobStart(
           jobId = 0,
           time = System.currentTimeMillis(),
           stageInfos = Nil,
           createProperties(executionId)))
    -    bus.postToAll(SparkListenerSQLExecutionEnd(
    +    listener.onOtherEvent(SparkListenerSQLExecutionEnd(
           executionId, System.currentTimeMillis()))
    -    bus.postToAll(SparkListenerJobEnd(
    +    listener.onJobEnd(SparkListenerJobEnd(
           jobId = 0,
           time = System.currentTimeMillis(),
           JobSucceeded
         ))
     
    -    assertJobs(store.execution(0), completed = Seq(0))
    +    assertJobs(statusStore.execution(executionId), completed = Seq(0))
       }
     
    -  sqlStoreTest("onExecutionEnd happens before multiple onJobEnd(JobSucceeded)s") { (store, bus) =>
    +  test("onExecutionEnd happens before multiple onJobEnd(JobSucceeded)s") {
    +    val statusStore = createStatusStore()
    +    val listener = statusStore.listener.get
    +
         val executionId = 0
         val df = createTestDataFrame
    -    bus.postToAll(SparkListenerSQLExecutionStart(
    +    listener.onOtherEvent(SparkListenerSQLExecutionStart(
           executionId,
           "test",
           "test",
           df.queryExecution.toString,
           SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
           System.currentTimeMillis()))
    -    bus.postToAll(SparkListenerJobStart(
    +    listener.onJobStart(SparkListenerJobStart(
           jobId = 0,
           time = System.currentTimeMillis(),
           stageInfos = Nil,
           createProperties(executionId)))
    -    bus.postToAll(SparkListenerJobEnd(
    +    listener.onJobEnd(SparkListenerJobEnd(
             jobId = 0,
             time = System.currentTimeMillis(),
             JobSucceeded
         ))
     
    -    bus.postToAll(SparkListenerJobStart(
    +    listener.onJobStart(SparkListenerJobStart(
           jobId = 1,
           time = System.currentTimeMillis(),
           stageInfos = Nil,
           createProperties(executionId)))
    -    bus.postToAll(SparkListenerSQLExecutionEnd(
    +    listener.onOtherEvent(SparkListenerSQLExecutionEnd(
           executionId, System.currentTimeMillis()))
    -    bus.postToAll(SparkListenerJobEnd(
    +    listener.onJobEnd(SparkListenerJobEnd(
           jobId = 1,
           time = System.currentTimeMillis(),
           JobSucceeded
         ))
     
    -    assertJobs(store.execution(0), completed = Seq(0, 1))
    +    assertJobs(statusStore.execution(executionId), completed = Seq(0, 1))
       }
     
    -  sqlStoreTest("onExecutionEnd happens before onJobEnd(JobFailed)") { (store, bus) =>
    +  test("onExecutionEnd happens before onJobEnd(JobFailed)") {
    +    val statusStore = createStatusStore()
    +    val listener = statusStore.listener.get
    +
         val executionId = 0
         val df = createTestDataFrame
    -    bus.postToAll(SparkListenerSQLExecutionStart(
    +    listener.onOtherEvent(SparkListenerSQLExecutionStart(
           executionId,
           "test",
           "test",
           df.queryExecution.toString,
           SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
           System.currentTimeMillis()))
    -    bus.postToAll(SparkListenerJobStart(
    +    listener.onJobStart(SparkListenerJobStart(
           jobId = 0,
           time = System.currentTimeMillis(),
           stageInfos = Seq.empty,
           createProperties(executionId)))
    -    bus.postToAll(SparkListenerSQLExecutionEnd(
    +    listener.onOtherEvent(SparkListenerSQLExecutionEnd(
           executionId, System.currentTimeMillis()))
    -    bus.postToAll(SparkListenerJobEnd(
    +    listener.onJobEnd(SparkListenerJobEnd(
           jobId = 0,
           time = System.currentTimeMillis(),
           JobFailed(new RuntimeException("Oops"))
         ))
     
    -    assertJobs(store.execution(0), failed = Seq(0))
    +    assertJobs(statusStore.execution(executionId), failed = Seq(0))
       }
     
       test("SPARK-11126: no memory leak when running non SQL jobs") {
    -    val previousStageNumber = statusStore.executionsList().size
    -    spark.sparkContext.parallelize(1 to 10).foreach(i => ())
    -    spark.sparkContext.listenerBus.waitUntilEmpty(10000)
    -    // listener should ignore the non SQL stage
    -    assert(statusStore.executionsList().size == previousStageNumber)
    -
    -    spark.sparkContext.parallelize(1 to 10).toDF().foreach(i => ())
    -    spark.sparkContext.listenerBus.waitUntilEmpty(10000)
    -    // listener should save the SQL stage
    -    assert(statusStore.executionsList().size == previousStageNumber + 1)
    -  }
    -
    -  test("driver side SQL metrics") {
    -    val oldCount = statusStore.executionsList().size
    -    val expectedAccumValue = 12345L
    -    val physicalPlan = MyPlan(sqlContext.sparkContext, expectedAccumValue)
    -    val dummyQueryExecution = new QueryExecution(spark, LocalRelation()) {
    -      override lazy val sparkPlan = physicalPlan
    -      override lazy val executedPlan = physicalPlan
    -    }
    -
    -    SQLExecution.withNewExecutionId(spark, dummyQueryExecution) {
    -      physicalPlan.execute().collect()
    -    }
    -
    -    while (statusStore.executionsList().size < oldCount) {
    -      Thread.sleep(100)
    -    }
    -
    -    // Wait for listener to finish computing the metrics for the execution.
    -    while (statusStore.executionsList().last.metricValues == null) {
    -      Thread.sleep(100)
    +    val statusStore = createStatusStore()
    +    val listener = statusStore.listener.get
    +    try {
    +      sparkContext.addSparkListener(listener)
    +      spark.sparkContext.parallelize(1 to 10).foreach(i => ())
    +      spark.sparkContext.listenerBus.waitUntilEmpty(10000)
    +      // Listener should ignore the non-SQL stages, as the stage data are only removed when SQL
    +      // execution ends, which will not be triggered for non-SQL jobs.
    +      assert(listener.stageMetrics.isEmpty)
    +    } finally {
    +      sparkContext.removeSparkListener(listener)
         }
    -
    -    val execId = statusStore.executionsList().last.executionId
    -    val metrics = statusStore.executionMetrics(execId)
    -    val driverMetric = physicalPlan.metrics("dummy")
    -    val expectedValue = SQLMetrics.stringValue(driverMetric.metricType, Seq(expectedAccumValue))
    -
    -    assert(metrics.contains(driverMetric.id))
    -    assert(metrics(driverMetric.id) === expectedValue)
    --- End diff --
    
    In fact this will probably still pass if you restore the `LIVE_ENTITY_UPDATE_PERIOD` config in the session, since you'll still have a listener in the shared session (it should still be installed automatically by your code, just not through the plugin).
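
As a rough illustration of why an update-period setting can decide whether such a test passes, here is a minimal sketch. It assumes the config controls how often a live listener flushes its in-memory state to the store that the test reads. `PeriodicWriter` and `UpdatePeriodDemo` are hypothetical names for this sketch, not Spark classes.

```scala
// Hypothetical model for illustration; the names below are not Spark APIs.
final class PeriodicWriter(updatePeriodNs: Long) {
  private var pending = 0L      // state accumulated in memory
  private var lastWriteNs = 0L  // time of the last flush
  var stored = 0L               // what a reader of the store would see

  def onUpdate(delta: Long, nowNs: Long): Unit = {
    pending += delta
    // Flush only when the update period has elapsed since the last write.
    if (nowNs - lastWriteNs >= updatePeriodNs) {
      stored = pending
      lastWriteNs = nowNs
    }
  }
}

object UpdatePeriodDemo {
  // Two updates arrive 1 ns apart; returns what the store shows afterwards.
  def storedAfterTwoUpdates(updatePeriodNs: Long): Long = {
    val writer = new PeriodicWriter(updatePeriodNs)
    writer.onUpdate(5L, nowNs = 1000L)
    writer.onUpdate(7L, nowNs = 1001L)
    writer.stored
  }

  def main(args: Array[String]): Unit = {
    println(storedAfterTwoUpdates(updatePeriodNs = 0L))
    println(storedAfterTwoUpdates(updatePeriodNs = 100L))
  }
}
```

With a period of zero every update is visible immediately, so an assertion made right after the events sees the full total. With a non-zero period the second update stays in memory and the store lags behind.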


---



[GitHub] spark issue #19981: [SPARK-22786][SQL] only use AppStatusPlugin in history s...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19981
  
    **[Test build #85251 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85251/testReport)** for PR 19981 at commit [`94ee3c7`](https://github.com/apache/spark/commit/94ee3c7ddbe776783dcb880d9daa05f598a418a9).


---



[GitHub] spark issue #19981: [SPARK-22786][SQL] only use AppStatusPlugin in history s...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19981
  
    **[Test build #85227 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85227/testReport)** for PR 19981 at commit [`94ee3c7`](https://github.com/apache/spark/commit/94ee3c7ddbe776783dcb880d9daa05f598a418a9).
     * This patch **fails PySpark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #19981: [SPARK-22786][SQL] only use AppStatusPlugin in history s...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/19981
  
    > Those are not being installed with this interface, are they?
    
    We are going to, right? Otherwise creating the interface just for SQL doesn't align with your goal to centralize this part.
    
    > But I really don't think going back to the previous way is the right thing.
    
    I do think the previous code is cleaner than the current one. This may be a matter of personal taste, so we need more feedback from other people.
    
    > The listener is now being installed before it was before (when the context starts, instead of later), so it's less likely that it will miss events.
    
    I'm talking about not catching events if users are not using SQL functions. Again, this is an internal behavior change that can affect the Spark event-catching logic inside the SQL listener (including logic that may be added in the future). I'm not saying we can't change it, but we need a proper reason.
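
The two registration strategies debated in this thread can be sketched with a toy model. Everything below (`ToyBus`, `ToySqlListener`, `EagerContext`, `LazyContext`) is a hypothetical stand-in rather than Spark's API. The sketch only shows when a listener ends up on the bus and which events it sees.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-ins for illustration; none of these are Spark classes.
trait ToyListener { def onEvent(event: String): Unit }

final class ToyBus {
  private val listeners = ArrayBuffer.empty[ToyListener]
  def addListener(l: ToyListener): Unit = listeners += l
  def listenerCount: Int = listeners.size
  def post(event: String): Unit = listeners.foreach(_.onEvent(event))
}

final class ToySqlListener extends ToyListener {
  val seen = ArrayBuffer.empty[String]
  override def onEvent(event: String): Unit = seen += event
}

// Eager: the SQL listener is attached as soon as the context exists.
final class EagerContext {
  val bus = new ToyBus
  val sqlListener = new ToySqlListener
  bus.addListener(sqlListener)
}

// Lazy: the SQL listener is attached only when a session is first requested.
final class LazyContext {
  val bus = new ToyBus
  lazy val sqlListener: ToySqlListener = {
    val l = new ToySqlListener
    bus.addListener(l)
    l
  }
  def getOrCreateSession(): ToySqlListener = sqlListener
}

object RegistrationDemo {
  // (listeners on the eager bus, listeners on the lazy bus) after a non-SQL job.
  def listenersAfterNonSqlJob(): (Int, Int) = {
    val eager = new EagerContext
    val lazyCtx = new LazyContext
    eager.bus.post("jobStart")
    lazyCtx.bus.post("jobStart")
    (eager.bus.listenerCount, lazyCtx.bus.listenerCount)
  }

  // Events seen by the lazily registered listener when a job starts before the session.
  def eventsSeenByLazyListener(): List[String] = {
    val ctx = new LazyContext
    ctx.bus.post("jobStart")
    val listener = ctx.getOrCreateSession()
    ctx.bus.post("sqlExecutionStart")
    listener.seen.toList
  }

  def main(args: Array[String]): Unit = {
    println(listenersAfterNonSqlJob())
    println(eventsSeenByLazyListener())
  }
}
```

In this model the lazy variant adds no listener for a purely non-SQL workload, which is the behavior argued for above. It also never sees events posted before the first session exists, which is the concern raised on the other side of the discussion.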


---



[GitHub] spark issue #19981: [SPARK-22786][SQL] only use AppStatusPlugin in history s...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19981
  
    **[Test build #85041 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85041/testReport)** for PR 19981 at commit [`5737705`](https://github.com/apache/spark/commit/57377055a3db6f12f67c55524917e4da1b49df94).
     * This patch **fails Scala style tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #19981: [SPARK-22786][SQL] only use AppStatusPlugin in hi...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19981#discussion_r157317442
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala ---
    @@ -142,286 +163,277 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext with JsonTest
           (id, accumulatorValue)
         }.toMap
     
    -    bus.postToAll(SparkListenerSQLExecutionStart(
    +    listener.onOtherEvent(SparkListenerSQLExecutionStart(
           executionId,
           "test",
           "test",
           df.queryExecution.toString,
           SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
           System.currentTimeMillis()))
     
    -    bus.postToAll(SparkListenerJobStart(
    +    listener.onJobStart(SparkListenerJobStart(
           jobId = 0,
           time = System.currentTimeMillis(),
           stageInfos = Seq(
             createStageInfo(0, 0),
             createStageInfo(1, 0)
           ),
           createProperties(executionId)))
    -    bus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 0)))
    +    listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(0, 0)))
     
    -    assert(store.executionMetrics(0).isEmpty)
    +    assert(statusStore.executionMetrics(executionId).isEmpty)
     
    -    bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq(
    +    listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq(
           // (task id, stage id, stage attempt, accum updates)
           (0L, 0, 0, createAccumulatorInfos(accumulatorUpdates)),
           (1L, 0, 0, createAccumulatorInfos(accumulatorUpdates))
         )))
     
    -    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
    +    checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 2))
     
         // Driver accumulator updates don't belong to this execution should be filtered and no
         // exception will be thrown.
    -    bus.postToAll(SparkListenerDriverAccumUpdates(0, Seq((999L, 2L))))
    +    listener.onOtherEvent(SparkListenerDriverAccumUpdates(0, Seq((999L, 2L))))
     
    -    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
    +    checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 2))
     
    -    bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq(
    +    listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq(
           // (task id, stage id, stage attempt, accum updates)
           (0L, 0, 0, createAccumulatorInfos(accumulatorUpdates)),
           (1L, 0, 0, createAccumulatorInfos(accumulatorUpdates.mapValues(_ * 2)))
         )))
     
    -    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 3))
    +    checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 3))
     
         // Retrying a stage should reset the metrics
    -    bus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 1)))
    +    listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(0, 1)))
     
    -    bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq(
    +    listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq(
           // (task id, stage id, stage attempt, accum updates)
           (0L, 0, 1, createAccumulatorInfos(accumulatorUpdates)),
           (1L, 0, 1, createAccumulatorInfos(accumulatorUpdates))
         )))
     
    -    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
    +    checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 2))
     
         // Ignore the task end for the first attempt
    -    bus.postToAll(SparkListenerTaskEnd(
    +    listener.onTaskEnd(SparkListenerTaskEnd(
           stageId = 0,
           stageAttemptId = 0,
           taskType = "",
           reason = null,
           createTaskInfo(0, 0, accums = accumulatorUpdates.mapValues(_ * 100)),
           null))
     
    -    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
    +    checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 2))
     
         // Finish two tasks
    -    bus.postToAll(SparkListenerTaskEnd(
    +    listener.onTaskEnd(SparkListenerTaskEnd(
           stageId = 0,
           stageAttemptId = 1,
           taskType = "",
           reason = null,
           createTaskInfo(0, 0, accums = accumulatorUpdates.mapValues(_ * 2)),
           null))
    -    bus.postToAll(SparkListenerTaskEnd(
    +    listener.onTaskEnd(SparkListenerTaskEnd(
           stageId = 0,
           stageAttemptId = 1,
           taskType = "",
           reason = null,
           createTaskInfo(1, 0, accums = accumulatorUpdates.mapValues(_ * 3)),
           null))
     
    -    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 5))
    +    checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 5))
     
         // Summit a new stage
    -    bus.postToAll(SparkListenerStageSubmitted(createStageInfo(1, 0)))
    +    listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(1, 0)))
     
    -    bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq(
    +    listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq(
           // (task id, stage id, stage attempt, accum updates)
           (0L, 1, 0, createAccumulatorInfos(accumulatorUpdates)),
           (1L, 1, 0, createAccumulatorInfos(accumulatorUpdates))
         )))
     
    -    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 7))
    +    checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 7))
     
         // Finish two tasks
    -    bus.postToAll(SparkListenerTaskEnd(
    +    listener.onTaskEnd(SparkListenerTaskEnd(
           stageId = 1,
           stageAttemptId = 0,
           taskType = "",
           reason = null,
           createTaskInfo(0, 0, accums = accumulatorUpdates.mapValues(_ * 3)),
           null))
    -    bus.postToAll(SparkListenerTaskEnd(
    +    listener.onTaskEnd(SparkListenerTaskEnd(
           stageId = 1,
           stageAttemptId = 0,
           taskType = "",
           reason = null,
           createTaskInfo(1, 0, accums = accumulatorUpdates.mapValues(_ * 3)),
           null))
     
    -    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 11))
    +    checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 11))
     
    -    assertJobs(store.execution(0), running = Seq(0))
    +    assertJobs(statusStore.execution(executionId), running = Seq(0))
     
    -    bus.postToAll(SparkListenerJobEnd(
    +    listener.onJobEnd(SparkListenerJobEnd(
           jobId = 0,
           time = System.currentTimeMillis(),
           JobSucceeded
         ))
    -    bus.postToAll(SparkListenerSQLExecutionEnd(
    +    listener.onOtherEvent(SparkListenerSQLExecutionEnd(
           executionId, System.currentTimeMillis()))
     
    -    assertJobs(store.execution(0), completed = Seq(0))
    -    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 11))
    +    assertJobs(statusStore.execution(executionId), completed = Seq(0))
    +
    +    checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 11))
       }
     
    -  sqlStoreTest("onExecutionEnd happens before onJobEnd(JobSucceeded)") { (store, bus) =>
    +  test("onExecutionEnd happens before onJobEnd(JobSucceeded)") {
    +    val statusStore = createStatusStore()
    +    val listener = statusStore.listener.get
    +
         val executionId = 0
         val df = createTestDataFrame
    -    bus.postToAll(SparkListenerSQLExecutionStart(
    +    listener.onOtherEvent(SparkListenerSQLExecutionStart(
           executionId,
           "test",
           "test",
           df.queryExecution.toString,
           SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
           System.currentTimeMillis()))
    -    bus.postToAll(SparkListenerJobStart(
    +    listener.onJobStart(SparkListenerJobStart(
           jobId = 0,
           time = System.currentTimeMillis(),
           stageInfos = Nil,
           createProperties(executionId)))
    -    bus.postToAll(SparkListenerSQLExecutionEnd(
    +    listener.onOtherEvent(SparkListenerSQLExecutionEnd(
           executionId, System.currentTimeMillis()))
    -    bus.postToAll(SparkListenerJobEnd(
    +    listener.onJobEnd(SparkListenerJobEnd(
           jobId = 0,
           time = System.currentTimeMillis(),
           JobSucceeded
         ))
     
    -    assertJobs(store.execution(0), completed = Seq(0))
    +    assertJobs(statusStore.execution(executionId), completed = Seq(0))
       }
     
    -  sqlStoreTest("onExecutionEnd happens before multiple onJobEnd(JobSucceeded)s") { (store, bus) =>
    +  test("onExecutionEnd happens before multiple onJobEnd(JobSucceeded)s") {
    +    val statusStore = createStatusStore()
    +    val listener = statusStore.listener.get
    +
         val executionId = 0
         val df = createTestDataFrame
    -    bus.postToAll(SparkListenerSQLExecutionStart(
    +    listener.onOtherEvent(SparkListenerSQLExecutionStart(
           executionId,
           "test",
           "test",
           df.queryExecution.toString,
           SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
           System.currentTimeMillis()))
    -    bus.postToAll(SparkListenerJobStart(
    +    listener.onJobStart(SparkListenerJobStart(
           jobId = 0,
           time = System.currentTimeMillis(),
           stageInfos = Nil,
           createProperties(executionId)))
    -    bus.postToAll(SparkListenerJobEnd(
    +    listener.onJobEnd(SparkListenerJobEnd(
             jobId = 0,
             time = System.currentTimeMillis(),
             JobSucceeded
         ))
     
    -    bus.postToAll(SparkListenerJobStart(
    +    listener.onJobStart(SparkListenerJobStart(
           jobId = 1,
           time = System.currentTimeMillis(),
           stageInfos = Nil,
           createProperties(executionId)))
    -    bus.postToAll(SparkListenerSQLExecutionEnd(
    +    listener.onOtherEvent(SparkListenerSQLExecutionEnd(
           executionId, System.currentTimeMillis()))
    -    bus.postToAll(SparkListenerJobEnd(
    +    listener.onJobEnd(SparkListenerJobEnd(
           jobId = 1,
           time = System.currentTimeMillis(),
           JobSucceeded
         ))
     
    -    assertJobs(store.execution(0), completed = Seq(0, 1))
    +    assertJobs(statusStore.execution(executionId), completed = Seq(0, 1))
       }
     
    -  sqlStoreTest("onExecutionEnd happens before onJobEnd(JobFailed)") { (store, bus) =>
    +  test("onExecutionEnd happens before onJobEnd(JobFailed)") {
    +    val statusStore = createStatusStore()
    +    val listener = statusStore.listener.get
    +
         val executionId = 0
         val df = createTestDataFrame
    -    bus.postToAll(SparkListenerSQLExecutionStart(
    +    listener.onOtherEvent(SparkListenerSQLExecutionStart(
           executionId,
           "test",
           "test",
           df.queryExecution.toString,
           SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
           System.currentTimeMillis()))
    -    bus.postToAll(SparkListenerJobStart(
    +    listener.onJobStart(SparkListenerJobStart(
           jobId = 0,
           time = System.currentTimeMillis(),
           stageInfos = Seq.empty,
           createProperties(executionId)))
    -    bus.postToAll(SparkListenerSQLExecutionEnd(
    +    listener.onOtherEvent(SparkListenerSQLExecutionEnd(
           executionId, System.currentTimeMillis()))
    -    bus.postToAll(SparkListenerJobEnd(
    +    listener.onJobEnd(SparkListenerJobEnd(
           jobId = 0,
           time = System.currentTimeMillis(),
           JobFailed(new RuntimeException("Oops"))
         ))
     
    -    assertJobs(store.execution(0), failed = Seq(0))
    +    assertJobs(statusStore.execution(executionId), failed = Seq(0))
       }
     
       test("SPARK-11126: no memory leak when running non SQL jobs") {
    -    val previousStageNumber = statusStore.executionsList().size
    -    spark.sparkContext.parallelize(1 to 10).foreach(i => ())
    -    spark.sparkContext.listenerBus.waitUntilEmpty(10000)
    -    // listener should ignore the non SQL stage
    -    assert(statusStore.executionsList().size == previousStageNumber)
    -
    -    spark.sparkContext.parallelize(1 to 10).toDF().foreach(i => ())
    -    spark.sparkContext.listenerBus.waitUntilEmpty(10000)
    -    // listener should save the SQL stage
    -    assert(statusStore.executionsList().size == previousStageNumber + 1)
    -  }
    -
    -  test("driver side SQL metrics") {
    -    val oldCount = statusStore.executionsList().size
    -    val expectedAccumValue = 12345L
    -    val physicalPlan = MyPlan(sqlContext.sparkContext, expectedAccumValue)
    -    val dummyQueryExecution = new QueryExecution(spark, LocalRelation()) {
    -      override lazy val sparkPlan = physicalPlan
    -      override lazy val executedPlan = physicalPlan
    -    }
    -
    -    SQLExecution.withNewExecutionId(spark, dummyQueryExecution) {
    -      physicalPlan.execute().collect()
    -    }
    -
    -    while (statusStore.executionsList().size < oldCount) {
    -      Thread.sleep(100)
    -    }
    -
    -    // Wait for listener to finish computing the metrics for the execution.
    -    while (statusStore.executionsList().last.metricValues == null) {
    -      Thread.sleep(100)
    +    val statusStore = createStatusStore()
    +    val listener = statusStore.listener.get
    +    try {
    +      sparkContext.addSparkListener(listener)
    +      spark.sparkContext.parallelize(1 to 10).foreach(i => ())
    +      spark.sparkContext.listenerBus.waitUntilEmpty(10000)
    +      // Listener should ignore the non-SQL stages, as the stage data are only removed when SQL
    +      // execution ends, which will not be triggered for non-SQL jobs.
    +      assert(listener.stageMetrics.isEmpty)
    +    } finally {
    +      sparkContext.removeSparkListener(listener)
         }
    -
    -    val execId = statusStore.executionsList().last.executionId
    -    val metrics = statusStore.executionMetrics(execId)
    -    val driverMetric = physicalPlan.metrics("dummy")
    -    val expectedValue = SQLMetrics.stringValue(driverMetric.metricType, Seq(expectedAccumValue))
    -
    -    assert(metrics.contains(driverMetric.id))
    -    assert(metrics(driverMetric.id) === expectedValue)
    --- End diff --
    
    This passed before because the listener was automatically being added to the bus using the plugin interface you've removed in this PR.
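
The difference between posting through a bus and invoking listener callbacks directly, which the diff above switches between, can be sketched as follows. `MiniBus`, `CountingListener`, and `DispatchDemo` are hypothetical names for this illustration and do not mirror Spark's listener API.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-ins for illustration; not the Spark listener API.
final class CountingListener {
  var jobStarts = 0
  def onJobStart(jobId: Int): Unit = jobStarts += 1
}

final class MiniBus {
  private val listeners = ArrayBuffer.empty[CountingListener]
  def add(l: CountingListener): Unit = listeners += l
  def remove(l: CountingListener): Unit = listeners -= l
  def postJobStart(jobId: Int): Unit = listeners.foreach(_.onJobStart(jobId))
}

object DispatchDemo {
  // Posting through the bus reaches the listener only if something registered it.
  def viaBus(registered: Boolean): Int = {
    val bus = new MiniBus
    val listener = new CountingListener
    if (registered) bus.add(listener)
    try {
      bus.postJobStart(0)
      listener.jobStarts
    } finally {
      // Always detach, so the listener does not leak into later tests.
      bus.remove(listener)
    }
  }

  // Calling the callback directly does not depend on registration at all.
  def direct(): Int = {
    val listener = new CountingListener
    listener.onJobStart(0)
    listener.jobStarts
  }

  def main(args: Array[String]): Unit = {
    println(viaBus(registered = true))
    println(viaBus(registered = false))
    println(direct())
  }
}
```

A test that goes through the bus silently depends on whoever registered the listener. A test that calls the callbacks directly, or that adds and removes the listener itself in a `try`/`finally`, makes that dependency explicit.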


---



[GitHub] spark issue #19981: [SPARK-22786][SQL] only use AppStatusPlugin in history s...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19981
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84922/
    Test FAILed.


---



[GitHub] spark pull request #19981: [SPARK-22786][SQL] only use AppStatusPlugin in hi...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19981#discussion_r157663315
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala ---
    @@ -36,11 +36,12 @@ import org.apache.spark.sql.catalyst.util.quietly
     import org.apache.spark.sql.execution.{LeafExecNode, QueryExecution, SparkPlanInfo, SQLExecution}
     import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
     import org.apache.spark.sql.test.SharedSQLContext
    -import org.apache.spark.status.config._
    +import org.apache.spark.status.config.LIVE_ENTITY_UPDATE_PERIOD
     import org.apache.spark.util.{AccumulatorMetadata, JsonProtocol, LongAccumulator}
     import org.apache.spark.util.kvstore.InMemoryStore
     
    -class SQLListenerSuite extends SparkFunSuite with SharedSQLContext with JsonTestUtils {
    +
    +class SQLAppStatusListenerSuite extends SparkFunSuite with SharedSQLContext with JsonTestUtils {
    --- End diff --
    
    Then it's an existing problem of `SQLListenerSuite`, as previously it didn't only test `SQLListener`. This should not stop us from renaming it after we rename `SQLListener`.


---



[GitHub] spark pull request #19981: [SPARK-22786][SQL] only use AppStatusPlugin in hi...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19981#discussion_r157829182
  
    --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
    @@ -322,15 +321,18 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
             (new InMemoryStore(), true)
         }
     
    +    val plugins = ServiceLoader.load(
    +      classOf[AppHistoryServerPlugin], Utils.getContextOrSparkClassLoader).asScala
         val trackingStore = new ElementTrackingStore(kvstore, conf)
    --- End diff --
    
    Yes, both because it's the old behavior, and to limit the app's history data growth. Also because the UI code itself doesn't scale to arbitrarily large lists of things like jobs and stages.
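
As a rough illustration of "limiting data growth", here is a sketch of a store that evicts its oldest entries once a cap is exceeded. `BoundedStore` and `maxEntries` are hypothetical names; this is not how `ElementTrackingStore` is implemented, only the general idea of capping retained elements:

```scala
import scala.collection.mutable

// Hypothetical bounded store: keeps at most `maxEntries` items, evicting the oldest first.
final class BoundedStore[K, V](maxEntries: Int) {
  require(maxEntries > 0, "maxEntries must be positive")

  // LinkedHashMap iterates in insertion order, so `head` is the oldest entry.
  private val entries = mutable.LinkedHashMap.empty[K, V]

  def put(key: K, value: V): Unit = {
    entries.remove(key) // re-inserting moves the key to the newest position
    entries.put(key, value)
    while (entries.size > maxEntries) {
      entries.remove(entries.head._1)
    }
  }

  def size: Int = entries.size
  def keys: List[K] = entries.keys.toList
}
```

With a cap of 3 and five inserts, only the three most recent keys remain, so a UI listing them never has to render an unbounded list.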


---



[GitHub] spark pull request #19981: [SPARK-22786][SQL] only use AppStatusPlugin in hi...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19981#discussion_r157582494
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala ---
    @@ -142,288 +155,274 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext with JsonTest
           (id, accumulatorValue)
         }.toMap
     
    -    bus.postToAll(SparkListenerSQLExecutionStart(
    +    listener.onOtherEvent(SparkListenerSQLExecutionStart(
           executionId,
           "test",
           "test",
           df.queryExecution.toString,
           SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
           System.currentTimeMillis()))
     
    -    bus.postToAll(SparkListenerJobStart(
    +    listener.onJobStart(SparkListenerJobStart(
           jobId = 0,
           time = System.currentTimeMillis(),
           stageInfos = Seq(
             createStageInfo(0, 0),
             createStageInfo(1, 0)
           ),
           createProperties(executionId)))
    -    bus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 0)))
    +    listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(0, 0)))
     
    -    assert(store.executionMetrics(0).isEmpty)
    +    assert(statusStore.executionMetrics(executionId).isEmpty)
     
    -    bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq(
    +    listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq(
           // (task id, stage id, stage attempt, accum updates)
           (0L, 0, 0, createAccumulatorInfos(accumulatorUpdates)),
           (1L, 0, 0, createAccumulatorInfos(accumulatorUpdates))
         )))
     
    -    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
    +    checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 2))
     
         // Driver accumulator updates don't belong to this execution should be filtered and no
         // exception will be thrown.
    -    bus.postToAll(SparkListenerDriverAccumUpdates(0, Seq((999L, 2L))))
    +    listener.onOtherEvent(SparkListenerDriverAccumUpdates(0, Seq((999L, 2L))))
     
    -    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
    +    checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 2))
     
    -    bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq(
    +    listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq(
           // (task id, stage id, stage attempt, accum updates)
           (0L, 0, 0, createAccumulatorInfos(accumulatorUpdates)),
           (1L, 0, 0, createAccumulatorInfos(accumulatorUpdates.mapValues(_ * 2)))
         )))
     
    -    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 3))
    +    checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 3))
     
         // Retrying a stage should reset the metrics
    -    bus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 1)))
    +    listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(0, 1)))
     
    -    bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq(
    +    listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq(
           // (task id, stage id, stage attempt, accum updates)
           (0L, 0, 1, createAccumulatorInfos(accumulatorUpdates)),
           (1L, 0, 1, createAccumulatorInfos(accumulatorUpdates))
         )))
     
    -    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
    +    checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 2))
     
         // Ignore the task end for the first attempt
    -    bus.postToAll(SparkListenerTaskEnd(
    +    listener.onTaskEnd(SparkListenerTaskEnd(
           stageId = 0,
           stageAttemptId = 0,
           taskType = "",
           reason = null,
           createTaskInfo(0, 0, accums = accumulatorUpdates.mapValues(_ * 100)),
           null))
     
    -    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
    +    checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 2))
     
         // Finish two tasks
    -    bus.postToAll(SparkListenerTaskEnd(
    +    listener.onTaskEnd(SparkListenerTaskEnd(
           stageId = 0,
           stageAttemptId = 1,
           taskType = "",
           reason = null,
           createTaskInfo(0, 0, accums = accumulatorUpdates.mapValues(_ * 2)),
           null))
    -    bus.postToAll(SparkListenerTaskEnd(
    +    listener.onTaskEnd(SparkListenerTaskEnd(
           stageId = 0,
           stageAttemptId = 1,
           taskType = "",
           reason = null,
           createTaskInfo(1, 0, accums = accumulatorUpdates.mapValues(_ * 3)),
           null))
     
    -    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 5))
    +    checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 5))
     
         // Summit a new stage
    -    bus.postToAll(SparkListenerStageSubmitted(createStageInfo(1, 0)))
    +    listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(1, 0)))
     
    -    bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq(
    +    listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq(
           // (task id, stage id, stage attempt, accum updates)
           (0L, 1, 0, createAccumulatorInfos(accumulatorUpdates)),
           (1L, 1, 0, createAccumulatorInfos(accumulatorUpdates))
         )))
     
    -    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 7))
    +    checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 7))
     
         // Finish two tasks
    -    bus.postToAll(SparkListenerTaskEnd(
    +    listener.onTaskEnd(SparkListenerTaskEnd(
           stageId = 1,
           stageAttemptId = 0,
           taskType = "",
           reason = null,
           createTaskInfo(0, 0, accums = accumulatorUpdates.mapValues(_ * 3)),
           null))
    -    bus.postToAll(SparkListenerTaskEnd(
    +    listener.onTaskEnd(SparkListenerTaskEnd(
           stageId = 1,
           stageAttemptId = 0,
           taskType = "",
           reason = null,
           createTaskInfo(1, 0, accums = accumulatorUpdates.mapValues(_ * 3)),
           null))
     
    -    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 11))
    +    checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 11))
     
    -    assertJobs(store.execution(0), running = Seq(0))
    +    assertJobs(statusStore.execution(executionId), running = Seq(0))
     
    -    bus.postToAll(SparkListenerJobEnd(
    +    listener.onJobEnd(SparkListenerJobEnd(
           jobId = 0,
           time = System.currentTimeMillis(),
           JobSucceeded
         ))
    -    bus.postToAll(SparkListenerSQLExecutionEnd(
    +    listener.onOtherEvent(SparkListenerSQLExecutionEnd(
           executionId, System.currentTimeMillis()))
     
    -    assertJobs(store.execution(0), completed = Seq(0))
    -    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 11))
    +    assertJobs(statusStore.execution(executionId), completed = Seq(0))
    +
    +    checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 11))
       }
     
    -  sqlStoreTest("onExecutionEnd happens before onJobEnd(JobSucceeded)") { (store, bus) =>
    +  test("onExecutionEnd happens before onJobEnd(JobSucceeded)") {
    +    val statusStore = createStatusStore()
    +    val listener = statusStore.listener.get
    +
         val executionId = 0
         val df = createTestDataFrame
    -    bus.postToAll(SparkListenerSQLExecutionStart(
    +    listener.onOtherEvent(SparkListenerSQLExecutionStart(
           executionId,
           "test",
           "test",
           df.queryExecution.toString,
           SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
           System.currentTimeMillis()))
    -    bus.postToAll(SparkListenerJobStart(
    +    listener.onJobStart(SparkListenerJobStart(
           jobId = 0,
           time = System.currentTimeMillis(),
           stageInfos = Nil,
           createProperties(executionId)))
    -    bus.postToAll(SparkListenerSQLExecutionEnd(
    +    listener.onOtherEvent(SparkListenerSQLExecutionEnd(
           executionId, System.currentTimeMillis()))
    -    bus.postToAll(SparkListenerJobEnd(
    +    listener.onJobEnd(SparkListenerJobEnd(
           jobId = 0,
           time = System.currentTimeMillis(),
           JobSucceeded
         ))
     
    -    assertJobs(store.execution(0), completed = Seq(0))
    +    assertJobs(statusStore.execution(executionId), completed = Seq(0))
       }
     
    -  sqlStoreTest("onExecutionEnd happens before multiple onJobEnd(JobSucceeded)s") { (store, bus) =>
    +  test("onExecutionEnd happens before multiple onJobEnd(JobSucceeded)s") {
    +    val statusStore = createStatusStore()
    +    val listener = statusStore.listener.get
    +
         val executionId = 0
         val df = createTestDataFrame
    -    bus.postToAll(SparkListenerSQLExecutionStart(
    +    listener.onOtherEvent(SparkListenerSQLExecutionStart(
           executionId,
           "test",
           "test",
           df.queryExecution.toString,
           SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
           System.currentTimeMillis()))
    -    bus.postToAll(SparkListenerJobStart(
    +    listener.onJobStart(SparkListenerJobStart(
           jobId = 0,
           time = System.currentTimeMillis(),
           stageInfos = Nil,
           createProperties(executionId)))
    -    bus.postToAll(SparkListenerJobEnd(
    +    listener.onJobEnd(SparkListenerJobEnd(
             jobId = 0,
             time = System.currentTimeMillis(),
             JobSucceeded
         ))
     
    -    bus.postToAll(SparkListenerJobStart(
    +    listener.onJobStart(SparkListenerJobStart(
           jobId = 1,
           time = System.currentTimeMillis(),
           stageInfos = Nil,
           createProperties(executionId)))
    -    bus.postToAll(SparkListenerSQLExecutionEnd(
    +    listener.onOtherEvent(SparkListenerSQLExecutionEnd(
           executionId, System.currentTimeMillis()))
    -    bus.postToAll(SparkListenerJobEnd(
    +    listener.onJobEnd(SparkListenerJobEnd(
           jobId = 1,
           time = System.currentTimeMillis(),
           JobSucceeded
         ))
     
    -    assertJobs(store.execution(0), completed = Seq(0, 1))
    +    assertJobs(statusStore.execution(executionId), completed = Seq(0, 1))
       }
     
    -  sqlStoreTest("onExecutionEnd happens before onJobEnd(JobFailed)") { (store, bus) =>
    +  test("onExecutionEnd happens before onJobEnd(JobFailed)") {
    +    val statusStore = createStatusStore()
    +    val listener = statusStore.listener.get
    +
         val executionId = 0
         val df = createTestDataFrame
    -    bus.postToAll(SparkListenerSQLExecutionStart(
    +    listener.onOtherEvent(SparkListenerSQLExecutionStart(
           executionId,
           "test",
           "test",
           df.queryExecution.toString,
           SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
           System.currentTimeMillis()))
    -    bus.postToAll(SparkListenerJobStart(
    +    listener.onJobStart(SparkListenerJobStart(
           jobId = 0,
           time = System.currentTimeMillis(),
           stageInfos = Seq.empty,
           createProperties(executionId)))
    -    bus.postToAll(SparkListenerSQLExecutionEnd(
    +    listener.onOtherEvent(SparkListenerSQLExecutionEnd(
           executionId, System.currentTimeMillis()))
    -    bus.postToAll(SparkListenerJobEnd(
    +    listener.onJobEnd(SparkListenerJobEnd(
           jobId = 0,
           time = System.currentTimeMillis(),
           JobFailed(new RuntimeException("Oops"))
         ))
     
    -    assertJobs(store.execution(0), failed = Seq(0))
    +    assertJobs(statusStore.execution(executionId), failed = Seq(0))
       }
     
       test("SPARK-11126: no memory leak when running non SQL jobs") {
    -    val previousStageNumber = statusStore.executionsList().size
    +    val statusStore = createStatusStore()
    +    val listener = statusStore.listener.get
    +    // No need to remove the listener as the SparkContext is dedicated to this test suite.
    +    sparkContext.addSparkListener(listener)
         spark.sparkContext.parallelize(1 to 10).foreach(i => ())
         spark.sparkContext.listenerBus.waitUntilEmpty(10000)
    -    // listener should ignore the non SQL stage
    -    assert(statusStore.executionsList().size == previousStageNumber)
    -
    -    spark.sparkContext.parallelize(1 to 10).toDF().foreach(i => ())
    -    spark.sparkContext.listenerBus.waitUntilEmpty(10000)
    -    // listener should save the SQL stage
    -    assert(statusStore.executionsList().size == previousStageNumber + 1)
    +    // Listener should ignore the non-SQL stages, as the stage data are only removed when SQL
    +    // execution ends, which will not be triggered for non-SQL jobs.
    +    assert(listener.stageMetrics.isEmpty)
       }
     
       test("driver side SQL metrics") {
    -    val oldCount = statusStore.executionsList().size
    -    val expectedAccumValue = 12345L
    +    val statusStore = createStatusStore()
    +    val listener = statusStore.listener.get
    +    // No need to remove the listener as the SparkContext is dedicated to this test suite.
    +    sparkContext.addSparkListener(listener)
    +    val expectedAccumValue = 12345
         val physicalPlan = MyPlan(sqlContext.sparkContext, expectedAccumValue)
         val dummyQueryExecution = new QueryExecution(spark, LocalRelation()) {
           override lazy val sparkPlan = physicalPlan
           override lazy val executedPlan = physicalPlan
         }
    -
         SQLExecution.withNewExecutionId(spark, dummyQueryExecution) {
           physicalPlan.execute().collect()
         }
     
    -    while (statusStore.executionsList().size < oldCount) {
    -      Thread.sleep(100)
    -    }
    -
    -    // Wait for listener to finish computing the metrics for the execution.
    -    while (statusStore.executionsList().last.metricValues == null) {
    +    // Wait until execution finished
    +    var finished = false
    +    while (!finished) {
           Thread.sleep(100)
    +      val executionData = statusStore.executionsList().headOption
    +      finished = executionData.isDefined && executionData.get.jobs.values
    +        .forall(_ == JobExecutionStatus.SUCCEEDED)
    --- End diff --
    
    This is not the same check as before. It assumes that `onJobEnd` is called after `onExecutionEnd`, which might be the case now, but was explicitly mentioned as not always being the case in the original `SQLListener` code.
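
One way to avoid depending on event order is to treat an execution as finished only when both conditions hold, whichever arrives last. The sketch below uses a hypothetical `ExecutionTracker`; it does not mirror the real listener and only shows an order-independent completion check:

```scala
import scala.collection.mutable

// Hypothetical tracker; the names do not correspond to Spark classes.
final class ExecutionTracker {
  private val runningJobs = mutable.Set.empty[Int]
  private var executionEnded = false

  def onJobStart(jobId: Int): Unit = { runningJobs += jobId; () }
  def onJobEnd(jobId: Int): Unit = { runningJobs -= jobId; () }
  def onExecutionEnd(): Unit = { executionEnded = true }

  // Finished only when the execution-end event was seen and no job is still running,
  // regardless of which of the two happened last.
  def isFinished: Boolean = executionEnded && runningJobs.isEmpty
}
```

A test that polls `isFinished` gives the same answer for both orderings exercised by the "onExecutionEnd happens before onJobEnd" cases in the diff.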


---



[GitHub] spark pull request #19981: [SPARK-22786][SQL] only use AppStatusPlugin in hi...

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19981#discussion_r157137789
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala ---
    @@ -82,6 +82,19 @@ private[sql] class SharedState(val sparkContext: SparkContext) extends Logging {
        */
       val cacheManager: CacheManager = new CacheManager
     
    +  /**
    +   * A status store to query SQL status/metrics of this Spark application, based on SQL-specific
    +   * [[org.apache.spark.scheduler.SparkListenerEvent]]s.
    +   */
    +  val statusStore: SQLAppStatusStore = {
    --- End diff --
    
    +1 


---



[GitHub] spark issue #19981: [SPARK-22786][SQL] only use AppStatusPlugin in history s...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19981
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/85065/
    Test PASSed.


---



[GitHub] spark issue #19981: [SPARK-22786][SQL] only use AppStatusPlugin in history s...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on the issue:

    https://github.com/apache/spark/pull/19981
  
    > I'm also not sure why we need a plugin interface for live UI.
    
    So your solution to that is to have completely separate code for both cases? I really prefer to have a single place to go to understand how the listeners and the UI are initialized, even if the SQL plugin implementation is sub-optimal.
    
    Again, you're stating a matter of preference, but you haven't explained what problem this is causing. Not using the interface doesn't mean you get rid of the conditionals, it just means the conditionals are implied by having the code that installs the listener and UI for live applications live in a different place.


---



[GitHub] spark issue #19981: [SPARK-22786][SQL] only use AppStatusPlugin in history s...

Posted by gengliangwang <gi...@git.apache.org>.
Github user gengliangwang commented on the issue:

    https://github.com/apache/spark/pull/19981
  
    retest this please
    



---



[GitHub] spark issue #19981: [SPARK-22786][SQL] only use AppStatusPlugin in history s...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19981
  
    Merged build finished. Test FAILed.


---



[GitHub] spark issue #19981: [SPARK-22786][SQL] only use AppStatusPlugin in history s...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19981
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/85093/
    Test PASSed.


---



[GitHub] spark pull request #19981: [SPARK-22786][SQL] only use AppStatusPlugin in hi...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19981#discussion_r157585557
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala ---
    @@ -517,9 +516,12 @@ class SQLListenerMemoryLeakSuite extends SparkFunSuite {
               }
             }
             sc.listenerBus.waitUntilEmpty(10000)
    -
    -        val statusStore = new SQLAppStatusStore(sc.statusStore.store)
    -        assert(statusStore.executionsList().size <= 50)
    +        val statusStore = spark.sharedState.statusStore
    +        assert(statusStore.executionsCount() == 200)
    --- End diff --
    
    This is now wrong, isn't it? The configuration explicitly says "50". Since this test is not being run, you should leave the code as before (with just needed changes, if any, for it to compile).


---



[GitHub] spark issue #19981: [SPARK-22786][SQL] only use AppStatusPlugin in history s...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19981
  
    **[Test build #85040 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85040/testReport)** for PR 19981 at commit [`2e66722`](https://github.com/apache/spark/commit/2e667222690cad5dcd839b220ae9f938203813db).
     * This patch **fails Scala style tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #19981: [SPARK-22786][SQL] only use AppStatusPlugin in history s...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19981
  
    **[Test build #85065 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85065/testReport)** for PR 19981 at commit [`a81917c`](https://github.com/apache/spark/commit/a81917c8e9e41d7b7e91e0469249904c675bac17).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #19981: [SPARK-22786][SQL] only use AppStatusPlugin in history s...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19981
  
    Merged build finished. Test FAILed.


---



[GitHub] spark issue #19981: [SPARK-22786][SQL] only use AppStatusPlugin in history s...

Posted by gengliangwang <gi...@git.apache.org>.
Github user gengliangwang commented on the issue:

    https://github.com/apache/spark/pull/19981
  
    retest this please


---



[GitHub] spark issue #19981: [SPARK-22786][SQL] only use AppStatusPlugin in history s...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19981
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/85040/
    Test FAILed.


---



[GitHub] spark issue #19981: [SPARK-22786][SQL] only use AppStatusPlugin in history s...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19981
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #19981: [SPARK-22786][SQL] only use AppStatusPlugin in history s...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19981
  
    **[Test build #84994 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84994/testReport)** for PR 19981 at commit [`bc300f9`](https://github.com/apache/spark/commit/bc300f9a31a351f8630c9b9b189f5b499fd858a1).


---



[GitHub] spark issue #19981: [SPARK-22786][SQL] only use AppStatusPlugin in history s...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/19981
  
    I don't think `AppStatusPlugin` is well designed to support both live and history server. The fact that `SQLAppStatusPlugin` needs flags to decide whether to register the listener in `setupListeners` or `setupUI` looks pretty hacky.
    
    I'm also not sure why we need a plugin interface for live UI. The SQL and Streaming UI are implemented pretty well and cleanly. I do agree that an interface for history server is very useful, and that's why we had `SparkHistoryListenerFactory` before.
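
To make the "flags" concern concrete, here is a toy sketch of a two-hook plugin whose registration point depends on a `live` flag. The trait, its method signatures, and `ToySqlPlugin` are hypothetical simplifications and are not the actual `AppStatusPlugin` API:

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical, simplified two-hook plugin; these are not Spark's actual signatures.
trait StatusPlugin {
  def setupListeners(live: Boolean, addListener: String => Unit): Unit
  def setupUI(live: Boolean, addListener: String => Unit, addTab: String => Unit): Unit
}

// A toy SQL plugin: where the listener gets registered depends on the `live` flag.
final class ToySqlPlugin extends StatusPlugin {
  override def setupListeners(live: Boolean, addListener: String => Unit): Unit = {
    // Replaying history: register right away.
    if (!live) addListener("sql-listener")
  }

  override def setupUI(
      live: Boolean,
      addListener: String => Unit,
      addTab: String => Unit): Unit = {
    // Live application: defer registration until the UI is set up.
    if (live) addListener("sql-listener")
    addTab("SQL")
  }
}

object PluginDemo {
  // Runs both hooks and returns the (listeners, tabs) that were registered.
  def run(plugin: StatusPlugin, live: Boolean): (List[String], List[String]) = {
    val listeners = ArrayBuffer.empty[String]
    val tabs = ArrayBuffer.empty[String]
    val addListener: String => Unit = s => { listeners += s; () }
    val addTab: String => Unit = s => { tabs += s; () }
    plugin.setupListeners(live, addListener)
    plugin.setupUI(live, addListener, addTab)
    (listeners.toList, tabs.toList)
  }
}
```

In both modes the listener ends up registered exactly once, but the reader has to follow the flag through two methods to see that, which is the readability cost under discussion.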


---



[GitHub] spark issue #19981: [SPARK-22786][SQL] only use AppStatusPlugin in history s...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/19981
  
    retest this please


---



[GitHub] spark issue #19981: [SPARK-22786][SQL] only use AppStatusPlugin in history s...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19981
  
    Merged build finished. Test FAILed.


---



[GitHub] spark issue #19981: [SPARK-22786][SQL] only use AppStatusPlugin in history s...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/19981
  
    Internal behavior changes also need careful review, so I'd like to wait for feedback from others.
    
    But is the previous code really that hacky, and is it worth a refactor with a new interface? https://github.com/apache/spark/pull/19981/files#diff-42e78d37f5dcb2a1576f83b53bbf4b55R88 looks pretty nice to me, and so does the streaming one in `StreamingManager`. If you can find a way to make the code better, I'm totally fine with that. But before that, can we fall back to the previous code, which is obviously better than the current one?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19981: [SPARK-22786][SQL] only use AppStatusPlugin in hi...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19981#discussion_r158170972
  
    --- Diff: core/src/main/scala/org/apache/spark/status/AppStatusListener.scala ---
    @@ -48,7 +48,7 @@ private[spark] class AppStatusListener(
     
       import config._
     
    -  private var sparkVersion = SPARK_VERSION
    +  private val sparkVersion = SPARK_VERSION
    --- End diff --
    
    Actually this is a bug; the version should be read from `SparkListenerLogStart` when it's in the event log. Feel free to file a separate bug.
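
A small sketch of why the field would need to stay mutable: when an event log is replayed, the version recorded in the log should override the version of the running process. The event classes and `VersionTrackingListener` below are hypothetical models, not Spark's listener API:

```scala
// Hypothetical event model; only the shape of the idea, not Spark's listener API.
sealed trait Event
final case class LogStart(sparkVersion: String) extends Event
final case class JobStart(jobId: Int) extends Event

final class VersionTrackingListener(runtimeVersion: String) {
  // Must be a `var`: a replayed event log may carry a different version
  // than the process that is replaying it.
  private var version: String = runtimeVersion

  def onEvent(event: Event): Unit = event match {
    case LogStart(v) => version = v
    case _ => ()
  }

  def sparkVersion: String = version
}
```

For a live application no log-start event is replayed, so the runtime version is kept; for a replayed log the recorded version wins.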


---



[GitHub] spark issue #19981: [SPARK-22786][SQL] only use AppStatusPlugin in history s...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19981
  
    **[Test build #85093 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85093/testReport)** for PR 19981 at commit [`5b64f88`](https://github.com/apache/spark/commit/5b64f881adcf34b958aa659e62a9ce93171cf109).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #19981: [SPARK-22786][SQL] only use AppStatusPlugin in hi...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19981#discussion_r157581481
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala ---
    @@ -31,16 +31,13 @@ import org.apache.spark.util.Utils
     
     
     trait SQLMetricsTestUtils extends SQLTestUtils {
    -
       import testImplicits._
     
    -  private def statusStore: SQLAppStatusStore = {
    -    new SQLAppStatusStore(sparkContext.statusStore.store)
    +  protected def currentExecutionIds(): Set[Long] = {
    +    spark.sharedState.statusStore.executionsList.map(_.executionId).toSet
    --- End diff --
    
    You can just call `statusStore`, no?


---



[GitHub] spark issue #19981: [SPARK-22786][SQL] only use AppStatusPlugin in history s...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19981
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/85227/
    Test FAILed.


---



[GitHub] spark pull request #19981: [SPARK-22786][SQL] only use AppStatusPlugin in hi...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19981#discussion_r157136099
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala ---
    @@ -489,16 +501,17 @@ private case class MyPlan(sc: SparkContext, expectedValue: Long) extends LeafExe
     }
     
     
    -class SQLListenerMemoryLeakSuite extends SparkFunSuite {
    +class SQLAppStatusListenerMemoryLeakSuite extends SparkFunSuite {
     
    -  // TODO: this feature is not yet available in SQLAppStatusStore.
    -  ignore("no memory leak") {
    +  test("no memory leak") {
         quietly {
           val conf = new SparkConf()
             .setMaster("local")
             .setAppName("test")
    +        .set(LIVE_ENTITY_UPDATE_PERIOD, 0L) // Update the UI data immediately
             .set(config.MAX_TASK_FAILURES, 1) // Don't retry the tasks to run this test quickly
    -        .set("spark.sql.ui.retainedExecutions", "50") // Set it to 50 to run this test quickly
    +        // TODO: this feature is not yet available in SQLAppStatusStore.
    +        // .set("spark.sql.ui.retainedExecutions", "50") // Set it to 50 to run this test quickly
    --- End diff --
    
    Instead of totally disabling this test because of an unimplemented feature, I'd like to still run it, just a little slower.


---



[GitHub] spark issue #19981: [SPARK-22786][SQL] only use AppStatusPlugin in history s...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19981
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84967/
    Test FAILed.


---



[GitHub] spark issue #19981: [SPARK-22786][SQL] only use AppStatusPlugin in history s...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19981
  
    Merged build finished. Test FAILed.


---



[GitHub] spark issue #19981: [SPARK-22786][SQL] only use AppStatusPlugin in history s...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19981
  
    **[Test build #84994 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84994/testReport)** for PR 19981 at commit [`bc300f9`](https://github.com/apache/spark/commit/bc300f9a31a351f8630c9b9b189f5b499fd858a1).
     * This patch **fails due to an unknown error code, -9**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #19981: [SPARK-22786][SQL] only use AppStatusPlugin in hi...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19981#discussion_r157312345
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala ---
    @@ -82,6 +82,19 @@ private[sql] class SharedState(val sparkContext: SparkContext) extends Logging {
        */
       val cacheManager: CacheManager = new CacheManager
     
    +  /**
    +   * A status store to query SQL status/metrics of this Spark application, based on SQL-specific
    +   * [[org.apache.spark.scheduler.SparkListenerEvent]]s.
    +   */
    +  val statusStore: SQLAppStatusStore = {
    --- End diff --
    
    Even if you add the property back, it won't be the same listener. There's no way to keep the old API without keeping all of the old code.


---



[GitHub] spark issue #19981: [SPARK-22786][SQL] only use AppStatusPlugin in history s...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on the issue:

    https://github.com/apache/spark/pull/19981
  
    > Code readability is important to Spark, we should not sacrifice it without real benefits.
    
    I agree, but I also think that duplicating code in disjoint places hurts readability rather than helping it.
    
    > but can we guarantee it in all other places like streaming
    
    Those are not being installed with this interface, are they?
    
    > which may stop us from catching Spark events in SQL listener in the future
    
    The listener is now being installed earlier than it was before (when the context starts, instead of later), so it's less likely that it will miss events.
    
    As I mentioned in the other PR you commented on, I do understand that the code here is not super pretty. But I really don't think going back to the previous way is the right thing.
    
    For example, one way to simplify the plugin code is to change how the visibility of the SQL tab is handled. Either always show it (simpler), or add a new method in `SparkUITab` that says whether the tab is visible, and have the SQL tab override it. Either of those would simplify the plugin code a lot (`setupListener` would just install the listener, always, and `setupUI` would just add the tab, always).
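
The second option above (a visibility method that the SQL tab overrides) can be sketched roughly as follows. Everything here is a hypothetical, simplified stand-in: `UiTab`, `isVisible`, `ExecutionStore`, `SqlTab`, and `Ui` are names invented for illustration, and Spark's real `SparkUITab` is not assumed to have such a method.

```scala
// Hypothetical, simplified stand-ins; not Spark's real UI classes.
abstract class UiTab(val name: String) {
  // Proposed hook: tabs are shown by default.
  def isVisible: Boolean = true
}

// Assumed source of SQL execution data; in the thread this role is played
// by the SQL status store.
class ExecutionStore {
  private var executionIds = Vector.empty[Long]
  def add(id: Long): Unit = executionIds :+= id
  def count: Int = executionIds.size
}

class SqlTab(store: ExecutionStore) extends UiTab("SQL") {
  // Only show the tab once at least one SQL execution has been recorded.
  override def isVisible: Boolean = store.count > 0
}

class Ui {
  private var tabs = Vector.empty[UiTab]
  // Setup code can attach every tab unconditionally...
  def attachTab(tab: UiTab): Unit = tabs :+= tab
  // ...because rendering filters on visibility.
  def visibleTabNames: Seq[String] = tabs.filter(_.isVisible).map(_.name)
}
```

Under this sketch, the setup path no longer needs to decide whether SQL is in use: the tab is always attached and simply stays hidden until the store has data.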


---



[GitHub] spark issue #19981: [SPARK-22786][SQL] only use AppStatusPlugin in history s...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19981
  
    Merged build finished. Test FAILed.


---



[GitHub] spark pull request #19981: [SPARK-22786][SQL] only use AppStatusPlugin in hi...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19981#discussion_r157584958
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala ---
    @@ -36,11 +36,12 @@ import org.apache.spark.sql.catalyst.util.quietly
     import org.apache.spark.sql.execution.{LeafExecNode, QueryExecution, SparkPlanInfo, SQLExecution}
     import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
     import org.apache.spark.sql.test.SharedSQLContext
    -import org.apache.spark.status.config._
    +import org.apache.spark.status.config.LIVE_ENTITY_UPDATE_PERIOD
     import org.apache.spark.util.{AccumulatorMetadata, JsonProtocol, LongAccumulator}
     import org.apache.spark.util.kvstore.InMemoryStore
     
    -class SQLListenerSuite extends SparkFunSuite with SharedSQLContext with JsonTestUtils {
    +
    +class SQLAppStatusListenerSuite extends SparkFunSuite with SharedSQLContext with JsonTestUtils {
    --- End diff --
    
    The reason I didn't rename this class is that it contains tests that have nothing to do with the listener itself (like the test for SPARK-18462), and doing the proper thing (breaking those tests out into a separate suite) would be too noisy for the original change (and would also be pretty noisy here).


---



[GitHub] spark issue #19981: [SPARK-22786][SQL] only use AppStatusPlugin in history s...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19981
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/85042/
    Test FAILed.


---



[GitHub] spark issue #19981: [SPARK-22786][SQL] only use AppStatusPlugin in history s...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/19981
  
    @vanzin We can add `AppStatusPlugin` back if you can convince other people that the SQL listener and UI tab should be set up during `SparkContext` initialization (I'm kind of convinced after some more thought; for future SQL listener usage it's more important to catch all events, including non-SQL ones), and figure out how to keep `SharedState.statusStore`.


---



[GitHub] spark issue #19981: [SPARK-22786][SQL] only use AppStatusPlugin in history s...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19981
  
    **[Test build #84943 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84943/testReport)** for PR 19981 at commit [`88fdff2`](https://github.com/apache/spark/commit/88fdff29cc106fd583640f8026dafd278acc9ec9).
     * This patch **fails due to an unknown error code, -9**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19981: [SPARK-22786][SQL] only use AppStatusPlugin in hi...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19981#discussion_r157134291
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala ---
    @@ -25,21 +25,17 @@ import scala.collection.mutable.ArrayBuffer
     
     import com.fasterxml.jackson.databind.annotation.JsonDeserialize
     
    -import org.apache.spark.{JobExecutionStatus, SparkConf}
    -import org.apache.spark.scheduler.SparkListener
    -import org.apache.spark.status.AppStatusPlugin
    +import org.apache.spark.JobExecutionStatus
     import org.apache.spark.status.KVUtils.KVIndexParam
    -import org.apache.spark.ui.SparkUI
    -import org.apache.spark.util.Utils
     import org.apache.spark.util.kvstore.KVStore
     
     /**
      * Provides a view of a KVStore with methods that make it easy to query SQL-specific state. There's
      * no state kept in this class, so it's ok to have multiple instances of it in an application.
      */
    -private[sql] class SQLAppStatusStore(
    +class SQLAppStatusStore(
    --- End diff --
    
    Following the existing convention, classes under the `execution` package are meant to be private and don't need `private[sql]`.


---



[GitHub] spark issue #19981: [SPARK-22786][SQL] only use AppStatusPlugin in history s...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19981
  
    **[Test build #85064 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/85064/testReport)** for PR 19981 at commit [`9f5d21f`](https://github.com/apache/spark/commit/9f5d21f1aa71b1e2ee0a820a1430f453ecf66f0f).


---
