Posted to reviews@spark.apache.org by vanzin <gi...@git.apache.org> on 2017/11/07 00:31:16 UTC

[GitHub] spark pull request #19681: [SPARK-20652][sql] Store SQL UI data in the new a...

GitHub user vanzin opened a pull request:

    https://github.com/apache/spark/pull/19681

    [SPARK-20652][sql] Store SQL UI data in the new app status store.

    This change replaces the SQLListener with a new implementation that
    saves the data to the same store used by the SparkContext's status
    store. For that, the types used by the old SQLListener had to be
    updated a bit so that they're more serialization-friendly.
    
    The interface for getting data from the store was abstracted into
    a new class, SQLAppStatusStore (following the convention used in
    core).
    
    Another change is the way that the SQL UI hooks up into the core
    UI or the SHS. The old "SparkHistoryListenerFactory" was replaced
    with a new "AppStatusPlugin" that more explicitly differentiates
    between the two use cases: processing events, and showing the UI.
    Both live apps and the SHS use this new API (previously, it was
    restricted to the SHS).
    
    Note on the above: this causes a slight change of behavior for
    live apps; the SQL tab will only show up after the first execution
    is started.
    
    The metrics gathering code was re-worked a bit so that the types
    used are less memory hungry and more serialization-friendly. This
    reduces memory usage when using in-memory stores, and reduces load
    times when using disk stores.
    
    Tested with existing and added unit tests. Note one unit test was
    disabled because it depends on SPARK-20653, which isn't in yet.
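
A minimal sketch of the "less memory hungry" metric types mentioned above, assuming the parallel-array shape that the listener diff in this thread passes to `LiveTaskMetrics(ids, values, succeeded)`. The names `TaskMetricsSketch` and `MetricsSketch` are hypothetical stand-ins, not classes from the PR. Two primitive `Array[Long]` fields avoid the boxing that a `Map[Long, Long]` per task would incur.

```scala
// Hypothetical stand-in for the PR's LiveTaskMetrics; only the field shape
// (parallel id/value arrays plus a success flag) is taken from the diff.
final class TaskMetricsSketch(
    val ids: Array[Long],
    val values: Array[Long],
    val succeeded: Boolean)

object MetricsSketch {
  // Builds the compact form from (accumulatorId, value) pairs, sorted by id.
  def compact(updates: Seq[(Long, Long)], succeeded: Boolean): TaskMetricsSketch = {
    val sorted = updates.sortBy(_._1)
    new TaskMetricsSketch(sorted.map(_._1).toArray, sorted.map(_._2).toArray, succeeded)
  }

  // Sums values per accumulator id across tasks. The real listener formats
  // values per metric type; a plain sum is an assumption made for brevity.
  def sumById(tasks: Seq[TaskMetricsSketch]): Map[Long, Long] =
    tasks
      .flatMap(t => t.ids.zip(t.values).toSeq)
      .groupBy(_._1)
      .map { case (id, pairs) => id -> pairs.map(_._2).sum }
}
```

Calling `MetricsSketch.compact(updates, succeeded = true)` once per finished task and `MetricsSketch.sumById(tasks)` when the execution ends mirrors, in simplified form, the flow between `updateStageMetrics` and `aggregateMetrics`.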

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/vanzin/spark SPARK-20652

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/19681.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #19681
    
----
commit ccd5adc1d6273b92fd6c9a0d4817451a5acb566a
Author: Marcelo Vanzin <va...@cloudera.com>
Date:   2017-04-06T17:00:25Z

    [SPARK-20652][sql] Store SQL UI data in the new app status store.
    

----


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19681: [SPARK-20652][sql] Store SQL UI data in the new a...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/19681


---



[GitHub] spark pull request #19681: [SPARK-20652][sql] Store SQL UI data in the new a...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19681#discussion_r157013164
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala ---
    @@ -0,0 +1,179 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.ui
    +
    +import java.lang.{Long => JLong}
    +import java.util.Date
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable.ArrayBuffer
    +
    +import com.fasterxml.jackson.databind.annotation.JsonDeserialize
    +
    +import org.apache.spark.{JobExecutionStatus, SparkConf}
    +import org.apache.spark.scheduler.SparkListener
    +import org.apache.spark.status.AppStatusPlugin
    +import org.apache.spark.status.KVUtils.KVIndexParam
    +import org.apache.spark.ui.SparkUI
    +import org.apache.spark.util.Utils
    +import org.apache.spark.util.kvstore.KVStore
    +
    +/**
    + * Provides a view of a KVStore with methods that make it easy to query SQL-specific state. There's
    + * no state kept in this class, so it's ok to have multiple instances of it in an application.
    + */
    +private[sql] class SQLAppStatusStore(
    +    store: KVStore,
    +    listener: Option[SQLAppStatusListener] = None) {
    +
    +  def executionsList(): Seq[SQLExecutionUIData] = {
    +    store.view(classOf[SQLExecutionUIData]).asScala.toSeq
    +  }
    +
    +  def execution(executionId: Long): Option[SQLExecutionUIData] = {
    +    try {
    +      Some(store.read(classOf[SQLExecutionUIData], executionId))
    +    } catch {
    +      case _: NoSuchElementException => None
    +    }
    +  }
    +
    +  def executionsCount(): Long = {
    +    store.count(classOf[SQLExecutionUIData])
    +  }
    +
    +  def executionMetrics(executionId: Long): Map[Long, String] = {
    +    def metricsFromStore(): Option[Map[Long, String]] = {
    +      val exec = store.read(classOf[SQLExecutionUIData], executionId)
    +      Option(exec.metricValues)
    +    }
    +
    +    metricsFromStore()
    +      .orElse(listener.flatMap(_.liveExecutionMetrics(executionId)))
    +      // Try a second time in case the execution finished while this method is trying to
    +      // get the metrics.
    +      .orElse(metricsFromStore())
    +      .getOrElse(Map())
    +  }
    +
    +  def planGraph(executionId: Long): SparkPlanGraph = {
    +    store.read(classOf[SparkPlanGraphWrapper], executionId).toSparkPlanGraph()
    +  }
    +
    +}
    +
    +/**
    + * An AppStatusPlugin for handling the SQL UI and listeners.
    + */
    +private[sql] class SQLAppStatusPlugin extends AppStatusPlugin {
    +
    +  override def setupListeners(
    +      conf: SparkConf,
    +      store: KVStore,
    +      addListenerFn: SparkListener => Unit,
    +      live: Boolean): Unit = {
    +    // For live applications, the listener is installed in [[setupUI]]. This also avoids adding
    +    // the listener when the UI is disabled. Force installation during testing, though.
    +    if (!live || Utils.isTesting) {
    +      val listener = new SQLAppStatusListener(conf, store, live, None)
    +      addListenerFn(listener)
    +    }
    +  }
    +
    +  override def setupUI(ui: SparkUI): Unit = {
    --- End diff --
    
    The calls are made in specific cases (setupListeners when setting up a listener bus, setupUI when setting up the UI, always). But this implementation has to be a little weird because we don't want to show the SQL UI if SQL hasn't been initialized, if we're to maintain the old behavior.
    
    I don't think the listener is installed twice here - here it's only installed for non-live applications (= SHS) and below it's only installed if there's a SparkContext (= live application).
    
    If we're ok to modify the existing behavior and always have the SQL tab, this can be simplified a lot.
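
The split described in this comment can be sketched as follows. This is an illustration only: `ListenerBusSketch` and `PluginSketch` are hypothetical stand-ins, and the `setupUI` condition is assumed from the comment ("only installed if there's a SparkContext"), since the diff above is cut off before that method's body.

```scala
// Hypothetical stand-in for a listener bus; not a Spark class.
final class ListenerBusSketch {
  private var installed = Vector.empty[String]
  def add(name: String): Unit = installed :+= name
  def listeners: Vector[String] = installed
}

object PluginSketch {
  // Mirrors the condition shown in setupListeners: only replayed (SHS)
  // applications, or test runs, install the listener at this point.
  def setupListeners(bus: ListenerBusSketch, live: Boolean, testing: Boolean): Unit =
    if (!live || testing) bus.add("sql-listener")

  // Assumed shape of setupUI: install only when a live context exists.
  def setupUI(bus: ListenerBusSketch, hasSparkContext: Boolean): Unit =
    if (hasSparkContext) bus.add("sql-listener")
}
```

Outside of tests, a live application goes through both calls but registers the listener only in `setupUI`, while a history server replay registers it only in `setupListeners`, so each mode ends up with a single listener.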


---



[GitHub] spark issue #19681: [SPARK-20652][sql] Store SQL UI data in the new app stat...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19681
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/83603/
    Test PASSed.


---



[GitHub] spark issue #19681: [SPARK-20652][sql] Store SQL UI data in the new app stat...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on the issue:

    https://github.com/apache/spark/pull/19681
  
    Not that I remember.


---



[GitHub] spark pull request #19681: [SPARK-20652][sql] Store SQL UI data in the new a...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19681#discussion_r149530162
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala ---
    @@ -0,0 +1,353 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.sql.execution.ui
    +
    +import java.util.Date
    +import java.util.concurrent.ConcurrentHashMap
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable.HashMap
    +
    +import org.apache.spark.{JobExecutionStatus, SparkConf}
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.scheduler._
    +import org.apache.spark.sql.execution.SQLExecution
    +import org.apache.spark.sql.execution.metric._
    +import org.apache.spark.sql.internal.StaticSQLConf._
    +import org.apache.spark.status.LiveEntity
    +import org.apache.spark.status.config._
    +import org.apache.spark.ui.SparkUI
    +import org.apache.spark.util.kvstore.KVStore
    +
    +private[sql] class SQLAppStatusListener(
    +    conf: SparkConf,
    +    kvstore: KVStore,
    +    live: Boolean,
    +    ui: Option[SparkUI] = None)
    +  extends SparkListener with Logging {
    +
    +  // How often to flush intermediate stage of a live execution to the store. When replaying logs,
    +  // never flush (only do the very last write).
    +  private val liveUpdatePeriodNs = if (live) conf.get(LIVE_ENTITY_UPDATE_PERIOD) else -1L
    +
    +  private val liveExecutions = new HashMap[Long, LiveExecutionData]()
    +  private val stageMetrics = new HashMap[Int, LiveStageMetrics]()
    +
    +  private var uiInitialized = false
    +
    +  override def onJobStart(event: SparkListenerJobStart): Unit = {
    +    val executionIdString = event.properties.getProperty(SQLExecution.EXECUTION_ID_KEY)
    +    if (executionIdString == null) {
    +      // This is not a job created by SQL
    +      return
    +    }
    +
    +    val executionId = executionIdString.toLong
    +    val jobId = event.jobId
    +    val exec = getOrCreateExecution(executionId)
    +
    +    // Record the accumulator IDs for the stages of this job, so that the code that keeps
    +    // track of the metrics knows which accumulators to look at.
    +    val accumIds = exec.metrics.map(_.accumulatorId).sorted.toList
    +    event.stageIds.foreach { id =>
    +      stageMetrics.put(id, new LiveStageMetrics(id, 0, accumIds.toArray, new ConcurrentHashMap()))
    +    }
    +
    +    exec.jobs = exec.jobs + (jobId -> JobExecutionStatus.RUNNING)
    +    exec.stages = event.stageIds
    +    update(exec)
    +  }
    +
    +  override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = {
    +    if (!isSQLStage(event.stageInfo.stageId)) {
    +      return
    +    }
    +
    +    // Reset the metrics tracking object for the new attempt.
    +    stageMetrics.get(event.stageInfo.stageId).foreach { metrics =>
    +      metrics.taskMetrics.clear()
    +      metrics.attemptId = event.stageInfo.attemptId
    +    }
    +  }
    +
    +  override def onJobEnd(event: SparkListenerJobEnd): Unit = {
    +    liveExecutions.values.foreach { exec =>
    +      if (exec.jobs.contains(event.jobId)) {
    +        val result = event.jobResult match {
    +          case JobSucceeded => JobExecutionStatus.SUCCEEDED
    +          case _ => JobExecutionStatus.FAILED
    +        }
    +        exec.jobs = exec.jobs + (event.jobId -> result)
    +        exec.endEvents += 1
    +        update(exec)
    +      }
    +    }
    +  }
    +
    +  override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = {
    +    event.accumUpdates.foreach { case (taskId, stageId, attemptId, accumUpdates) =>
    +      updateStageMetrics(stageId, attemptId, taskId, accumUpdates, false)
    +    }
    +  }
    +
    +  override def onTaskEnd(event: SparkListenerTaskEnd): Unit = {
    +    if (!isSQLStage(event.stageId)) {
    +      return
    +    }
    +
    +    val info = event.taskInfo
    +    // SPARK-20342. If processing events from a live application, use the task metrics info to
    +    // work around a race in the DAGScheduler. The metrics info does not contain accumulator info
    +    // when reading event logs in the SHS, so we have to rely on the accumulator in that case.
    +    val accums = if (live && event.taskMetrics != null) {
    +      event.taskMetrics.externalAccums.flatMap { a =>
    +        // This call may fail if the accumulator is gc'ed, so account for that.
    +        try {
    +          Some(a.toInfo(Some(a.value), None))
    +        } catch {
    +          case _: IllegalAccessError => None
    +        }
    +      }
    +    } else {
    +      info.accumulables
    +    }
    +    updateStageMetrics(event.stageId, event.stageAttemptId, info.taskId, accums,
    +      info.successful)
    +  }
    +
    +  def executionMetrics(executionId: Long): Map[Long, String] = synchronized {
    +    liveExecutions.get(executionId).map { exec =>
    +      if (exec.metricsValues != null) {
    +        exec.metricsValues
    +      } else {
    +        aggregateMetrics(exec)
    +      }
    +    }.getOrElse {
    +      throw new NoSuchElementException(s"execution $executionId not found")
    +    }
    +  }
    +
    +  private def aggregateMetrics(exec: LiveExecutionData): Map[Long, String] = synchronized {
    +    val metricIds = exec.metrics.map(_.accumulatorId).sorted
    +    val metricTypes = exec.metrics.map { m => (m.accumulatorId, m.metricType) }.toMap
    +    val metrics = exec.stages
    +      .flatMap(stageMetrics.get)
    +      .flatMap(_.taskMetrics.values().asScala)
    +      .flatMap { metrics => metrics.ids.zip(metrics.values) }
    +
    +    (metrics ++ exec.driverAccumUpdates.toSeq)
    +      .filter { case (id, _) => metricIds.contains(id) }
    +      .groupBy(_._1)
    +      .map { case (id, values) =>
    +        id -> SQLMetrics.stringValue(metricTypes(id), values.map(_._2).toSeq)
    +      }
    +  }
    +
    +  private def updateStageMetrics(
    +      stageId: Int,
    +      attemptId: Int,
    +      taskId: Long,
    +      accumUpdates: Seq[AccumulableInfo],
    +      succeeded: Boolean): Unit = {
    +    stageMetrics.get(stageId).foreach { metrics =>
    +      if (metrics.attemptId != attemptId || metrics.accumulatorIds.isEmpty) {
    +        return
    +      }
    +
    +      val oldTaskMetrics = metrics.taskMetrics.get(taskId)
    +      if (oldTaskMetrics != null && oldTaskMetrics.succeeded) {
    +        return
    +      }
    +
    +      val updates = accumUpdates
    +        .filter { acc => acc.update.isDefined && metrics.accumulatorIds.contains(acc.id) }
    +        .sortBy(_.id)
    +
    +      if (updates.isEmpty) {
    +        return
    +      }
    +
    +      val ids = new Array[Long](updates.size)
    +      val values = new Array[Long](updates.size)
    +      updates.zipWithIndex.foreach { case (acc, idx) =>
    +        ids(idx) = acc.id
    +        // In a live application, accumulators have Long values, but when reading from event
    +        // logs, they have String values. For now, assume all accumulators are Long and convert
    +        // accordingly.
    +        values(idx) = acc.update.get match {
    +          case s: String => s.toLong
    +          case l: Long => l
    +          case o => throw new IllegalArgumentException(s"Unexpected: $o")
    +        }
    +      }
    +
    +      metrics.taskMetrics.put(taskId, new LiveTaskMetrics(ids, values, succeeded))
    +    }
    +  }
    +
    +  private def onExecutionStart(event: SparkListenerSQLExecutionStart): Unit = {
    +    // Install the SQL tab in a live app if it hasn't been initialized yet.
    +    if (!uiInitialized) {
    +      ui.foreach { _ui =>
    +        new SQLTab(new SQLAppStatusStore(kvstore, Some(this)), _ui)
    +      }
    +      uiInitialized = true
    +    }
    +
    +    val SparkListenerSQLExecutionStart(executionId, description, details,
    +      physicalPlanDescription, sparkPlanInfo, time) = event
    +
    +    def toStoredNodes(nodes: Seq[SparkPlanGraphNode]): Seq[SparkPlanGraphNodeWrapper] = {
    +      nodes.map {
    +        case cluster: SparkPlanGraphCluster =>
    +          val storedCluster = new SparkPlanGraphClusterWrapper(
    +            cluster.id,
    +            cluster.name,
    +            cluster.desc,
    +            toStoredNodes(cluster.nodes),
    +            cluster.metrics)
    +          new SparkPlanGraphNodeWrapper(null, storedCluster)
    +
    +        case node =>
    +          new SparkPlanGraphNodeWrapper(node, null)
    +      }
    +    }
    +
    +    val planGraph = SparkPlanGraph(sparkPlanInfo)
    +    val sqlPlanMetrics = planGraph.allNodes.flatMap { node =>
    +      node.metrics.map { metric => (metric.accumulatorId, metric) }
    +    }.toMap.values.toList
    +
    +    val graphToStore = new SparkPlanGraphWrapper(
    +      executionId,
    +      toStoredNodes(planGraph.nodes),
    +      planGraph.edges)
    +    kvstore.write(graphToStore)
    +
    +    val exec = getOrCreateExecution(executionId)
    +    exec.description = description
    +    exec.details = details
    +    exec.physicalPlanDescription = physicalPlanDescription
    +    exec.metrics = sqlPlanMetrics
    +    exec.submissionTime = time
    +    update(exec)
    +  }
    +
    +  private def onExecutionEnd(event: SparkListenerSQLExecutionEnd): Unit = {
    +    val SparkListenerSQLExecutionEnd(executionId, time) = event
    +    liveExecutions.get(executionId).foreach { exec =>
    +      synchronized {
    +        exec.metricsValues = aggregateMetrics(exec)
    +
    +        // Remove stale LiveStageMetrics objects for stages that are not active anymore.
    +        val activeStages = liveExecutions.values.flatMap { other =>
    +          if (other != exec) other.stages else Nil
    +        }.toSet
    +        stageMetrics.retain { case (id, _) => activeStages.contains(id) }
    +
    +        exec.completionTime = Some(new Date(time))
    +        exec.endEvents += 1
    +
    +        update(exec)
    +      }
    +    }
    +  }
    +
    +  private def onDriverAccumUpdates(event: SparkListenerDriverAccumUpdates): Unit = {
    +    val SparkListenerDriverAccumUpdates(executionId, accumUpdates) = event
    +    liveExecutions.get(executionId).foreach { exec =>
    +      exec.driverAccumUpdates = accumUpdates.toMap
    +      update(exec)
    +    }
    +  }
    +
    +  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
    +    case e: SparkListenerSQLExecutionStart => onExecutionStart(e)
    +    case e: SparkListenerSQLExecutionEnd => onExecutionEnd(e)
    +    case e: SparkListenerDriverAccumUpdates => onDriverAccumUpdates(e)
    +    case _ => // Ignore
    +  }
    +
    +  private def getOrCreateExecution(executionId: Long): LiveExecutionData = {
    +    liveExecutions.getOrElseUpdate(executionId, new LiveExecutionData(executionId))
    +  }
    +
    +  private def update(exec: LiveExecutionData): Unit = {
    +    val now = System.nanoTime()
    +    if (exec.endEvents >= exec.jobs.size + 1) {
    +      liveExecutions.remove(exec.executionId)
    +      exec.write(kvstore, now)
    +    } else if (liveUpdatePeriodNs >= 0) {
    +      if (now - exec.lastWriteTime > liveUpdatePeriodNs) {
    +        exec.write(kvstore, now)
    +      }
    +    }
    +  }
    +
    +  private def isSQLStage(stageId: Int): Boolean = {
    +    liveExecutions.values.exists { exec =>
    +      exec.stages.contains(stageId)
    --- End diff --
    
    any reason not to make `exec.stages` a `Set`?  I guess it would normally be small so it doesn't matter, but seems like it couldn't hurt to protect against some strange SQL query with some huge number of stages.
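
A small sketch of what that suggestion could look like, assuming a simplified stand-in for `LiveExecutionData` (the class `ExecSketch` below is hypothetical and keeps only the `stages` field). With a `Set`, the membership test is a hash lookup rather than a linear scan once the collection grows beyond a handful of elements.

```scala
// Hypothetical stand-in for LiveExecutionData; only `stages` matters here.
final class ExecSketch(val executionId: Long, var stages: Set[Int] = Set.empty)

object StageLookupSketch {
  // Same check as isSQLStage in the diff, but against Set[Int] instead of Seq[Int].
  def isSQLStage(execs: Iterable[ExecSketch], stageId: Int): Boolean =
    execs.exists(_.stages.contains(stageId))
}
```

In `onJobStart` the assignment would then become something like `exec.stages = event.stageIds.toSet`, which also drops duplicate stage IDs.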


---



[GitHub] spark issue #19681: [SPARK-20652][sql] Store SQL UI data in the new app stat...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19681
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/83855/
    Test PASSed.


---



[GitHub] spark issue #19681: [SPARK-20652][sql] Store SQL UI data in the new app stat...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19681
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #19681: [SPARK-20652][sql] Store SQL UI data in the new app stat...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19681
  
    **[Test build #83802 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83802/testReport)** for PR 19681 at commit [`56761af`](https://github.com/apache/spark/commit/56761af51d26df8d60989ace59a6b757ed8c38b5).


---



[GitHub] spark pull request #19681: [SPARK-20652][sql] Store SQL UI data in the new a...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19681#discussion_r157015987
  
    --- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
    @@ -455,9 +457,14 @@ class SparkContext(config: SparkConf) extends Logging {
             // For tests, do not enable the UI
             None
           }
    -    // Bind the UI before starting the task scheduler to communicate
    -    // the bound port to the cluster manager properly
    -    _ui.foreach(_.bind())
    +    _ui.foreach { ui =>
    +      // Load any plugins that might want to modify the UI.
    +      AppStatusPlugin.loadPlugins().foreach(_.setupUI(ui))
    --- End diff --
    
    @vanzin the live UI doesn't need a 2-step process to set up the UI, while the history server does. That's why I think they should not share one plugin interface.
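
One possible reading of this suggestion, as a sketch only: every trait and object name below is hypothetical, and nothing here comes from the PR or from Spark. The history server keeps a two-step interface, while a live application gets a single entry point.

```scala
// Hypothetical two-step interface for the history server: replay first, UI second.
trait HistoryPluginSketch {
  def setupListeners(register: String => Unit): Unit
  def setupUI(attachTab: String => Unit): Unit
}

// Hypothetical one-step interface for live applications.
trait LivePluginSketch {
  def setup(register: String => Unit, attachTab: String => Unit): Unit
}

// A live SQL plugin under this split: listener and tab are wired in one call.
object SqlLivePluginSketch extends LivePluginSketch {
  def setup(register: String => Unit, attachTab: String => Unit): Unit = {
    register("sql-listener")
    attachTab("SQL")
  }
}
```

The trade-off is a second interface to maintain in exchange for removing the `live` flag and the mode-specific branching from each plugin.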


---



[GitHub] spark pull request #19681: [SPARK-20652][sql] Store SQL UI data in the new a...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19681#discussion_r149529181
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala ---
    @@ -0,0 +1,353 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.sql.execution.ui
    +
    +import java.util.Date
    +import java.util.concurrent.ConcurrentHashMap
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable.HashMap
    +
    +import org.apache.spark.{JobExecutionStatus, SparkConf}
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.scheduler._
    +import org.apache.spark.sql.execution.SQLExecution
    +import org.apache.spark.sql.execution.metric._
    +import org.apache.spark.sql.internal.StaticSQLConf._
    +import org.apache.spark.status.LiveEntity
    +import org.apache.spark.status.config._
    +import org.apache.spark.ui.SparkUI
    +import org.apache.spark.util.kvstore.KVStore
    +
    +private[sql] class SQLAppStatusListener(
    +    conf: SparkConf,
    +    kvstore: KVStore,
    +    live: Boolean,
    +    ui: Option[SparkUI] = None)
    +  extends SparkListener with Logging {
    +
    +  // How often to flush intermediate stage of a live execution to the store. When replaying logs,
    +  // never flush (only do the very last write).
    +  private val liveUpdatePeriodNs = if (live) conf.get(LIVE_ENTITY_UPDATE_PERIOD) else -1L
    +
    +  private val liveExecutions = new HashMap[Long, LiveExecutionData]()
    +  private val stageMetrics = new HashMap[Int, LiveStageMetrics]()
    +
    +  private var uiInitialized = false
    +
    +  override def onJobStart(event: SparkListenerJobStart): Unit = {
    +    val executionIdString = event.properties.getProperty(SQLExecution.EXECUTION_ID_KEY)
    +    if (executionIdString == null) {
    +      // This is not a job created by SQL
    +      return
    +    }
    +
    +    val executionId = executionIdString.toLong
    +    val jobId = event.jobId
    +    val exec = getOrCreateExecution(executionId)
    +
    +    // Record the accumulator IDs for the stages of this job, so that the code that keeps
    +    // track of the metrics knows which accumulators to look at.
    +    val accumIds = exec.metrics.map(_.accumulatorId).sorted.toList
    +    event.stageIds.foreach { id =>
    +      stageMetrics.put(id, new LiveStageMetrics(id, 0, accumIds.toArray, new ConcurrentHashMap()))
    +    }
    +
    +    exec.jobs = exec.jobs + (jobId -> JobExecutionStatus.RUNNING)
    +    exec.stages = event.stageIds
    +    update(exec)
    +  }
    +
    +  override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = {
    +    if (!isSQLStage(event.stageInfo.stageId)) {
    +      return
    +    }
    +
    +    // Reset the metrics tracking object for the new attempt.
    +    stageMetrics.get(event.stageInfo.stageId).foreach { metrics =>
    +      metrics.taskMetrics.clear()
    +      metrics.attemptId = event.stageInfo.attemptId
    +    }
    +  }
    +
    +  override def onJobEnd(event: SparkListenerJobEnd): Unit = {
    +    liveExecutions.values.foreach { exec =>
    +      if (exec.jobs.contains(event.jobId)) {
    +        val result = event.jobResult match {
    +          case JobSucceeded => JobExecutionStatus.SUCCEEDED
    +          case _ => JobExecutionStatus.FAILED
    +        }
    +        exec.jobs = exec.jobs + (event.jobId -> result)
    +        exec.endEvents += 1
    +        update(exec)
    +      }
    +    }
    +  }
    +
    +  override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = {
    +    event.accumUpdates.foreach { case (taskId, stageId, attemptId, accumUpdates) =>
    +      updateStageMetrics(stageId, attemptId, taskId, accumUpdates, false)
    +    }
    +  }
    +
    +  override def onTaskEnd(event: SparkListenerTaskEnd): Unit = {
    +    if (!isSQLStage(event.stageId)) {
    +      return
    +    }
    +
    +    val info = event.taskInfo
    +    // SPARK-20342. If processing events from a live application, use the task metrics info to
    +    // work around a race in the DAGScheduler. The metrics info does not contain accumulator info
    +    // when reading event logs in the SHS, so we have to rely on the accumulator in that case.
    +    val accums = if (live && event.taskMetrics != null) {
    +      event.taskMetrics.externalAccums.flatMap { a =>
    +        // This call may fail if the accumulator is gc'ed, so account for that.
    +        try {
    +          Some(a.toInfo(Some(a.value), None))
    +        } catch {
    +          case _: IllegalAccessError => None
    +        }
    +      }
    +    } else {
    +      info.accumulables
    +    }
    +    updateStageMetrics(event.stageId, event.stageAttemptId, info.taskId, accums,
    +      info.successful)
    +  }
    +
    +  def executionMetrics(executionId: Long): Map[Long, String] = synchronized {
    +    liveExecutions.get(executionId).map { exec =>
    +      if (exec.metricsValues != null) {
    +        exec.metricsValues
    +      } else {
    +        aggregateMetrics(exec)
    +      }
    +    }.getOrElse {
    +      throw new NoSuchElementException(s"execution $executionId not found")
    +    }
    +  }
    +
    +  private def aggregateMetrics(exec: LiveExecutionData): Map[Long, String] = synchronized {
    +    val metricIds = exec.metrics.map(_.accumulatorId).sorted
    +    val metricTypes = exec.metrics.map { m => (m.accumulatorId, m.metricType) }.toMap
    +    val metrics = exec.stages
    +      .flatMap(stageMetrics.get)
    +      .flatMap(_.taskMetrics.values().asScala)
    +      .flatMap { metrics => metrics.ids.zip(metrics.values) }
    +
    +    (metrics ++ exec.driverAccumUpdates.toSeq)
    +      .filter { case (id, _) => metricIds.contains(id) }
    +      .groupBy(_._1)
    +      .map { case (id, values) =>
    +        id -> SQLMetrics.stringValue(metricTypes(id), values.map(_._2).toSeq)
    +      }
    +  }
    +
    +  private def updateStageMetrics(
    +      stageId: Int,
    +      attemptId: Int,
    +      taskId: Long,
    +      accumUpdates: Seq[AccumulableInfo],
    +      succeeded: Boolean): Unit = {
    +    stageMetrics.get(stageId).foreach { metrics =>
    +      if (metrics.attemptId != attemptId || metrics.accumulatorIds.isEmpty) {
    +        return
    +      }
    +
    +      val oldTaskMetrics = metrics.taskMetrics.get(taskId)
    +      if (oldTaskMetrics != null && oldTaskMetrics.succeeded) {
    +        return
    +      }
    +
    +      val updates = accumUpdates
    +        .filter { acc => acc.update.isDefined && metrics.accumulatorIds.contains(acc.id) }
    +        .sortBy(_.id)
    +
    +      if (updates.isEmpty) {
    +        return
    +      }
    +
    +      val ids = new Array[Long](updates.size)
    +      val values = new Array[Long](updates.size)
    +      updates.zipWithIndex.foreach { case (acc, idx) =>
    +        ids(idx) = acc.id
    +        // In a live application, accumulators have Long values, but when reading from event
    +        // logs, they have String values. For now, assume all accumulators are Long and convert
    +        // accordingly.
    +        values(idx) = acc.update.get match {
    +          case s: String => s.toLong
    +          case l: Long => l
    +          case o => throw new IllegalArgumentException(s"Unexpected: $o")
    +        }
    +      }
    +
    +      metrics.taskMetrics.put(taskId, new LiveTaskMetrics(ids, values, succeeded))
    +    }
    +  }
    +
    +  private def onExecutionStart(event: SparkListenerSQLExecutionStart): Unit = {
    +    // Install the SQL tab in a live app if it hasn't been initialized yet.
    +    if (!uiInitialized) {
    +      ui.foreach { _ui =>
    +        new SQLTab(new SQLAppStatusStore(kvstore, Some(this)), _ui)
    +      }
    +      uiInitialized = true
    +    }
    +
    +    val SparkListenerSQLExecutionStart(executionId, description, details,
    +      physicalPlanDescription, sparkPlanInfo, time) = event
    +
    +    def toStoredNodes(nodes: Seq[SparkPlanGraphNode]): Seq[SparkPlanGraphNodeWrapper] = {
    +      nodes.map {
    +        case cluster: SparkPlanGraphCluster =>
    +          val storedCluster = new SparkPlanGraphClusterWrapper(
    +            cluster.id,
    +            cluster.name,
    +            cluster.desc,
    +            toStoredNodes(cluster.nodes),
    +            cluster.metrics)
    +          new SparkPlanGraphNodeWrapper(null, storedCluster)
    +
    +        case node =>
    +          new SparkPlanGraphNodeWrapper(node, null)
    +      }
    +    }
    +
    +    val planGraph = SparkPlanGraph(sparkPlanInfo)
    +    val sqlPlanMetrics = planGraph.allNodes.flatMap { node =>
    +      node.metrics.map { metric => (metric.accumulatorId, metric) }
    +    }.toMap.values.toList
    +
    +    val graphToStore = new SparkPlanGraphWrapper(
    +      executionId,
    +      toStoredNodes(planGraph.nodes),
    +      planGraph.edges)
    +    kvstore.write(graphToStore)
    +
    +    val exec = getOrCreateExecution(executionId)
    +    exec.description = description
    +    exec.details = details
    +    exec.physicalPlanDescription = physicalPlanDescription
    +    exec.metrics = sqlPlanMetrics
    +    exec.submissionTime = time
    +    update(exec)
    +  }
    +
    +  private def onExecutionEnd(event: SparkListenerSQLExecutionEnd): Unit = {
    +    val SparkListenerSQLExecutionEnd(executionId, time) = event
    +    liveExecutions.get(executionId).foreach { exec =>
    +      synchronized {
    +        exec.metricsValues = aggregateMetrics(exec)
    +
    +        // Remove stale LiveStageMetrics objects for stages that are not active anymore.
    +        val activeStages = liveExecutions.values.flatMap { other =>
    +          if (other != exec) other.stages else Nil
    +        }.toSet
    +        stageMetrics.retain { case (id, _) => activeStages.contains(id) }
    +
    +        exec.completionTime = Some(new Date(time))
    +        exec.endEvents += 1
    +
    +        update(exec)
    +      }
    +    }
    +  }
    +
    +  private def onDriverAccumUpdates(event: SparkListenerDriverAccumUpdates): Unit = {
    +    val SparkListenerDriverAccumUpdates(executionId, accumUpdates) = event
    +    liveExecutions.get(executionId).foreach { exec =>
    +      exec.driverAccumUpdates = accumUpdates.toMap
    +      update(exec)
    +    }
    +  }
    +
    +  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
    +    case e: SparkListenerSQLExecutionStart => onExecutionStart(e)
    +    case e: SparkListenerSQLExecutionEnd => onExecutionEnd(e)
    +    case e: SparkListenerDriverAccumUpdates => onDriverAccumUpdates(e)
    +    case _ => // Ignore
    +  }
    +
    +  private def getOrCreateExecution(executionId: Long): LiveExecutionData = {
    +    liveExecutions.getOrElseUpdate(executionId, new LiveExecutionData(executionId))
    +  }
    +
    +  private def update(exec: LiveExecutionData): Unit = {
    +    val now = System.nanoTime()
    +    if (exec.endEvents >= exec.jobs.size + 1) {
    +      liveExecutions.remove(exec.executionId)
    +      exec.write(kvstore, now)
    --- End diff --
    
    If a job was killed mid-execution, so that the event log doesn't contain the end events, you wouldn't get to see any info for that execution, would you? Don't you need some final flush?
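
The concern above can be illustrated with a minimal sketch. It assumes the write to the store happens only in the branch visible in the diff (the rest of `update` is not shown here). `LiveExec`, `Tracker`, and `flush` are hypothetical stand-ins for illustration, not Spark classes or methods.

```scala
import scala.collection.mutable

// Hypothetical stand-in for LiveExecutionData: only the fields the condition needs.
final class LiveExec(val id: Long) {
  var jobs: Map[Int, String] = Map.empty
  var endEvents: Int = 0
}

// Hypothetical tracker; `store` stands in for the kvstore.
final class Tracker {
  private val live = mutable.LinkedHashMap.empty[Long, LiveExec]
  val store: mutable.Map[Long, LiveExec] = mutable.Map.empty

  def getOrCreate(id: Long): LiveExec = live.getOrElseUpdate(id, new LiveExec(id))

  // Same condition as in the diff: one end event per job, plus the execution end event.
  def update(exec: LiveExec): Unit = {
    if (exec.endEvents >= exec.jobs.size + 1) {
      live.remove(exec.id)
      store.update(exec.id, exec)
    }
  }

  // Assumed "final flush": write whatever is still live, e.g. when replay of an event log ends.
  def flush(): Unit = {
    live.values.foreach(exec => store.update(exec.id, exec))
    live.clear()
  }
}
```

In this model, an execution whose job start was seen but whose end events never arrive stays out of `store` until `flush()` is called, which is the situation the question describes.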


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19681: [SPARK-20652][sql] Store SQL UI data in the new app stat...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19681
  
    Merged build finished. Test FAILed.


---



[GitHub] spark pull request #19681: [SPARK-20652][sql] Store SQL UI data in the new a...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19681#discussion_r149791151
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala ---
    @@ -420,21 +428,19 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
         withTempPath { file =>
           // person creates a temporary view. get the DF before listing previous execution IDs
           val data = person.select('name)
    -      sparkContext.listenerBus.waitUntilEmpty(10000)
    --- End diff --
    
    `person.select()` doesn't generate any events.


---



[GitHub] spark issue #19681: [SPARK-20652][sql] Store SQL UI data in the new app stat...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19681
  
    **[Test build #83855 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83855/testReport)** for PR 19681 at commit [`c068a5b`](https://github.com/apache/spark/commit/c068a5bb88e73532018d8f7a8dbbf96ffb6a93d8).


---



[GitHub] spark issue #19681: [SPARK-20652][sql] Store SQL UI data in the new app stat...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19681
  
    **[Test build #83521 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83521/testReport)** for PR 19681 at commit [`ccd5adc`](https://github.com/apache/spark/commit/ccd5adc1d6273b92fd6c9a0d4817451a5acb566a).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class SQLTab(val sqlStore: SQLAppStatusStore, sparkUI: SparkUI)`


---



[GitHub] spark issue #19681: [SPARK-20652][sql] Store SQL UI data in the new app stat...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19681
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/83563/
    Test FAILed.


---



[GitHub] spark issue #19681: [SPARK-20652][sql] Store SQL UI data in the new app stat...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19681
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/83567/
    Test FAILed.


---



[GitHub] spark issue #19681: [SPARK-20652][sql] Store SQL UI data in the new app stat...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19681
  
    Merged build finished. Test FAILed.


---



[GitHub] spark issue #19681: [SPARK-20652][sql] Store SQL UI data in the new app stat...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19681
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/83620/
    Test PASSed.


---



[GitHub] spark pull request #19681: [SPARK-20652][sql] Store SQL UI data in the new a...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19681#discussion_r157129757
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala ---
    @@ -118,309 +142,286 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext with JsonTest
           (id, accumulatorValue)
         }.toMap
     
    -    listener.onOtherEvent(SparkListenerSQLExecutionStart(
    +    bus.postToAll(SparkListenerSQLExecutionStart(
           executionId,
           "test",
           "test",
           df.queryExecution.toString,
           SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
           System.currentTimeMillis()))
     
    -    val executionUIData = listener.executionIdToData(0)
    -
    -    listener.onJobStart(SparkListenerJobStart(
    +    bus.postToAll(SparkListenerJobStart(
           jobId = 0,
           time = System.currentTimeMillis(),
           stageInfos = Seq(
             createStageInfo(0, 0),
             createStageInfo(1, 0)
           ),
           createProperties(executionId)))
    -    listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(0, 0)))
    +    bus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 0)))
     
    -    assert(listener.getExecutionMetrics(0).isEmpty)
    +    assert(store.executionMetrics(0).isEmpty)
     
    -    listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq(
    +    bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq(
           // (task id, stage id, stage attempt, accum updates)
    -      (0L, 0, 0, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo)),
    -      (1L, 0, 0, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo))
    +      (0L, 0, 0, createAccumulatorInfos(accumulatorUpdates)),
    +      (1L, 0, 0, createAccumulatorInfos(accumulatorUpdates))
         )))
     
    -    checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
    +    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
     
         // Driver accumulator updates that don't belong to this execution should be filtered out,
         // and no exception should be thrown.
    -    listener.onOtherEvent(SparkListenerDriverAccumUpdates(0, Seq((999L, 2L))))
    -    checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
    +    bus.postToAll(SparkListenerDriverAccumUpdates(0, Seq((999L, 2L))))
     
    -    listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq(
    +    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
    +
    +    bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq(
           // (task id, stage id, stage attempt, accum updates)
    -      (0L, 0, 0, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo)),
    -      (1L, 0, 0,
    -        createTaskMetrics(accumulatorUpdates.mapValues(_ * 2)).accumulators().map(makeInfo))
    +      (0L, 0, 0, createAccumulatorInfos(accumulatorUpdates)),
    +      (1L, 0, 0, createAccumulatorInfos(accumulatorUpdates.mapValues(_ * 2)))
         )))
     
    -    checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 3))
    +    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 3))
     
         // Retrying a stage should reset the metrics
    -    listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(0, 1)))
    +    bus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 1)))
     
    -    listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq(
    +    bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq(
           // (task id, stage id, stage attempt, accum updates)
    -      (0L, 0, 1, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo)),
    -      (1L, 0, 1, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo))
    +      (0L, 0, 1, createAccumulatorInfos(accumulatorUpdates)),
    +      (1L, 0, 1, createAccumulatorInfos(accumulatorUpdates))
         )))
     
    -    checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
    +    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
     
         // Ignore the task end for the first attempt
    -    listener.onTaskEnd(SparkListenerTaskEnd(
    +    bus.postToAll(SparkListenerTaskEnd(
           stageId = 0,
           stageAttemptId = 0,
           taskType = "",
           reason = null,
    -      createTaskInfo(0, 0),
    -      createTaskMetrics(accumulatorUpdates.mapValues(_ * 100))))
    +      createTaskInfo(0, 0, accums = accumulatorUpdates.mapValues(_ * 100)),
    +      null))
     
    -    checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
    +    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
     
         // Finish two tasks
    -    listener.onTaskEnd(SparkListenerTaskEnd(
    +    bus.postToAll(SparkListenerTaskEnd(
           stageId = 0,
           stageAttemptId = 1,
           taskType = "",
           reason = null,
    -      createTaskInfo(0, 0),
    -      createTaskMetrics(accumulatorUpdates.mapValues(_ * 2))))
    -    listener.onTaskEnd(SparkListenerTaskEnd(
    +      createTaskInfo(0, 0, accums = accumulatorUpdates.mapValues(_ * 2)),
    +      null))
    +    bus.postToAll(SparkListenerTaskEnd(
           stageId = 0,
           stageAttemptId = 1,
           taskType = "",
           reason = null,
    -      createTaskInfo(1, 0),
    -      createTaskMetrics(accumulatorUpdates.mapValues(_ * 3))))
    +      createTaskInfo(1, 0, accums = accumulatorUpdates.mapValues(_ * 3)),
    +      null))
     
    -    checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 5))
    +    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 5))
     
         // Submit a new stage
    -    listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(1, 0)))
    +    bus.postToAll(SparkListenerStageSubmitted(createStageInfo(1, 0)))
     
    -    listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq(
    +    bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq(
           // (task id, stage id, stage attempt, accum updates)
    -      (0L, 1, 0, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo)),
    -      (1L, 1, 0, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo))
    +      (0L, 1, 0, createAccumulatorInfos(accumulatorUpdates)),
    +      (1L, 1, 0, createAccumulatorInfos(accumulatorUpdates))
         )))
     
    -    checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 7))
    +    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 7))
     
         // Finish two tasks
    -    listener.onTaskEnd(SparkListenerTaskEnd(
    +    bus.postToAll(SparkListenerTaskEnd(
           stageId = 1,
           stageAttemptId = 0,
           taskType = "",
           reason = null,
    -      createTaskInfo(0, 0),
    -      createTaskMetrics(accumulatorUpdates.mapValues(_ * 3))))
    -    listener.onTaskEnd(SparkListenerTaskEnd(
    +      createTaskInfo(0, 0, accums = accumulatorUpdates.mapValues(_ * 3)),
    +      null))
    +    bus.postToAll(SparkListenerTaskEnd(
           stageId = 1,
           stageAttemptId = 0,
           taskType = "",
           reason = null,
    -      createTaskInfo(1, 0),
    -      createTaskMetrics(accumulatorUpdates.mapValues(_ * 3))))
    +      createTaskInfo(1, 0, accums = accumulatorUpdates.mapValues(_ * 3)),
    +      null))
     
    -    checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 11))
    +    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 11))
     
    -    assert(executionUIData.runningJobs === Seq(0))
    -    assert(executionUIData.succeededJobs.isEmpty)
    -    assert(executionUIData.failedJobs.isEmpty)
    +    assertJobs(store.execution(0), running = Seq(0))
     
    -    listener.onJobEnd(SparkListenerJobEnd(
    +    bus.postToAll(SparkListenerJobEnd(
           jobId = 0,
           time = System.currentTimeMillis(),
           JobSucceeded
         ))
    -    listener.onOtherEvent(SparkListenerSQLExecutionEnd(
    +    bus.postToAll(SparkListenerSQLExecutionEnd(
           executionId, System.currentTimeMillis()))
     
    -    assert(executionUIData.runningJobs.isEmpty)
    -    assert(executionUIData.succeededJobs === Seq(0))
    -    assert(executionUIData.failedJobs.isEmpty)
    -
    -    checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 11))
    +    assertJobs(store.execution(0), completed = Seq(0))
    +    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 11))
       }
     
    -  test("onExecutionEnd happens before onJobEnd(JobSucceeded)") {
    -    val listener = new SQLListener(spark.sparkContext.conf)
    +  sqlStoreTest("onExecutionEnd happens before onJobEnd(JobSucceeded)") { (store, bus) =>
         val executionId = 0
         val df = createTestDataFrame
    -    listener.onOtherEvent(SparkListenerSQLExecutionStart(
    +    bus.postToAll(SparkListenerSQLExecutionStart(
           executionId,
           "test",
           "test",
           df.queryExecution.toString,
           SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
           System.currentTimeMillis()))
    -    listener.onJobStart(SparkListenerJobStart(
    +    bus.postToAll(SparkListenerJobStart(
           jobId = 0,
           time = System.currentTimeMillis(),
           stageInfos = Nil,
           createProperties(executionId)))
    -    listener.onOtherEvent(SparkListenerSQLExecutionEnd(
    +    bus.postToAll(SparkListenerSQLExecutionEnd(
           executionId, System.currentTimeMillis()))
    -    listener.onJobEnd(SparkListenerJobEnd(
    +    bus.postToAll(SparkListenerJobEnd(
           jobId = 0,
           time = System.currentTimeMillis(),
           JobSucceeded
         ))
     
    -    val executionUIData = listener.executionIdToData(0)
    -    assert(executionUIData.runningJobs.isEmpty)
    -    assert(executionUIData.succeededJobs === Seq(0))
    -    assert(executionUIData.failedJobs.isEmpty)
    +    assertJobs(store.execution(0), completed = Seq(0))
       }
     
    -  test("onExecutionEnd happens before multiple onJobEnd(JobSucceeded)s") {
    -    val listener = new SQLListener(spark.sparkContext.conf)
    +  sqlStoreTest("onExecutionEnd happens before multiple onJobEnd(JobSucceeded)s") { (store, bus) =>
         val executionId = 0
         val df = createTestDataFrame
    -    listener.onOtherEvent(SparkListenerSQLExecutionStart(
    +    bus.postToAll(SparkListenerSQLExecutionStart(
           executionId,
           "test",
           "test",
           df.queryExecution.toString,
           SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
           System.currentTimeMillis()))
    -    listener.onJobStart(SparkListenerJobStart(
    +    bus.postToAll(SparkListenerJobStart(
           jobId = 0,
           time = System.currentTimeMillis(),
           stageInfos = Nil,
           createProperties(executionId)))
    -    listener.onJobEnd(SparkListenerJobEnd(
    +    bus.postToAll(SparkListenerJobEnd(
             jobId = 0,
             time = System.currentTimeMillis(),
             JobSucceeded
         ))
     
    -    listener.onJobStart(SparkListenerJobStart(
    +    bus.postToAll(SparkListenerJobStart(
           jobId = 1,
           time = System.currentTimeMillis(),
           stageInfos = Nil,
           createProperties(executionId)))
    -    listener.onOtherEvent(SparkListenerSQLExecutionEnd(
    +    bus.postToAll(SparkListenerSQLExecutionEnd(
           executionId, System.currentTimeMillis()))
    -    listener.onJobEnd(SparkListenerJobEnd(
    +    bus.postToAll(SparkListenerJobEnd(
           jobId = 1,
           time = System.currentTimeMillis(),
           JobSucceeded
         ))
     
    -    val executionUIData = listener.executionIdToData(0)
    -    assert(executionUIData.runningJobs.isEmpty)
    -    assert(executionUIData.succeededJobs.sorted === Seq(0, 1))
    -    assert(executionUIData.failedJobs.isEmpty)
    +    assertJobs(store.execution(0), completed = Seq(0, 1))
       }
     
    -  test("onExecutionEnd happens before onJobEnd(JobFailed)") {
    -    val listener = new SQLListener(spark.sparkContext.conf)
    +  sqlStoreTest("onExecutionEnd happens before onJobEnd(JobFailed)") { (store, bus) =>
         val executionId = 0
         val df = createTestDataFrame
    -    listener.onOtherEvent(SparkListenerSQLExecutionStart(
    +    bus.postToAll(SparkListenerSQLExecutionStart(
           executionId,
           "test",
           "test",
           df.queryExecution.toString,
           SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
           System.currentTimeMillis()))
    -    listener.onJobStart(SparkListenerJobStart(
    +    bus.postToAll(SparkListenerJobStart(
           jobId = 0,
           time = System.currentTimeMillis(),
           stageInfos = Seq.empty,
           createProperties(executionId)))
    -    listener.onOtherEvent(SparkListenerSQLExecutionEnd(
    +    bus.postToAll(SparkListenerSQLExecutionEnd(
           executionId, System.currentTimeMillis()))
    -    listener.onJobEnd(SparkListenerJobEnd(
    +    bus.postToAll(SparkListenerJobEnd(
           jobId = 0,
           time = System.currentTimeMillis(),
           JobFailed(new RuntimeException("Oops"))
         ))
     
    -    val executionUIData = listener.executionIdToData(0)
    -    assert(executionUIData.runningJobs.isEmpty)
    -    assert(executionUIData.succeededJobs.isEmpty)
    -    assert(executionUIData.failedJobs === Seq(0))
    +    assertJobs(store.execution(0), failed = Seq(0))
       }
     
       test("SPARK-11126: no memory leak when running non SQL jobs") {
    -    val previousStageNumber = spark.sharedState.listener.stageIdToStageMetrics.size
    +    val previousStageNumber = statusStore.executionsList().size
         spark.sparkContext.parallelize(1 to 10).foreach(i => ())
         spark.sparkContext.listenerBus.waitUntilEmpty(10000)
         // listener should ignore the non SQL stage
    -    assert(spark.sharedState.listener.stageIdToStageMetrics.size == previousStageNumber)
    +    assert(statusStore.executionsList().size == previousStageNumber)
     
         spark.sparkContext.parallelize(1 to 10).toDF().foreach(i => ())
         spark.sparkContext.listenerBus.waitUntilEmpty(10000)
         // listener should save the SQL stage
    -    assert(spark.sharedState.listener.stageIdToStageMetrics.size == previousStageNumber + 1)
    -  }
    -
    -  test("SPARK-13055: history listener only tracks SQL metrics") {
    -    val listener = new SQLHistoryListener(sparkContext.conf, mock(classOf[SparkUI]))
    -    // We need to post other events for the listener to track our accumulators.
    -    // These are largely just boilerplate unrelated to what we're trying to test.
    -    val df = createTestDataFrame
    -    val executionStart = SparkListenerSQLExecutionStart(
    -      0, "", "", "", SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan), 0)
    -    val stageInfo = createStageInfo(0, 0)
    -    val jobStart = SparkListenerJobStart(0, 0, Seq(stageInfo), createProperties(0))
    -    val stageSubmitted = SparkListenerStageSubmitted(stageInfo)
    -    // This task has both accumulators that are SQL metrics and accumulators that are not.
    -    // The listener should only track the ones that are actually SQL metrics.
    -    val sqlMetric = SQLMetrics.createMetric(sparkContext, "beach umbrella")
    -    val nonSqlMetric = sparkContext.longAccumulator("baseball")
    -    val sqlMetricInfo = sqlMetric.toInfo(Some(sqlMetric.value), None)
    -    val nonSqlMetricInfo = nonSqlMetric.toInfo(Some(nonSqlMetric.value), None)
    -    val taskInfo = createTaskInfo(0, 0)
    -    taskInfo.setAccumulables(List(sqlMetricInfo, nonSqlMetricInfo))
    -    val taskEnd = SparkListenerTaskEnd(0, 0, "just-a-task", null, taskInfo, null)
    -    listener.onOtherEvent(executionStart)
    -    listener.onJobStart(jobStart)
    -    listener.onStageSubmitted(stageSubmitted)
    -    // Before SPARK-13055, this throws ClassCastException because the history listener would
    -    // assume that the accumulator value is of type Long, but this may not be true for
    -    // accumulators that are not SQL metrics.
    -    listener.onTaskEnd(taskEnd)
    -    val trackedAccums = listener.stageIdToStageMetrics.values.flatMap { stageMetrics =>
    -      stageMetrics.taskIdToMetricUpdates.values.flatMap(_.accumulatorUpdates)
    -    }
    -    // Listener tracks only SQL metrics, not other accumulators
    -    assert(trackedAccums.size === 1)
    -    assert(trackedAccums.head === ((sqlMetricInfo.id, sqlMetricInfo.update.get)))
    +    assert(statusStore.executionsList().size == previousStageNumber + 1)
       }
     
       test("driver side SQL metrics") {
    -    val listener = new SQLListener(spark.sparkContext.conf)
    -    val expectedAccumValue = 12345
    +    val oldCount = statusStore.executionsList().size
    +    val expectedAccumValue = 12345L
         val physicalPlan = MyPlan(sqlContext.sparkContext, expectedAccumValue)
    -    sqlContext.sparkContext.addSparkListener(listener)
         val dummyQueryExecution = new QueryExecution(spark, LocalRelation()) {
           override lazy val sparkPlan = physicalPlan
           override lazy val executedPlan = physicalPlan
         }
    +
         SQLExecution.withNewExecutionId(spark, dummyQueryExecution) {
           physicalPlan.execute().collect()
         }
     
    -    def waitTillExecutionFinished(): Unit = {
    -      while (listener.getCompletedExecutions.isEmpty) {
    -        Thread.sleep(100)
    +    while (statusStore.executionsList().size < oldCount) {
    --- End diff --
    
    This condition no longer waits for the execution to end; it only detects that the execution has started.
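
One way to make the test wait for completion rather than for the start is to poll on a completion marker, with a timeout. The sketch below is a minimal illustration under stated assumptions: `Exec`, `waitUntil`, and `newestFinished` are hypothetical names, and it assumes the stored execution record exposes a `completionTime` similar to the one set in `onExecutionEnd` in the diff.

```scala
object WaitSketch {
  // Hypothetical stand-in for a stored execution record.
  final case class Exec(id: Long, completionTime: Option[Long])

  // Polls `done` until it returns true or the timeout elapses; returns the last result.
  def waitUntil(timeoutMs: Long, pollMs: Long = 10L)(done: => Boolean): Boolean = {
    val deadline = System.nanoTime() + timeoutMs * 1000000L
    var ok = done
    while (!ok && System.nanoTime() < deadline) {
      Thread.sleep(pollMs)
      ok = done
    }
    ok
  }

  // True only when a new execution has appeared AND the newest one has a completion time.
  def newestFinished(execs: Seq[Exec], oldCount: Int): Boolean =
    execs.size > oldCount && execs.last.completionTime.isDefined
}
```

A test could then call something like `WaitSketch.waitUntil(10000)(WaitSketch.newestFinished(currentExecs(), oldCount))`, where `currentExecs()` is a placeholder for however the suite reads the execution list, and fail if the result is `false`.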


---



[GitHub] spark issue #19681: [SPARK-20652][sql] Store SQL UI data in the new app stat...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19681
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #19681: [SPARK-20652][sql] Store SQL UI data in the new app stat...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19681
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/83658/


---



[GitHub] spark pull request #19681: [SPARK-20652][sql] Store SQL UI data in the new a...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19681#discussion_r157279430
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala ---
    @@ -118,309 +142,286 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext with JsonTest
           (id, accumulatorValue)
         }.toMap
     
    -    listener.onOtherEvent(SparkListenerSQLExecutionStart(
    +    bus.postToAll(SparkListenerSQLExecutionStart(
           executionId,
           "test",
           "test",
           df.queryExecution.toString,
           SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
           System.currentTimeMillis()))
     
    -    val executionUIData = listener.executionIdToData(0)
    -
    -    listener.onJobStart(SparkListenerJobStart(
    +    bus.postToAll(SparkListenerJobStart(
           jobId = 0,
           time = System.currentTimeMillis(),
           stageInfos = Seq(
             createStageInfo(0, 0),
             createStageInfo(1, 0)
           ),
           createProperties(executionId)))
    -    listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(0, 0)))
    +    bus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 0)))
     
    -    assert(listener.getExecutionMetrics(0).isEmpty)
    +    assert(store.executionMetrics(0).isEmpty)
     
    -    listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq(
    +    bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq(
           // (task id, stage id, stage attempt, accum updates)
    -      (0L, 0, 0, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo)),
    -      (1L, 0, 0, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo))
    +      (0L, 0, 0, createAccumulatorInfos(accumulatorUpdates)),
    +      (1L, 0, 0, createAccumulatorInfos(accumulatorUpdates))
         )))
     
    -    checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
    +    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
     
         // Driver accumulator updates don't belong to this execution should be filtered and no
         // exception will be thrown.
    -    listener.onOtherEvent(SparkListenerDriverAccumUpdates(0, Seq((999L, 2L))))
    -    checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
    +    bus.postToAll(SparkListenerDriverAccumUpdates(0, Seq((999L, 2L))))
     
    -    listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq(
    +    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
    +
    +    bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq(
           // (task id, stage id, stage attempt, accum updates)
    -      (0L, 0, 0, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo)),
    -      (1L, 0, 0,
    -        createTaskMetrics(accumulatorUpdates.mapValues(_ * 2)).accumulators().map(makeInfo))
    +      (0L, 0, 0, createAccumulatorInfos(accumulatorUpdates)),
    +      (1L, 0, 0, createAccumulatorInfos(accumulatorUpdates.mapValues(_ * 2)))
         )))
     
    -    checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 3))
    +    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 3))
     
         // Retrying a stage should reset the metrics
    -    listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(0, 1)))
    +    bus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 1)))
     
    -    listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq(
    +    bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq(
           // (task id, stage id, stage attempt, accum updates)
    -      (0L, 0, 1, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo)),
    -      (1L, 0, 1, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo))
    +      (0L, 0, 1, createAccumulatorInfos(accumulatorUpdates)),
    +      (1L, 0, 1, createAccumulatorInfos(accumulatorUpdates))
         )))
     
    -    checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
    +    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
     
         // Ignore the task end for the first attempt
    -    listener.onTaskEnd(SparkListenerTaskEnd(
    +    bus.postToAll(SparkListenerTaskEnd(
           stageId = 0,
           stageAttemptId = 0,
           taskType = "",
           reason = null,
    -      createTaskInfo(0, 0),
    -      createTaskMetrics(accumulatorUpdates.mapValues(_ * 100))))
    +      createTaskInfo(0, 0, accums = accumulatorUpdates.mapValues(_ * 100)),
    +      null))
     
    -    checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
    +    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
     
         // Finish two tasks
    -    listener.onTaskEnd(SparkListenerTaskEnd(
    +    bus.postToAll(SparkListenerTaskEnd(
           stageId = 0,
           stageAttemptId = 1,
           taskType = "",
           reason = null,
    -      createTaskInfo(0, 0),
    -      createTaskMetrics(accumulatorUpdates.mapValues(_ * 2))))
    -    listener.onTaskEnd(SparkListenerTaskEnd(
    +      createTaskInfo(0, 0, accums = accumulatorUpdates.mapValues(_ * 2)),
    +      null))
    +    bus.postToAll(SparkListenerTaskEnd(
           stageId = 0,
           stageAttemptId = 1,
           taskType = "",
           reason = null,
    -      createTaskInfo(1, 0),
    -      createTaskMetrics(accumulatorUpdates.mapValues(_ * 3))))
    +      createTaskInfo(1, 0, accums = accumulatorUpdates.mapValues(_ * 3)),
    +      null))
     
    -    checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 5))
    +    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 5))
     
         // Summit a new stage
    -    listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(1, 0)))
    +    bus.postToAll(SparkListenerStageSubmitted(createStageInfo(1, 0)))
     
    -    listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq(
    +    bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq(
           // (task id, stage id, stage attempt, accum updates)
    -      (0L, 1, 0, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo)),
    -      (1L, 1, 0, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo))
    +      (0L, 1, 0, createAccumulatorInfos(accumulatorUpdates)),
    +      (1L, 1, 0, createAccumulatorInfos(accumulatorUpdates))
         )))
     
    -    checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 7))
    +    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 7))
     
         // Finish two tasks
    -    listener.onTaskEnd(SparkListenerTaskEnd(
    +    bus.postToAll(SparkListenerTaskEnd(
           stageId = 1,
           stageAttemptId = 0,
           taskType = "",
           reason = null,
    -      createTaskInfo(0, 0),
    -      createTaskMetrics(accumulatorUpdates.mapValues(_ * 3))))
    -    listener.onTaskEnd(SparkListenerTaskEnd(
    +      createTaskInfo(0, 0, accums = accumulatorUpdates.mapValues(_ * 3)),
    +      null))
    +    bus.postToAll(SparkListenerTaskEnd(
           stageId = 1,
           stageAttemptId = 0,
           taskType = "",
           reason = null,
    -      createTaskInfo(1, 0),
    -      createTaskMetrics(accumulatorUpdates.mapValues(_ * 3))))
    +      createTaskInfo(1, 0, accums = accumulatorUpdates.mapValues(_ * 3)),
    +      null))
     
    -    checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 11))
    +    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 11))
     
    -    assert(executionUIData.runningJobs === Seq(0))
    -    assert(executionUIData.succeededJobs.isEmpty)
    -    assert(executionUIData.failedJobs.isEmpty)
    +    assertJobs(store.execution(0), running = Seq(0))
     
    -    listener.onJobEnd(SparkListenerJobEnd(
    +    bus.postToAll(SparkListenerJobEnd(
           jobId = 0,
           time = System.currentTimeMillis(),
           JobSucceeded
         ))
    -    listener.onOtherEvent(SparkListenerSQLExecutionEnd(
    +    bus.postToAll(SparkListenerSQLExecutionEnd(
           executionId, System.currentTimeMillis()))
     
    -    assert(executionUIData.runningJobs.isEmpty)
    -    assert(executionUIData.succeededJobs === Seq(0))
    -    assert(executionUIData.failedJobs.isEmpty)
    -
    -    checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 11))
    +    assertJobs(store.execution(0), completed = Seq(0))
    +    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 11))
       }
     
    -  test("onExecutionEnd happens before onJobEnd(JobSucceeded)") {
    -    val listener = new SQLListener(spark.sparkContext.conf)
    +  sqlStoreTest("onExecutionEnd happens before onJobEnd(JobSucceeded)") { (store, bus) =>
         val executionId = 0
         val df = createTestDataFrame
    -    listener.onOtherEvent(SparkListenerSQLExecutionStart(
    +    bus.postToAll(SparkListenerSQLExecutionStart(
           executionId,
           "test",
           "test",
           df.queryExecution.toString,
           SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
           System.currentTimeMillis()))
    -    listener.onJobStart(SparkListenerJobStart(
    +    bus.postToAll(SparkListenerJobStart(
           jobId = 0,
           time = System.currentTimeMillis(),
           stageInfos = Nil,
           createProperties(executionId)))
    -    listener.onOtherEvent(SparkListenerSQLExecutionEnd(
    +    bus.postToAll(SparkListenerSQLExecutionEnd(
           executionId, System.currentTimeMillis()))
    -    listener.onJobEnd(SparkListenerJobEnd(
    +    bus.postToAll(SparkListenerJobEnd(
           jobId = 0,
           time = System.currentTimeMillis(),
           JobSucceeded
         ))
     
    -    val executionUIData = listener.executionIdToData(0)
    -    assert(executionUIData.runningJobs.isEmpty)
    -    assert(executionUIData.succeededJobs === Seq(0))
    -    assert(executionUIData.failedJobs.isEmpty)
    +    assertJobs(store.execution(0), completed = Seq(0))
       }
     
    -  test("onExecutionEnd happens before multiple onJobEnd(JobSucceeded)s") {
    -    val listener = new SQLListener(spark.sparkContext.conf)
    +  sqlStoreTest("onExecutionEnd happens before multiple onJobEnd(JobSucceeded)s") { (store, bus) =>
         val executionId = 0
         val df = createTestDataFrame
    -    listener.onOtherEvent(SparkListenerSQLExecutionStart(
    +    bus.postToAll(SparkListenerSQLExecutionStart(
           executionId,
           "test",
           "test",
           df.queryExecution.toString,
           SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
           System.currentTimeMillis()))
    -    listener.onJobStart(SparkListenerJobStart(
    +    bus.postToAll(SparkListenerJobStart(
           jobId = 0,
           time = System.currentTimeMillis(),
           stageInfos = Nil,
           createProperties(executionId)))
    -    listener.onJobEnd(SparkListenerJobEnd(
    +    bus.postToAll(SparkListenerJobEnd(
             jobId = 0,
             time = System.currentTimeMillis(),
             JobSucceeded
         ))
     
    -    listener.onJobStart(SparkListenerJobStart(
    +    bus.postToAll(SparkListenerJobStart(
           jobId = 1,
           time = System.currentTimeMillis(),
           stageInfos = Nil,
           createProperties(executionId)))
    -    listener.onOtherEvent(SparkListenerSQLExecutionEnd(
    +    bus.postToAll(SparkListenerSQLExecutionEnd(
           executionId, System.currentTimeMillis()))
    -    listener.onJobEnd(SparkListenerJobEnd(
    +    bus.postToAll(SparkListenerJobEnd(
           jobId = 1,
           time = System.currentTimeMillis(),
           JobSucceeded
         ))
     
    -    val executionUIData = listener.executionIdToData(0)
    -    assert(executionUIData.runningJobs.isEmpty)
    -    assert(executionUIData.succeededJobs.sorted === Seq(0, 1))
    -    assert(executionUIData.failedJobs.isEmpty)
    +    assertJobs(store.execution(0), completed = Seq(0, 1))
       }
     
    -  test("onExecutionEnd happens before onJobEnd(JobFailed)") {
    -    val listener = new SQLListener(spark.sparkContext.conf)
    +  sqlStoreTest("onExecutionEnd happens before onJobEnd(JobFailed)") { (store, bus) =>
         val executionId = 0
         val df = createTestDataFrame
    -    listener.onOtherEvent(SparkListenerSQLExecutionStart(
    +    bus.postToAll(SparkListenerSQLExecutionStart(
           executionId,
           "test",
           "test",
           df.queryExecution.toString,
           SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
           System.currentTimeMillis()))
    -    listener.onJobStart(SparkListenerJobStart(
    +    bus.postToAll(SparkListenerJobStart(
           jobId = 0,
           time = System.currentTimeMillis(),
           stageInfos = Seq.empty,
           createProperties(executionId)))
    -    listener.onOtherEvent(SparkListenerSQLExecutionEnd(
    +    bus.postToAll(SparkListenerSQLExecutionEnd(
           executionId, System.currentTimeMillis()))
    -    listener.onJobEnd(SparkListenerJobEnd(
    +    bus.postToAll(SparkListenerJobEnd(
           jobId = 0,
           time = System.currentTimeMillis(),
           JobFailed(new RuntimeException("Oops"))
         ))
     
    -    val executionUIData = listener.executionIdToData(0)
    -    assert(executionUIData.runningJobs.isEmpty)
    -    assert(executionUIData.succeededJobs.isEmpty)
    -    assert(executionUIData.failedJobs === Seq(0))
    +    assertJobs(store.execution(0), failed = Seq(0))
       }
     
       test("SPARK-11126: no memory leak when running non SQL jobs") {
    -    val previousStageNumber = spark.sharedState.listener.stageIdToStageMetrics.size
    +    val previousStageNumber = statusStore.executionsList().size
         spark.sparkContext.parallelize(1 to 10).foreach(i => ())
         spark.sparkContext.listenerBus.waitUntilEmpty(10000)
         // listener should ignore the non SQL stage
    -    assert(spark.sharedState.listener.stageIdToStageMetrics.size == previousStageNumber)
    +    assert(statusStore.executionsList().size == previousStageNumber)
     
         spark.sparkContext.parallelize(1 to 10).toDF().foreach(i => ())
         spark.sparkContext.listenerBus.waitUntilEmpty(10000)
         // listener should save the SQL stage
    -    assert(spark.sharedState.listener.stageIdToStageMetrics.size == previousStageNumber + 1)
    -  }
    -
    -  test("SPARK-13055: history listener only tracks SQL metrics") {
    -    val listener = new SQLHistoryListener(sparkContext.conf, mock(classOf[SparkUI]))
    -    // We need to post other events for the listener to track our accumulators.
    -    // These are largely just boilerplate unrelated to what we're trying to test.
    -    val df = createTestDataFrame
    -    val executionStart = SparkListenerSQLExecutionStart(
    -      0, "", "", "", SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan), 0)
    -    val stageInfo = createStageInfo(0, 0)
    -    val jobStart = SparkListenerJobStart(0, 0, Seq(stageInfo), createProperties(0))
    -    val stageSubmitted = SparkListenerStageSubmitted(stageInfo)
    -    // This task has both accumulators that are SQL metrics and accumulators that are not.
    -    // The listener should only track the ones that are actually SQL metrics.
    -    val sqlMetric = SQLMetrics.createMetric(sparkContext, "beach umbrella")
    -    val nonSqlMetric = sparkContext.longAccumulator("baseball")
    -    val sqlMetricInfo = sqlMetric.toInfo(Some(sqlMetric.value), None)
    -    val nonSqlMetricInfo = nonSqlMetric.toInfo(Some(nonSqlMetric.value), None)
    -    val taskInfo = createTaskInfo(0, 0)
    -    taskInfo.setAccumulables(List(sqlMetricInfo, nonSqlMetricInfo))
    -    val taskEnd = SparkListenerTaskEnd(0, 0, "just-a-task", null, taskInfo, null)
    -    listener.onOtherEvent(executionStart)
    -    listener.onJobStart(jobStart)
    -    listener.onStageSubmitted(stageSubmitted)
    -    // Before SPARK-13055, this throws ClassCastException because the history listener would
    -    // assume that the accumulator value is of type Long, but this may not be true for
    -    // accumulators that are not SQL metrics.
    -    listener.onTaskEnd(taskEnd)
    -    val trackedAccums = listener.stageIdToStageMetrics.values.flatMap { stageMetrics =>
    -      stageMetrics.taskIdToMetricUpdates.values.flatMap(_.accumulatorUpdates)
    -    }
    -    // Listener tracks only SQL metrics, not other accumulators
    -    assert(trackedAccums.size === 1)
    -    assert(trackedAccums.head === ((sqlMetricInfo.id, sqlMetricInfo.update.get)))
    +    assert(statusStore.executionsList().size == previousStageNumber + 1)
       }
     
       test("driver side SQL metrics") {
    -    val listener = new SQLListener(spark.sparkContext.conf)
    -    val expectedAccumValue = 12345
    +    val oldCount = statusStore.executionsList().size
    +    val expectedAccumValue = 12345L
         val physicalPlan = MyPlan(sqlContext.sparkContext, expectedAccumValue)
    -    sqlContext.sparkContext.addSparkListener(listener)
    --- End diff --
    
    Yes, it works. There are two kinds of tests in this suite now:
    
    - "test(blah)", like this one, which uses the active Spark session's listener
    - "sqlStoreTest", which manually drives a replay bus and verifies the expected changes in the store.
    
    This particular test (and the other one you commented on) is of the first kind. Tests of the other kind do not run actual jobs; they just inject events into the replay bus manually.
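
The second kind of test can be sketched roughly as below: events are posted by hand to a bus, a listener folds them into a store, and the assertions read only from the store. This is a simplified model written for illustration. `Event`, `ExecutionStart`, `JobStart`, `JobEnd`, `JobStore` and `Bus` are hypothetical types, not Spark classes, and the real suite's `sqlStoreTest` helper and `assertJobs` are not reproduced here.

```scala
import scala.collection.mutable

// Hypothetical, simplified events; none of these are Spark classes.
sealed trait Event
final case class ExecutionStart(id: Long) extends Event
final case class JobStart(jobId: Int, executionId: Long) extends Event
final case class JobEnd(jobId: Int, succeeded: Boolean) extends Event

// Listener and store folded into one class for brevity.
final class JobStore {
  private val jobToExecution = mutable.Map.empty[Int, Long]
  private val status = mutable.Map.empty[Int, String]

  def onEvent(e: Event): Unit = e match {
    case ExecutionStart(_) => ()
    case JobStart(jobId, execId) =>
      jobToExecution(jobId) = execId
      status(jobId) = "running"
    case JobEnd(jobId, ok) =>
      // Ignore job ends for jobs that were never registered.
      if (status.contains(jobId)) {
        status(jobId) = if (ok) "completed" else "failed"
      }
  }

  // Job ids of the given execution that are currently in `state`, sorted.
  def jobs(execId: Long, state: String): Seq[Int] =
    status.collect {
      case (j, s) if s == state && jobToExecution(j) == execId => j
    }.toSeq.sorted
}

// Delivers each posted event synchronously to every registered listener.
final class Bus {
  private val listeners = mutable.ArrayBuffer.empty[Event => Unit]
  def addListener(l: Event => Unit): Unit = { listeners += l; () }
  def postToAll(e: Event): Unit = listeners.foreach(l => l(e))
}
```

Because delivery in this sketch is synchronous and no job actually runs, such a test needs no sleeping or polling: the store can be checked immediately after each `postToAll` call.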


---



[GitHub] spark pull request #19681: [SPARK-20652][sql] Store SQL UI data in the new a...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19681#discussion_r149457439
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala ---
    @@ -0,0 +1,353 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.sql.execution.ui
    +
    +import java.util.Date
    +import java.util.concurrent.ConcurrentHashMap
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable.HashMap
    +
    +import org.apache.spark.{JobExecutionStatus, SparkConf}
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.scheduler._
    +import org.apache.spark.sql.execution.SQLExecution
    +import org.apache.spark.sql.execution.metric._
    +import org.apache.spark.sql.internal.StaticSQLConf._
    +import org.apache.spark.status.LiveEntity
    +import org.apache.spark.status.config._
    +import org.apache.spark.ui.SparkUI
    +import org.apache.spark.util.kvstore.KVStore
    +
    +private[sql] class SQLAppStatusListener(
    --- End diff --
    
    I don't see calls to `update` in `onTaskEnd` or `onExecutorMetricsUpdate`.  Does that mean the live UI won't update until a stage is finished?  After looking at the tests, I guess I'm wrong and it does update ... where is the update I'm missing?


---



[GitHub] spark issue #19681: [SPARK-20652][sql] Store SQL UI data in the new app stat...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19681
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #19681: [SPARK-20652][sql] Store SQL UI data in the new app stat...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19681
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/83525/


---



[GitHub] spark issue #19681: [SPARK-20652][sql] Store SQL UI data in the new app stat...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19681
  
    **[Test build #83802 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83802/testReport)** for PR 19681 at commit [`56761af`](https://github.com/apache/spark/commit/56761af51d26df8d60989ace59a6b757ed8c38b5).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #19681: [SPARK-20652][sql] Store SQL UI data in the new app stat...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19681
  
    **[Test build #83603 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83603/testReport)** for PR 19681 at commit [`ecf293b`](https://github.com/apache/spark/commit/ecf293b31fa1b5250f484d6b2f09373e7057bbc3).


---



[GitHub] spark pull request #19681: [SPARK-20652][sql] Store SQL UI data in the new a...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19681#discussion_r157279789
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala ---
    @@ -118,309 +142,286 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext with JsonTest
           (id, accumulatorValue)
         }.toMap
     
    -    listener.onOtherEvent(SparkListenerSQLExecutionStart(
    +    bus.postToAll(SparkListenerSQLExecutionStart(
           executionId,
           "test",
           "test",
           df.queryExecution.toString,
           SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
           System.currentTimeMillis()))
     
    -    val executionUIData = listener.executionIdToData(0)
    -
    -    listener.onJobStart(SparkListenerJobStart(
    +    bus.postToAll(SparkListenerJobStart(
           jobId = 0,
           time = System.currentTimeMillis(),
           stageInfos = Seq(
             createStageInfo(0, 0),
             createStageInfo(1, 0)
           ),
           createProperties(executionId)))
    -    listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(0, 0)))
    +    bus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 0)))
     
    -    assert(listener.getExecutionMetrics(0).isEmpty)
    +    assert(store.executionMetrics(0).isEmpty)
     
    -    listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq(
    +    bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq(
           // (task id, stage id, stage attempt, accum updates)
    -      (0L, 0, 0, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo)),
    -      (1L, 0, 0, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo))
    +      (0L, 0, 0, createAccumulatorInfos(accumulatorUpdates)),
    +      (1L, 0, 0, createAccumulatorInfos(accumulatorUpdates))
         )))
     
    -    checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
    +    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
     
         // Driver accumulator updates don't belong to this execution should be filtered and no
         // exception will be thrown.
    -    listener.onOtherEvent(SparkListenerDriverAccumUpdates(0, Seq((999L, 2L))))
    -    checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
    +    bus.postToAll(SparkListenerDriverAccumUpdates(0, Seq((999L, 2L))))
     
    -    listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq(
    +    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
    +
    +    bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq(
           // (task id, stage id, stage attempt, accum updates)
    -      (0L, 0, 0, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo)),
    -      (1L, 0, 0,
    -        createTaskMetrics(accumulatorUpdates.mapValues(_ * 2)).accumulators().map(makeInfo))
    +      (0L, 0, 0, createAccumulatorInfos(accumulatorUpdates)),
    +      (1L, 0, 0, createAccumulatorInfos(accumulatorUpdates.mapValues(_ * 2)))
         )))
     
    -    checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 3))
    +    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 3))
     
         // Retrying a stage should reset the metrics
    -    listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(0, 1)))
    +    bus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 1)))
     
    -    listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq(
    +    bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq(
           // (task id, stage id, stage attempt, accum updates)
    -      (0L, 0, 1, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo)),
    -      (1L, 0, 1, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo))
    +      (0L, 0, 1, createAccumulatorInfos(accumulatorUpdates)),
    +      (1L, 0, 1, createAccumulatorInfos(accumulatorUpdates))
         )))
     
    -    checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
    +    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
     
         // Ignore the task end for the first attempt
    -    listener.onTaskEnd(SparkListenerTaskEnd(
    +    bus.postToAll(SparkListenerTaskEnd(
           stageId = 0,
           stageAttemptId = 0,
           taskType = "",
           reason = null,
    -      createTaskInfo(0, 0),
    -      createTaskMetrics(accumulatorUpdates.mapValues(_ * 100))))
    +      createTaskInfo(0, 0, accums = accumulatorUpdates.mapValues(_ * 100)),
    +      null))
     
    -    checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
    +    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
     
         // Finish two tasks
    -    listener.onTaskEnd(SparkListenerTaskEnd(
    +    bus.postToAll(SparkListenerTaskEnd(
           stageId = 0,
           stageAttemptId = 1,
           taskType = "",
           reason = null,
    -      createTaskInfo(0, 0),
    -      createTaskMetrics(accumulatorUpdates.mapValues(_ * 2))))
    -    listener.onTaskEnd(SparkListenerTaskEnd(
    +      createTaskInfo(0, 0, accums = accumulatorUpdates.mapValues(_ * 2)),
    +      null))
    +    bus.postToAll(SparkListenerTaskEnd(
           stageId = 0,
           stageAttemptId = 1,
           taskType = "",
           reason = null,
    -      createTaskInfo(1, 0),
    -      createTaskMetrics(accumulatorUpdates.mapValues(_ * 3))))
    +      createTaskInfo(1, 0, accums = accumulatorUpdates.mapValues(_ * 3)),
    +      null))
     
    -    checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 5))
    +    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 5))
     
         // Summit a new stage
    -    listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(1, 0)))
    +    bus.postToAll(SparkListenerStageSubmitted(createStageInfo(1, 0)))
     
    -    listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq(
    +    bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq(
           // (task id, stage id, stage attempt, accum updates)
    -      (0L, 1, 0, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo)),
    -      (1L, 1, 0, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo))
    +      (0L, 1, 0, createAccumulatorInfos(accumulatorUpdates)),
    +      (1L, 1, 0, createAccumulatorInfos(accumulatorUpdates))
         )))
     
    -    checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 7))
    +    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 7))
     
         // Finish two tasks
    -    listener.onTaskEnd(SparkListenerTaskEnd(
    +    bus.postToAll(SparkListenerTaskEnd(
           stageId = 1,
           stageAttemptId = 0,
           taskType = "",
           reason = null,
    -      createTaskInfo(0, 0),
    -      createTaskMetrics(accumulatorUpdates.mapValues(_ * 3))))
    -    listener.onTaskEnd(SparkListenerTaskEnd(
    +      createTaskInfo(0, 0, accums = accumulatorUpdates.mapValues(_ * 3)),
    +      null))
    +    bus.postToAll(SparkListenerTaskEnd(
           stageId = 1,
           stageAttemptId = 0,
           taskType = "",
           reason = null,
    -      createTaskInfo(1, 0),
    -      createTaskMetrics(accumulatorUpdates.mapValues(_ * 3))))
    +      createTaskInfo(1, 0, accums = accumulatorUpdates.mapValues(_ * 3)),
    +      null))
     
    -    checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 11))
    +    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 11))
     
    -    assert(executionUIData.runningJobs === Seq(0))
    -    assert(executionUIData.succeededJobs.isEmpty)
    -    assert(executionUIData.failedJobs.isEmpty)
    +    assertJobs(store.execution(0), running = Seq(0))
     
    -    listener.onJobEnd(SparkListenerJobEnd(
    +    bus.postToAll(SparkListenerJobEnd(
           jobId = 0,
           time = System.currentTimeMillis(),
           JobSucceeded
         ))
    -    listener.onOtherEvent(SparkListenerSQLExecutionEnd(
    +    bus.postToAll(SparkListenerSQLExecutionEnd(
           executionId, System.currentTimeMillis()))
     
    -    assert(executionUIData.runningJobs.isEmpty)
    -    assert(executionUIData.succeededJobs === Seq(0))
    -    assert(executionUIData.failedJobs.isEmpty)
    -
    -    checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 11))
    +    assertJobs(store.execution(0), completed = Seq(0))
    +    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 11))
       }
     
    -  test("onExecutionEnd happens before onJobEnd(JobSucceeded)") {
    -    val listener = new SQLListener(spark.sparkContext.conf)
    +  sqlStoreTest("onExecutionEnd happens before onJobEnd(JobSucceeded)") { (store, bus) =>
         val executionId = 0
         val df = createTestDataFrame
    -    listener.onOtherEvent(SparkListenerSQLExecutionStart(
    +    bus.postToAll(SparkListenerSQLExecutionStart(
           executionId,
           "test",
           "test",
           df.queryExecution.toString,
           SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
           System.currentTimeMillis()))
    -    listener.onJobStart(SparkListenerJobStart(
    +    bus.postToAll(SparkListenerJobStart(
           jobId = 0,
           time = System.currentTimeMillis(),
           stageInfos = Nil,
           createProperties(executionId)))
    -    listener.onOtherEvent(SparkListenerSQLExecutionEnd(
    +    bus.postToAll(SparkListenerSQLExecutionEnd(
           executionId, System.currentTimeMillis()))
    -    listener.onJobEnd(SparkListenerJobEnd(
    +    bus.postToAll(SparkListenerJobEnd(
           jobId = 0,
           time = System.currentTimeMillis(),
           JobSucceeded
         ))
     
    -    val executionUIData = listener.executionIdToData(0)
    -    assert(executionUIData.runningJobs.isEmpty)
    -    assert(executionUIData.succeededJobs === Seq(0))
    -    assert(executionUIData.failedJobs.isEmpty)
    +    assertJobs(store.execution(0), completed = Seq(0))
       }
     
    -  test("onExecutionEnd happens before multiple onJobEnd(JobSucceeded)s") {
    -    val listener = new SQLListener(spark.sparkContext.conf)
    +  sqlStoreTest("onExecutionEnd happens before multiple onJobEnd(JobSucceeded)s") { (store, bus) =>
         val executionId = 0
         val df = createTestDataFrame
    -    listener.onOtherEvent(SparkListenerSQLExecutionStart(
    +    bus.postToAll(SparkListenerSQLExecutionStart(
           executionId,
           "test",
           "test",
           df.queryExecution.toString,
           SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
           System.currentTimeMillis()))
    -    listener.onJobStart(SparkListenerJobStart(
    +    bus.postToAll(SparkListenerJobStart(
           jobId = 0,
           time = System.currentTimeMillis(),
           stageInfos = Nil,
           createProperties(executionId)))
    -    listener.onJobEnd(SparkListenerJobEnd(
    +    bus.postToAll(SparkListenerJobEnd(
             jobId = 0,
             time = System.currentTimeMillis(),
             JobSucceeded
         ))
     
    -    listener.onJobStart(SparkListenerJobStart(
    +    bus.postToAll(SparkListenerJobStart(
           jobId = 1,
           time = System.currentTimeMillis(),
           stageInfos = Nil,
           createProperties(executionId)))
    -    listener.onOtherEvent(SparkListenerSQLExecutionEnd(
    +    bus.postToAll(SparkListenerSQLExecutionEnd(
           executionId, System.currentTimeMillis()))
    -    listener.onJobEnd(SparkListenerJobEnd(
    +    bus.postToAll(SparkListenerJobEnd(
           jobId = 1,
           time = System.currentTimeMillis(),
           JobSucceeded
         ))
     
    -    val executionUIData = listener.executionIdToData(0)
    -    assert(executionUIData.runningJobs.isEmpty)
    -    assert(executionUIData.succeededJobs.sorted === Seq(0, 1))
    -    assert(executionUIData.failedJobs.isEmpty)
    +    assertJobs(store.execution(0), completed = Seq(0, 1))
       }
     
    -  test("onExecutionEnd happens before onJobEnd(JobFailed)") {
    -    val listener = new SQLListener(spark.sparkContext.conf)
    +  sqlStoreTest("onExecutionEnd happens before onJobEnd(JobFailed)") { (store, bus) =>
         val executionId = 0
         val df = createTestDataFrame
    -    listener.onOtherEvent(SparkListenerSQLExecutionStart(
    +    bus.postToAll(SparkListenerSQLExecutionStart(
           executionId,
           "test",
           "test",
           df.queryExecution.toString,
           SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
           System.currentTimeMillis()))
    -    listener.onJobStart(SparkListenerJobStart(
    +    bus.postToAll(SparkListenerJobStart(
           jobId = 0,
           time = System.currentTimeMillis(),
           stageInfos = Seq.empty,
           createProperties(executionId)))
    -    listener.onOtherEvent(SparkListenerSQLExecutionEnd(
    +    bus.postToAll(SparkListenerSQLExecutionEnd(
           executionId, System.currentTimeMillis()))
    -    listener.onJobEnd(SparkListenerJobEnd(
    +    bus.postToAll(SparkListenerJobEnd(
           jobId = 0,
           time = System.currentTimeMillis(),
           JobFailed(new RuntimeException("Oops"))
         ))
     
    -    val executionUIData = listener.executionIdToData(0)
    -    assert(executionUIData.runningJobs.isEmpty)
    -    assert(executionUIData.succeededJobs.isEmpty)
    -    assert(executionUIData.failedJobs === Seq(0))
    +    assertJobs(store.execution(0), failed = Seq(0))
       }
     
       test("SPARK-11126: no memory leak when running non SQL jobs") {
    -    val previousStageNumber = spark.sharedState.listener.stageIdToStageMetrics.size
    +    val previousStageNumber = statusStore.executionsList().size
         spark.sparkContext.parallelize(1 to 10).foreach(i => ())
         spark.sparkContext.listenerBus.waitUntilEmpty(10000)
         // listener should ignore the non SQL stage
    -    assert(spark.sharedState.listener.stageIdToStageMetrics.size == previousStageNumber)
    +    assert(statusStore.executionsList().size == previousStageNumber)
     
         spark.sparkContext.parallelize(1 to 10).toDF().foreach(i => ())
         spark.sparkContext.listenerBus.waitUntilEmpty(10000)
         // listener should save the SQL stage
    -    assert(spark.sharedState.listener.stageIdToStageMetrics.size == previousStageNumber + 1)
    -  }
    -
    -  test("SPARK-13055: history listener only tracks SQL metrics") {
    -    val listener = new SQLHistoryListener(sparkContext.conf, mock(classOf[SparkUI]))
    -    // We need to post other events for the listener to track our accumulators.
    -    // These are largely just boilerplate unrelated to what we're trying to test.
    -    val df = createTestDataFrame
    -    val executionStart = SparkListenerSQLExecutionStart(
    -      0, "", "", "", SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan), 0)
    -    val stageInfo = createStageInfo(0, 0)
    -    val jobStart = SparkListenerJobStart(0, 0, Seq(stageInfo), createProperties(0))
    -    val stageSubmitted = SparkListenerStageSubmitted(stageInfo)
    -    // This task has both accumulators that are SQL metrics and accumulators that are not.
    -    // The listener should only track the ones that are actually SQL metrics.
    -    val sqlMetric = SQLMetrics.createMetric(sparkContext, "beach umbrella")
    -    val nonSqlMetric = sparkContext.longAccumulator("baseball")
    -    val sqlMetricInfo = sqlMetric.toInfo(Some(sqlMetric.value), None)
    -    val nonSqlMetricInfo = nonSqlMetric.toInfo(Some(nonSqlMetric.value), None)
    -    val taskInfo = createTaskInfo(0, 0)
    -    taskInfo.setAccumulables(List(sqlMetricInfo, nonSqlMetricInfo))
    -    val taskEnd = SparkListenerTaskEnd(0, 0, "just-a-task", null, taskInfo, null)
    -    listener.onOtherEvent(executionStart)
    -    listener.onJobStart(jobStart)
    -    listener.onStageSubmitted(stageSubmitted)
    -    // Before SPARK-13055, this throws ClassCastException because the history listener would
    -    // assume that the accumulator value is of type Long, but this may not be true for
    -    // accumulators that are not SQL metrics.
    -    listener.onTaskEnd(taskEnd)
    -    val trackedAccums = listener.stageIdToStageMetrics.values.flatMap { stageMetrics =>
    -      stageMetrics.taskIdToMetricUpdates.values.flatMap(_.accumulatorUpdates)
    -    }
    -    // Listener tracks only SQL metrics, not other accumulators
    -    assert(trackedAccums.size === 1)
    -    assert(trackedAccums.head === ((sqlMetricInfo.id, sqlMetricInfo.update.get)))
    +    assert(statusStore.executionsList().size == previousStageNumber + 1)
       }
     
       test("driver side SQL metrics") {
    -    val listener = new SQLListener(spark.sparkContext.conf)
    -    val expectedAccumValue = 12345
    +    val oldCount = statusStore.executionsList().size
    +    val expectedAccumValue = 12345L
         val physicalPlan = MyPlan(sqlContext.sparkContext, expectedAccumValue)
    -    sqlContext.sparkContext.addSparkListener(listener)
         val dummyQueryExecution = new QueryExecution(spark, LocalRelation()) {
           override lazy val sparkPlan = physicalPlan
           override lazy val executedPlan = physicalPlan
         }
    +
         SQLExecution.withNewExecutionId(spark, dummyQueryExecution) {
           physicalPlan.execute().collect()
         }
     
    -    def waitTillExecutionFinished(): Unit = {
    -      while (listener.getCompletedExecutions.isEmpty) {
    -        Thread.sleep(100)
    +    while (statusStore.executionsList().size < oldCount) {
    --- End diff --
    
    True, but the lines below:
    
    ```
        // Wait for listener to finish computing the metrics for the execution.
        while (statusStore.executionsList().last.metricValues == null) {
          Thread.sleep(100)
        }
    ```
    
    Basically, they wait for that new execution to end (which is when the coalesced `metricValues` field is populated).
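
The wait described above is an unbounded polling loop. As a minimal sketch only, the same idea can be written with a deadline so that a test fails instead of hanging. `waitUntil` is a hypothetical helper, not part of Spark or of this PR, and the `AtomicReference` merely stands in for the store's `metricValues` field.

```scala
import java.util.concurrent.atomic.AtomicReference

object PollingSketch {
  // Hypothetical helper: re-evaluates `condition` every `intervalMs` until it
  // holds or `timeoutMs` has elapsed. Returns whether the condition became true.
  def waitUntil(timeoutMs: Long, intervalMs: Long = 100L)(condition: => Boolean): Boolean = {
    val deadline = System.nanoTime() + timeoutMs * 1000000L
    var ok = condition
    while (!ok && System.nanoTime() < deadline) {
      Thread.sleep(intervalMs)
      ok = condition
    }
    ok
  }

  def main(args: Array[String]): Unit = {
    // Stand-in for `metricValues`: null until the execution has ended.
    val metricValues = new AtomicReference[Map[Long, String]](null)

    // Simulates the listener thread populating the field a little later.
    val writer = new Thread(new Runnable {
      override def run(): Unit = {
        Thread.sleep(50)
        metricValues.set(Map(1L -> "12345"))
      }
    })
    writer.start()

    val populated = waitUntil(timeoutMs = 5000L, intervalMs = 10L)(metricValues.get() != null)
    writer.join()
    assert(populated)
    assert(metricValues.get() == Map(1L -> "12345"))
  }
}
```

Returning a `Boolean` rather than throwing leaves it to the caller to decide how a timeout should be reported.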


---



[GitHub] spark pull request #19681: [SPARK-20652][sql] Store SQL UI data in the new a...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19681#discussion_r149767509
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala ---
    @@ -0,0 +1,353 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.sql.execution.ui
    +
    +import java.util.Date
    +import java.util.concurrent.ConcurrentHashMap
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable.HashMap
    +
    +import org.apache.spark.{JobExecutionStatus, SparkConf}
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.scheduler._
    +import org.apache.spark.sql.execution.SQLExecution
    +import org.apache.spark.sql.execution.metric._
    +import org.apache.spark.sql.internal.StaticSQLConf._
    +import org.apache.spark.status.LiveEntity
    +import org.apache.spark.status.config._
    +import org.apache.spark.ui.SparkUI
    +import org.apache.spark.util.kvstore.KVStore
    +
    +private[sql] class SQLAppStatusListener(
    +    conf: SparkConf,
    +    kvstore: KVStore,
    +    live: Boolean,
    +    ui: Option[SparkUI] = None)
    +  extends SparkListener with Logging {
    +
    +  // How often to flush intermediate state of a live execution to the store. When replaying logs,
    +  // never flush (only do the very last write).
    +  private val liveUpdatePeriodNs = if (live) conf.get(LIVE_ENTITY_UPDATE_PERIOD) else -1L
    +
    +  private val liveExecutions = new HashMap[Long, LiveExecutionData]()
    +  private val stageMetrics = new HashMap[Int, LiveStageMetrics]()
    +
    +  private var uiInitialized = false
    +
    +  override def onJobStart(event: SparkListenerJobStart): Unit = {
    +    val executionIdString = event.properties.getProperty(SQLExecution.EXECUTION_ID_KEY)
    +    if (executionIdString == null) {
    +      // This is not a job created by SQL
    +      return
    +    }
    +
    +    val executionId = executionIdString.toLong
    +    val jobId = event.jobId
    +    val exec = getOrCreateExecution(executionId)
    +
    +    // Record the accumulator IDs for the stages of this job, so that the code that keeps
    +    // track of the metrics knows which accumulators to look at.
    +    val accumIds = exec.metrics.map(_.accumulatorId).sorted.toList
    +    event.stageIds.foreach { id =>
    +      stageMetrics.put(id, new LiveStageMetrics(id, 0, accumIds.toArray, new ConcurrentHashMap()))
    +    }
    +
    +    exec.jobs = exec.jobs + (jobId -> JobExecutionStatus.RUNNING)
    +    exec.stages = event.stageIds.toSet
    +    update(exec)
    +  }
    +
    +  override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = {
    +    if (!isSQLStage(event.stageInfo.stageId)) {
    +      return
    +    }
    +
    +    // Reset the metrics tracking object for the new attempt.
    +    stageMetrics.get(event.stageInfo.stageId).foreach { metrics =>
    +      metrics.taskMetrics.clear()
    +      metrics.attemptId = event.stageInfo.attemptId
    +    }
    +  }
    +
    +  override def onJobEnd(event: SparkListenerJobEnd): Unit = {
    +    liveExecutions.values.foreach { exec =>
    +      if (exec.jobs.contains(event.jobId)) {
    +        val result = event.jobResult match {
    +          case JobSucceeded => JobExecutionStatus.SUCCEEDED
    +          case _ => JobExecutionStatus.FAILED
    +        }
    +        exec.jobs = exec.jobs + (event.jobId -> result)
    +        exec.endEvents += 1
    +        update(exec)
    +      }
    +    }
    +  }
    +
    +  override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = {
    +    event.accumUpdates.foreach { case (taskId, stageId, attemptId, accumUpdates) =>
    +      updateStageMetrics(stageId, attemptId, taskId, accumUpdates, false)
    +    }
    +  }
    +
    +  override def onTaskEnd(event: SparkListenerTaskEnd): Unit = {
    +    if (!isSQLStage(event.stageId)) {
    +      return
    +    }
    +
    +    val info = event.taskInfo
    +    // SPARK-20342. If processing events from a live application, use the task metrics info to
    +    // work around a race in the DAGScheduler. The metrics info does not contain accumulator info
    +    // when reading event logs in the SHS, so we have to rely on the accumulator in that case.
    +    val accums = if (live && event.taskMetrics != null) {
    +      event.taskMetrics.externalAccums.flatMap { a =>
    +        // This call may fail if the accumulator is gc'ed, so account for that.
    +        try {
    +          Some(a.toInfo(Some(a.value), None))
    +        } catch {
    +          case _: IllegalAccessError => None
    +        }
    +      }
    +    } else {
    +      info.accumulables
    +    }
    +    updateStageMetrics(event.stageId, event.stageAttemptId, info.taskId, accums,
    +      info.successful)
    +  }
    +
    +  def executionMetrics(executionId: Long): Map[Long, String] = synchronized {
    +    liveExecutions.get(executionId).map { exec =>
    +      if (exec.metricsValues != null) {
    +        exec.metricsValues
    +      } else {
    +        aggregateMetrics(exec)
    +      }
    +    }.getOrElse {
    +      throw new NoSuchElementException(s"execution $executionId not found")
    +    }
    +  }
    +
    +  private def aggregateMetrics(exec: LiveExecutionData): Map[Long, String] = synchronized {
    +    val metricIds = exec.metrics.map(_.accumulatorId).sorted
    +    val metricTypes = exec.metrics.map { m => (m.accumulatorId, m.metricType) }.toMap
    +    val metrics = exec.stages.toSeq
    +      .flatMap(stageMetrics.get)
    +      .flatMap(_.taskMetrics.values().asScala)
    +      .flatMap { metrics => metrics.ids.zip(metrics.values) }
    +
    +    (metrics ++ exec.driverAccumUpdates.toSeq)
    +      .filter { case (id, _) => metricIds.contains(id) }
    +      .groupBy(_._1)
    +      .map { case (id, values) =>
    +        id -> SQLMetrics.stringValue(metricTypes(id), values.map(_._2).toSeq)
    +      }
    +  }
    +
    +  private def updateStageMetrics(
    +      stageId: Int,
    +      attemptId: Int,
    +      taskId: Long,
    +      accumUpdates: Seq[AccumulableInfo],
    +      succeeded: Boolean): Unit = {
    +    stageMetrics.get(stageId).foreach { metrics =>
    +      if (metrics.attemptId != attemptId || metrics.accumulatorIds.isEmpty) {
    +        return
    +      }
    +
    +      val oldTaskMetrics = metrics.taskMetrics.get(taskId)
    +      if (oldTaskMetrics != null && oldTaskMetrics.succeeded) {
    +        return
    +      }
    +
    +      val updates = accumUpdates
    +        .filter { acc => acc.update.isDefined && metrics.accumulatorIds.contains(acc.id) }
    +        .sortBy(_.id)
    +
    +      if (updates.isEmpty) {
    +        return
    +      }
    +
    +      val ids = new Array[Long](updates.size)
    +      val values = new Array[Long](updates.size)
    +      updates.zipWithIndex.foreach { case (acc, idx) =>
    +        ids(idx) = acc.id
    +        // In a live application, accumulators have Long values, but when reading from event
    +        // logs, they have String values. For now, assume all accumulators are Long and covert
    +        // accordingly.
    +        values(idx) = acc.update.get match {
    +          case s: String => s.toLong
    +          case l: Long => l
    +          case o => throw new IllegalArgumentException(s"Unexpected: $o")
    +        }
    +      }
    +
    +      metrics.taskMetrics.put(taskId, new LiveTaskMetrics(ids, values, succeeded))
    +    }
    +  }
    +
    +  private def onExecutionStart(event: SparkListenerSQLExecutionStart): Unit = {
    +    // Install the SQL tab in a live app if it hasn't been initialized yet.
    +    if (!uiInitialized) {
    +      ui.foreach { _ui =>
    +        new SQLTab(new SQLAppStatusStore(kvstore, Some(this)), _ui)
    +      }
    +      uiInitialized = true
    +    }
    +
    +    val SparkListenerSQLExecutionStart(executionId, description, details,
    +      physicalPlanDescription, sparkPlanInfo, time) = event
    +
    +    def toStoredNodes(nodes: Seq[SparkPlanGraphNode]): Seq[SparkPlanGraphNodeWrapper] = {
    +      nodes.map {
    +        case cluster: SparkPlanGraphCluster =>
    +          val storedCluster = new SparkPlanGraphClusterWrapper(
    +            cluster.id,
    +            cluster.name,
    +            cluster.desc,
    +            toStoredNodes(cluster.nodes),
    +            cluster.metrics)
    +          new SparkPlanGraphNodeWrapper(null, storedCluster)
    +
    +        case node =>
    +          new SparkPlanGraphNodeWrapper(node, null)
    +      }
    +    }
    +
    +    val planGraph = SparkPlanGraph(sparkPlanInfo)
    +    val sqlPlanMetrics = planGraph.allNodes.flatMap { node =>
    +      node.metrics.map { metric => (metric.accumulatorId, metric) }
    +    }.toMap.values.toList
    +
    +    val graphToStore = new SparkPlanGraphWrapper(
    +      executionId,
    +      toStoredNodes(planGraph.nodes),
    +      planGraph.edges)
    +    kvstore.write(graphToStore)
    +
    +    val exec = getOrCreateExecution(executionId)
    +    exec.description = description
    +    exec.details = details
    +    exec.physicalPlanDescription = physicalPlanDescription
    +    exec.metrics = sqlPlanMetrics
    +    exec.submissionTime = time
    +    update(exec)
    +  }
    +
    +  private def onExecutionEnd(event: SparkListenerSQLExecutionEnd): Unit = {
    +    val SparkListenerSQLExecutionEnd(executionId, time) = event
    +    liveExecutions.get(executionId).foreach { exec =>
    +      synchronized {
    +        exec.metricsValues = aggregateMetrics(exec)
    +
    +        // Remove stale LiveStageMetrics objects for stages that are not active anymore.
    +        val activeStages = liveExecutions.values.flatMap { other =>
    +          if (other != exec) other.stages else Nil
    +        }.toSet
    +        stageMetrics.retain { case (id, _) => activeStages.contains(id) }
    +
    +        exec.completionTime = Some(new Date(time))
    +        exec.endEvents += 1
    +
    +        update(exec)
    +      }
    +    }
    +  }
    +
    +  private def onDriverAccumUpdates(event: SparkListenerDriverAccumUpdates): Unit = {
    +    val SparkListenerDriverAccumUpdates(executionId, accumUpdates) = event
    +    liveExecutions.get(executionId).foreach { exec =>
    +      exec.driverAccumUpdates = accumUpdates.toMap
    +      update(exec)
    +    }
    +  }
    +
    +  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
    +    case e: SparkListenerSQLExecutionStart => onExecutionStart(e)
    +    case e: SparkListenerSQLExecutionEnd => onExecutionEnd(e)
    +    case e: SparkListenerDriverAccumUpdates => onDriverAccumUpdates(e)
    +    case _ => // Ignore
    +  }
    +
    +  private def getOrCreateExecution(executionId: Long): LiveExecutionData = {
    +    liveExecutions.getOrElseUpdate(executionId, new LiveExecutionData(executionId))
    +  }
    +
    +  private def update(exec: LiveExecutionData): Unit = {
    +    val now = System.nanoTime()
    +    if (exec.endEvents >= exec.jobs.size + 1) {
    +      liveExecutions.remove(exec.executionId)
    +      exec.write(kvstore, now)
    +    } else if (liveUpdatePeriodNs >= 0) {
    +      if (now - exec.lastWriteTime > liveUpdatePeriodNs) {
    +        exec.write(kvstore, now)
    +      }
    +    }
    +  }
    +
    +  private def isSQLStage(stageId: Int): Boolean = {
    +    liveExecutions.values.exists { exec =>
    +      exec.stages.contains(stageId)
    +    }
    +  }
    +
    +}
    +
    +private class LiveExecutionData(val executionId: Long) extends LiveEntity {
    +
    +  var description: String = null
    +  var details: String = null
    +  var physicalPlanDescription: String = null
    +  var metrics = Seq[SQLPlanMetric]()
    +  var submissionTime = -1L
    +  var completionTime: Option[Date] = None
    +
    +  var jobs = Map[Int, JobExecutionStatus]()
    +  var stages = Set[Int]()
    +  var driverAccumUpdates = Map[Long, Long]()
    +
    +  var metricsValues: Map[Long, String] = null
    +
    +  // Just in case job end and execution end arrive out of order, keep track of how many
    +  // end events arrived so that the listener can stop tracking the execution.
    +  var endEvents = 0
    +
    +  override protected def doUpdate(): Any = {
    +    new SQLExecutionUIData(
    +      executionId,
    +      description,
    +      details,
    +      physicalPlanDescription,
    +      metrics,
    +      submissionTime,
    +      completionTime,
    +      jobs,
    +      stages,
    +      metricsValues)
    +  }
    +
    +}
    +
    +private class LiveStageMetrics(
    +    val stageId: Int,
    +    var attemptId: Int,
    +    val accumulatorIds: Array[Long],
    +    val taskMetrics: ConcurrentHashMap[Long, LiveTaskMetrics])
    --- End diff --
    
    Can you include a comment explaining the threading concerns in this class? At first I didn't think the CHM or the `synchronized` blocks were necessary, since the listener methods are only called from one thread, but then I realized the UI will also call `executionMetrics(executionId)` (I think that is the only reason?). Also, I think there are more `synchronized` blocks than necessary: both call sites of `aggregateMetrics` have already acquired the lock, so it shouldn't need to acquire it again. Though it doesn't hurt, it can be confusing if it's not clear where the lock is supposed to be acquired.
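
One way to make the locking discipline described above explicit is to take the lock only at the public entry points and to document private helpers as "caller must hold the lock". The following is a simplified sketch under that assumption. `MetricsTracker` and its methods are hypothetical names for illustration, not the PR's `SQLAppStatusListener`, which also relies on a `ConcurrentHashMap` for per-task metrics.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-in for a listener whose state is written by
// the listener bus thread and read by the UI thread.
class MetricsTracker {
  // Guarded by `this`.
  private val values = mutable.HashMap[Long, Long]()

  // Called from the listener thread.
  def record(id: Long, delta: Long): Unit = synchronized {
    values(id) = values.getOrElse(id, 0L) + delta
  }

  // Called from the UI thread; this is where the lock is taken for reads.
  def snapshot(): Map[Long, String] = synchronized {
    aggregate()
  }

  // Caller must hold the lock on `this`, so no nested `synchronized` here.
  private def aggregate(): Map[Long, String] = {
    values.map { case (id, v) => id -> v.toString }.toMap
  }
}

object MetricsTrackerDemo {
  def main(args: Array[String]): Unit = {
    val tracker = new MetricsTracker
    val writer = new Thread(new Runnable {
      override def run(): Unit = (1 to 1000).foreach(_ => tracker.record(1L, 1L))
    })
    writer.start()
    tracker.snapshot() // may run concurrently with the writer
    writer.join()
    assert(tracker.snapshot() == Map(1L -> "1000"))
  }
}
```

Because Scala's `synchronized` is reentrant, a redundant inner `synchronized` would still be correct. The comment on the helper only makes it clearer where the lock is expected to be acquired.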


---



[GitHub] spark pull request #19681: [SPARK-20652][sql] Store SQL UI data in the new a...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19681#discussion_r149801087
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala ---
    @@ -0,0 +1,353 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.sql.execution.ui
    +
    +import java.util.Date
    +import java.util.concurrent.ConcurrentHashMap
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable.HashMap
    +
    +import org.apache.spark.{JobExecutionStatus, SparkConf}
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.scheduler._
    +import org.apache.spark.sql.execution.SQLExecution
    +import org.apache.spark.sql.execution.metric._
    +import org.apache.spark.sql.internal.StaticSQLConf._
    +import org.apache.spark.status.LiveEntity
    +import org.apache.spark.status.config._
    +import org.apache.spark.ui.SparkUI
    +import org.apache.spark.util.kvstore.KVStore
    +
    +private[sql] class SQLAppStatusListener(
    +    conf: SparkConf,
    +    kvstore: KVStore,
    +    live: Boolean,
    +    ui: Option[SparkUI] = None)
    +  extends SparkListener with Logging {
    +
    +  // How often to flush intermediate state of a live execution to the store. When replaying logs,
    +  // never flush (only do the very last write).
    +  private val liveUpdatePeriodNs = if (live) conf.get(LIVE_ENTITY_UPDATE_PERIOD) else -1L
    +
    +  private val liveExecutions = new HashMap[Long, LiveExecutionData]()
    +  private val stageMetrics = new HashMap[Int, LiveStageMetrics]()
    +
    +  private var uiInitialized = false
    +
    +  override def onJobStart(event: SparkListenerJobStart): Unit = {
    +    val executionIdString = event.properties.getProperty(SQLExecution.EXECUTION_ID_KEY)
    +    if (executionIdString == null) {
    +      // This is not a job created by SQL
    +      return
    +    }
    +
    +    val executionId = executionIdString.toLong
    +    val jobId = event.jobId
    +    val exec = getOrCreateExecution(executionId)
    +
    +    // Record the accumulator IDs for the stages of this job, so that the code that keeps
    +    // track of the metrics knows which accumulators to look at.
    +    val accumIds = exec.metrics.map(_.accumulatorId).sorted.toList
    +    event.stageIds.foreach { id =>
    +      stageMetrics.put(id, new LiveStageMetrics(id, 0, accumIds.toArray, new ConcurrentHashMap()))
    +    }
    +
    +    exec.jobs = exec.jobs + (jobId -> JobExecutionStatus.RUNNING)
    +    exec.stages = event.stageIds.toSet
    +    update(exec)
    +  }
    +
    +  override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = {
    +    if (!isSQLStage(event.stageInfo.stageId)) {
    +      return
    +    }
    +
    +    // Reset the metrics tracking object for the new attempt.
    +    stageMetrics.get(event.stageInfo.stageId).foreach { metrics =>
    +      metrics.taskMetrics.clear()
    +      metrics.attemptId = event.stageInfo.attemptId
    +    }
    +  }
    +
    +  override def onJobEnd(event: SparkListenerJobEnd): Unit = {
    +    liveExecutions.values.foreach { exec =>
    +      if (exec.jobs.contains(event.jobId)) {
    +        val result = event.jobResult match {
    +          case JobSucceeded => JobExecutionStatus.SUCCEEDED
    +          case _ => JobExecutionStatus.FAILED
    +        }
    +        exec.jobs = exec.jobs + (event.jobId -> result)
    +        exec.endEvents += 1
    +        update(exec)
    +      }
    +    }
    +  }
    +
    +  override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = {
    +    event.accumUpdates.foreach { case (taskId, stageId, attemptId, accumUpdates) =>
    +      updateStageMetrics(stageId, attemptId, taskId, accumUpdates, false)
    +    }
    +  }
    +
    +  override def onTaskEnd(event: SparkListenerTaskEnd): Unit = {
    +    if (!isSQLStage(event.stageId)) {
    +      return
    +    }
    +
    +    val info = event.taskInfo
    +    // SPARK-20342. If processing events from a live application, use the task metrics info to
    +    // work around a race in the DAGScheduler. The metrics info does not contain accumulator info
    +    // when reading event logs in the SHS, so we have to rely on the accumulator in that case.
    +    val accums = if (live && event.taskMetrics != null) {
    +      event.taskMetrics.externalAccums.flatMap { a =>
    +        // This call may fail if the accumulator is gc'ed, so account for that.
    +        try {
    +          Some(a.toInfo(Some(a.value), None))
    +        } catch {
    +          case _: IllegalAccessError => None
    +        }
    +      }
    +    } else {
    +      info.accumulables
    +    }
    +    updateStageMetrics(event.stageId, event.stageAttemptId, info.taskId, accums,
    +      info.successful)
    +  }
    +
    +  def executionMetrics(executionId: Long): Map[Long, String] = synchronized {
    +    liveExecutions.get(executionId).map { exec =>
    +      if (exec.metricsValues != null) {
    +        exec.metricsValues
    +      } else {
    +        aggregateMetrics(exec)
    +      }
    +    }.getOrElse {
    +      throw new NoSuchElementException(s"execution $executionId not found")
    +    }
    +  }
    +
    +  private def aggregateMetrics(exec: LiveExecutionData): Map[Long, String] = synchronized {
    +    val metricIds = exec.metrics.map(_.accumulatorId).sorted
    +    val metricTypes = exec.metrics.map { m => (m.accumulatorId, m.metricType) }.toMap
    +    val metrics = exec.stages.toSeq
    +      .flatMap(stageMetrics.get)
    +      .flatMap(_.taskMetrics.values().asScala)
    +      .flatMap { metrics => metrics.ids.zip(metrics.values) }
    +
    +    (metrics ++ exec.driverAccumUpdates.toSeq)
    +      .filter { case (id, _) => metricIds.contains(id) }
    +      .groupBy(_._1)
    +      .map { case (id, values) =>
    +        id -> SQLMetrics.stringValue(metricTypes(id), values.map(_._2).toSeq)
    +      }
    +  }
    +
    +  private def updateStageMetrics(
    +      stageId: Int,
    +      attemptId: Int,
    +      taskId: Long,
    +      accumUpdates: Seq[AccumulableInfo],
    +      succeeded: Boolean): Unit = {
    +    stageMetrics.get(stageId).foreach { metrics =>
    +      if (metrics.attemptId != attemptId || metrics.accumulatorIds.isEmpty) {
    +        return
    +      }
    +
    +      val oldTaskMetrics = metrics.taskMetrics.get(taskId)
    +      if (oldTaskMetrics != null && oldTaskMetrics.succeeded) {
    +        return
    +      }
    +
    +      val updates = accumUpdates
    +        .filter { acc => acc.update.isDefined && metrics.accumulatorIds.contains(acc.id) }
    +        .sortBy(_.id)
    +
    +      if (updates.isEmpty) {
    +        return
    +      }
    +
    +      val ids = new Array[Long](updates.size)
    +      val values = new Array[Long](updates.size)
    +      updates.zipWithIndex.foreach { case (acc, idx) =>
    +        ids(idx) = acc.id
    +        // In a live application, accumulators have Long values, but when reading from event
    +        // logs, they have String values. For now, assume all accumulators are Long and convert
    +        // accordingly.
    +        values(idx) = acc.update.get match {
    +          case s: String => s.toLong
    +          case l: Long => l
    +          case o => throw new IllegalArgumentException(s"Unexpected: $o")
    +        }
    +      }
    +
    +      metrics.taskMetrics.put(taskId, new LiveTaskMetrics(ids, values, succeeded))
    +    }
    +  }
    +
    +  private def onExecutionStart(event: SparkListenerSQLExecutionStart): Unit = {
    +    // Install the SQL tab in a live app if it hasn't been initialized yet.
    +    if (!uiInitialized) {
    +      ui.foreach { _ui =>
    +        new SQLTab(new SQLAppStatusStore(kvstore, Some(this)), _ui)
    +      }
    +      uiInitialized = true
    +    }
    +
    +    val SparkListenerSQLExecutionStart(executionId, description, details,
    +      physicalPlanDescription, sparkPlanInfo, time) = event
    +
    +    def toStoredNodes(nodes: Seq[SparkPlanGraphNode]): Seq[SparkPlanGraphNodeWrapper] = {
    +      nodes.map {
    +        case cluster: SparkPlanGraphCluster =>
    +          val storedCluster = new SparkPlanGraphClusterWrapper(
    +            cluster.id,
    +            cluster.name,
    +            cluster.desc,
    +            toStoredNodes(cluster.nodes),
    +            cluster.metrics)
    +          new SparkPlanGraphNodeWrapper(null, storedCluster)
    +
    +        case node =>
    +          new SparkPlanGraphNodeWrapper(node, null)
    +      }
    +    }
    +
    +    val planGraph = SparkPlanGraph(sparkPlanInfo)
    +    val sqlPlanMetrics = planGraph.allNodes.flatMap { node =>
    +      node.metrics.map { metric => (metric.accumulatorId, metric) }
    +    }.toMap.values.toList
    +
    +    val graphToStore = new SparkPlanGraphWrapper(
    +      executionId,
    +      toStoredNodes(planGraph.nodes),
    +      planGraph.edges)
    +    kvstore.write(graphToStore)
    +
    +    val exec = getOrCreateExecution(executionId)
    +    exec.description = description
    +    exec.details = details
    +    exec.physicalPlanDescription = physicalPlanDescription
    +    exec.metrics = sqlPlanMetrics
    +    exec.submissionTime = time
    +    update(exec)
    +  }
    +
    +  private def onExecutionEnd(event: SparkListenerSQLExecutionEnd): Unit = {
    +    val SparkListenerSQLExecutionEnd(executionId, time) = event
    +    liveExecutions.get(executionId).foreach { exec =>
    +      synchronized {
    +        exec.metricsValues = aggregateMetrics(exec)
    +
    +        // Remove stale LiveStageMetrics objects for stages that are not active anymore.
    +        val activeStages = liveExecutions.values.flatMap { other =>
    +          if (other != exec) other.stages else Nil
    +        }.toSet
    +        stageMetrics.retain { case (id, _) => activeStages.contains(id) }
    +
    +        exec.completionTime = Some(new Date(time))
    +        exec.endEvents += 1
    +
    +        update(exec)
    +      }
    +    }
    +  }
    +
    +  private def onDriverAccumUpdates(event: SparkListenerDriverAccumUpdates): Unit = {
    +    val SparkListenerDriverAccumUpdates(executionId, accumUpdates) = event
    +    liveExecutions.get(executionId).foreach { exec =>
    +      exec.driverAccumUpdates = accumUpdates.toMap
    +      update(exec)
    +    }
    +  }
    +
    +  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
    +    case e: SparkListenerSQLExecutionStart => onExecutionStart(e)
    +    case e: SparkListenerSQLExecutionEnd => onExecutionEnd(e)
    +    case e: SparkListenerDriverAccumUpdates => onDriverAccumUpdates(e)
    +    case _ => // Ignore
    +  }
    +
    +  private def getOrCreateExecution(executionId: Long): LiveExecutionData = {
    +    liveExecutions.getOrElseUpdate(executionId, new LiveExecutionData(executionId))
    +  }
    +
    +  private def update(exec: LiveExecutionData): Unit = {
    +    val now = System.nanoTime()
    +    if (exec.endEvents >= exec.jobs.size + 1) {
    +      liveExecutions.remove(exec.executionId)
    +      exec.write(kvstore, now)
    +    } else if (liveUpdatePeriodNs >= 0) {
    +      if (now - exec.lastWriteTime > liveUpdatePeriodNs) {
    +        exec.write(kvstore, now)
    +      }
    +    }
    +  }
    +
    +  private def isSQLStage(stageId: Int): Boolean = {
    +    liveExecutions.values.exists { exec =>
    +      exec.stages.contains(stageId)
    +    }
    +  }
    +
    +}
    +
    +private class LiveExecutionData(val executionId: Long) extends LiveEntity {
    +
    +  var description: String = null
    +  var details: String = null
    +  var physicalPlanDescription: String = null
    +  var metrics = Seq[SQLPlanMetric]()
    +  var submissionTime = -1L
    +  var completionTime: Option[Date] = None
    +
    +  var jobs = Map[Int, JobExecutionStatus]()
    +  var stages = Set[Int]()
    +  var driverAccumUpdates = Map[Long, Long]()
    +
    +  var metricsValues: Map[Long, String] = null
    +
    +  // Just in case job end and execution end arrive out of order, keep track of how many
    +  // end events arrived so that the listener can stop tracking the execution.
    +  var endEvents = 0
    +
    +  override protected def doUpdate(): Any = {
    +    new SQLExecutionUIData(
    +      executionId,
    +      description,
    +      details,
    +      physicalPlanDescription,
    +      metrics,
    +      submissionTime,
    +      completionTime,
    +      jobs,
    +      stages,
    +      metricsValues)
    +  }
    +
    +}
    +
    +private class LiveStageMetrics(
    +    val stageId: Int,
    +    var attemptId: Int,
    +    val accumulatorIds: Array[Long],
    +    val taskMetrics: ConcurrentHashMap[Long, LiveTaskMetrics])
    --- End diff --
    
    I changed the implementation a bit since it wasn't completely correct. Added some comments in a few places.
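
The `update` method in the diff above stops tracking an execution once `endEvents >= jobs.size + 1`, so the order in which the job-end and execution-end events arrive does not matter. The following is a minimal sketch of that bookkeeping in isolation. `Execution` and `Tracker` are hypothetical stand-ins for `LiveExecutionData` and the listener, not the classes from the PR, and the sketch ignores the store writes and thread-safety concerns the real listener has to handle.

```scala
import scala.collection.mutable

// Hypothetical stand-in for LiveExecutionData, reduced to the fields the check needs.
final class Execution(val id: Long) {
  var jobs: Set[Int] = Set.empty
  var endEvents: Int = 0
}

// Hypothetical stand-in for the listener's tracking of live executions.
final class Tracker {
  private val live = mutable.HashMap.empty[Long, Execution]

  def jobStart(execId: Long, jobId: Int): Unit = {
    val exec = live.getOrElseUpdate(execId, new Execution(execId))
    exec.jobs = exec.jobs + jobId
  }

  // Copy the matches to a List first so removing from `live` is safe while iterating.
  def jobEnd(jobId: Int): Unit =
    live.values.filter(_.jobs.contains(jobId)).toList.foreach(end)

  def executionEnd(execId: Long): Unit =
    live.get(execId).foreach(end)

  def isLive(execId: Long): Boolean = live.contains(execId)

  // One end event per job plus one for the execution itself.
  private def end(exec: Execution): Unit = {
    exec.endEvents += 1
    if (exec.endEvents >= exec.jobs.size + 1) {
      live.remove(exec.id)
    }
  }
}
```

With one job registered, the execution stays tracked after the first end event and is dropped after the second, whichever of the two events comes first.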


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19681: [SPARK-20652][sql] Store SQL UI data in the new app stat...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19681
  
    **[Test build #83563 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83563/testReport)** for PR 19681 at commit [`46695ed`](https://github.com/apache/spark/commit/46695edece81057986e7a0b7f87a0eacbe0b0fca).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #19681: [SPARK-20652][sql] Store SQL UI data in the new app stat...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19681
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #19681: [SPARK-20652][sql] Store SQL UI data in the new app stat...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19681
  
    **[Test build #83658 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83658/testReport)** for PR 19681 at commit [`1a31665`](https://github.com/apache/spark/commit/1a31665ab6d3352dee3e15c87a697a7e655eb34c).


---



[GitHub] spark issue #19681: [SPARK-20652][sql] Store SQL UI data in the new app stat...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19681
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #19681: [SPARK-20652][sql] Store SQL UI data in the new a...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19681#discussion_r149450470
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala ---
    @@ -0,0 +1,353 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.sql.execution.ui
    +
    +import java.util.Date
    +import java.util.concurrent.ConcurrentHashMap
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable.HashMap
    +
    +import org.apache.spark.{JobExecutionStatus, SparkConf}
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.scheduler._
    +import org.apache.spark.sql.execution.SQLExecution
    +import org.apache.spark.sql.execution.metric._
    +import org.apache.spark.sql.internal.StaticSQLConf._
    +import org.apache.spark.status.LiveEntity
    +import org.apache.spark.status.config._
    +import org.apache.spark.ui.SparkUI
    +import org.apache.spark.util.kvstore.KVStore
    +
    +private[sql] class SQLAppStatusListener(
    +    conf: SparkConf,
    +    kvstore: KVStore,
    +    live: Boolean,
    +    ui: Option[SparkUI] = None)
    +  extends SparkListener with Logging {
    +
    +  // How often to flush intermediate statge of a live execution to the store. When replaying logs,
    --- End diff --
    
    typo: statge


---



[GitHub] spark issue #19681: [SPARK-20652][sql] Store SQL UI data in the new app stat...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19681
  
    **[Test build #83563 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83563/testReport)** for PR 19681 at commit [`46695ed`](https://github.com/apache/spark/commit/46695edece81057986e7a0b7f87a0eacbe0b0fca).


---



[GitHub] spark pull request #19681: [SPARK-20652][sql] Store SQL UI data in the new a...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19681#discussion_r150094936
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala ---
    @@ -0,0 +1,367 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.sql.execution.ui
    +
    +import java.util.Date
    +import java.util.concurrent.ConcurrentHashMap
    +import java.util.function.Function
    +
    +import scala.collection.JavaConverters._
    +
    +import org.apache.spark.{JobExecutionStatus, SparkConf}
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.scheduler._
    +import org.apache.spark.sql.execution.SQLExecution
    +import org.apache.spark.sql.execution.metric._
    +import org.apache.spark.status.LiveEntity
    +import org.apache.spark.status.config._
    +import org.apache.spark.ui.SparkUI
    +import org.apache.spark.util.kvstore.KVStore
    +
    +private[sql] class SQLAppStatusListener(
    +    conf: SparkConf,
    +    kvstore: KVStore,
    +    live: Boolean,
    +    ui: Option[SparkUI] = None)
    +  extends SparkListener with Logging {
    +
    +  // How often to flush intermediate state of a live execution to the store. When replaying logs,
    +  // never flush (only do the very last write).
    +  private val liveUpdatePeriodNs = if (live) conf.get(LIVE_ENTITY_UPDATE_PERIOD) else -1L
    +
    +  // Live tracked data is needed by the SQL status store to calculate metrics for in-flight
    +  // executions; that means arbitrary threads may be querying these maps, so they need to be
    +  // thread-safe.
    +  private val liveExecutions = new ConcurrentHashMap[Long, LiveExecutionData]()
    +  private val stageMetrics = new ConcurrentHashMap[Int, LiveStageMetrics]()
    +
    +  private var uiInitialized = false
    +
    +  override def onJobStart(event: SparkListenerJobStart): Unit = {
    +    val executionIdString = event.properties.getProperty(SQLExecution.EXECUTION_ID_KEY)
    +    if (executionIdString == null) {
    +      // This is not a job created by SQL
    +      return
    +    }
    +
    +    val executionId = executionIdString.toLong
    +    val jobId = event.jobId
    +    val exec = getOrCreateExecution(executionId)
    +
    +    // Record the accumulator IDs for the stages of this job, so that the code that keeps
    +    // track of the metrics knows which accumulators to look at.
    +    val accumIds = exec.metrics.map(_.accumulatorId).sorted.toList
    +    event.stageIds.foreach { id =>
    +      stageMetrics.put(id, new LiveStageMetrics(id, 0, accumIds.toArray, new ConcurrentHashMap()))
    +    }
    +
    +    exec.jobs = exec.jobs + (jobId -> JobExecutionStatus.RUNNING)
    +    exec.stages = event.stageIds.toSet
    +    update(exec)
    +  }
    +
    +  override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = {
    +    if (!isSQLStage(event.stageInfo.stageId)) {
    +      return
    +    }
    +
    +    // Reset the metrics tracking object for the new attempt.
    +    Option(stageMetrics.get(event.stageInfo.stageId)).foreach { metrics =>
    +      metrics.taskMetrics.clear()
    +      metrics.attemptId = event.stageInfo.attemptId
    +    }
    +  }
    +
    +  override def onJobEnd(event: SparkListenerJobEnd): Unit = {
    +    liveExecutions.values().asScala.foreach { exec =>
    +      if (exec.jobs.contains(event.jobId)) {
    +        val result = event.jobResult match {
    +          case JobSucceeded => JobExecutionStatus.SUCCEEDED
    +          case _ => JobExecutionStatus.FAILED
    +        }
    +        exec.jobs = exec.jobs + (event.jobId -> result)
    +        exec.endEvents += 1
    +        update(exec)
    +      }
    +    }
    +  }
    +
    +  override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = {
    +    event.accumUpdates.foreach { case (taskId, stageId, attemptId, accumUpdates) =>
    +      updateStageMetrics(stageId, attemptId, taskId, accumUpdates, false)
    +    }
    +  }
    +
    +  override def onTaskEnd(event: SparkListenerTaskEnd): Unit = {
    +    if (!isSQLStage(event.stageId)) {
    +      return
    +    }
    +
    +    val info = event.taskInfo
    +    // SPARK-20342. If processing events from a live application, use the task metrics info to
    +    // work around a race in the DAGScheduler. The metrics info does not contain accumulator info
    +    // when reading event logs in the SHS, so we have to rely on the accumulator in that case.
    +    val accums = if (live && event.taskMetrics != null) {
    +      event.taskMetrics.externalAccums.flatMap { a =>
    +        // This call may fail if the accumulator is gc'ed, so account for that.
    +        try {
    +          Some(a.toInfo(Some(a.value), None))
    +        } catch {
    +          case _: IllegalAccessError => None
    +        }
    +      }
    +    } else {
    +      info.accumulables
    +    }
    +    updateStageMetrics(event.stageId, event.stageAttemptId, info.taskId, accums,
    +      info.successful)
    +  }
    +
    +  def executionMetrics(executionId: Long): Map[Long, String] = {
    +    Option(liveExecutions.get(executionId)).map { exec =>
    +      if (exec.metricsValues != null) {
    +        exec.metricsValues
    +      } else {
    +        aggregateMetrics(exec)
    +      }
    +    }.getOrElse {
    +      throw new NoSuchElementException(s"execution $executionId not found")
    +    }
    +  }
    +
    +  private def aggregateMetrics(exec: LiveExecutionData): Map[Long, String] = {
    +    val metricIds = exec.metrics.map(_.accumulatorId).sorted
    +    val metricTypes = exec.metrics.map { m => (m.accumulatorId, m.metricType) }.toMap
    +    val metrics = exec.stages.toSeq
    +      .flatMap { stageId => Option(stageMetrics.get(stageId)) }
    +      .flatMap(_.taskMetrics.values().asScala)
    +      .flatMap { metrics => metrics.ids.zip(metrics.values) }
    +
    +    val aggregatedMetrics = (metrics ++ exec.driverAccumUpdates.toSeq)
    +      .filter { case (id, _) => metricIds.contains(id) }
    +      .groupBy(_._1)
    +      .map { case (id, values) =>
    +        id -> SQLMetrics.stringValue(metricTypes(id), values.map(_._2).toSeq)
    +      }
    +
    +    // Check the execution again for whether the aggregated metrics data has been calculated.
    +    // This can happen if the UI is requesting this data, and the onExecutionEnd handler is
    +    // running at the same time. The metrics calculated for the UI can be inaccurate in that
    +    // case, since the onExecutionEnd handler will clean up tracked stage metrics.
    +    if (exec.metricsValues != null) {
    +      exec.metricsValues
    +    } else {
    +      aggregatedMetrics
    +    }
    +  }
    +
    +  private def updateStageMetrics(
    +      stageId: Int,
    +      attemptId: Int,
    +      taskId: Long,
    +      accumUpdates: Seq[AccumulableInfo],
    +      succeeded: Boolean): Unit = {
    +    Option(stageMetrics.get(stageId)).foreach { metrics =>
    +      if (metrics.attemptId != attemptId || metrics.accumulatorIds.isEmpty) {
    +        return
    +      }
    +
    +      val oldTaskMetrics = metrics.taskMetrics.get(taskId)
    +      if (oldTaskMetrics != null && oldTaskMetrics.succeeded) {
    +        return
    +      }
    +
    +      val updates = accumUpdates
    +        .filter { acc => acc.update.isDefined && metrics.accumulatorIds.contains(acc.id) }
    +        .sortBy(_.id)
    +
    +      if (updates.isEmpty) {
    +        return
    +      }
    +
    +      val ids = new Array[Long](updates.size)
    +      val values = new Array[Long](updates.size)
    +      updates.zipWithIndex.foreach { case (acc, idx) =>
    +        ids(idx) = acc.id
    +        // In a live application, accumulators have Long values, but when reading from event
    +        // logs, they have String values. For now, assume all accumulators are Long and convert
    +        // accordingly.
    +        values(idx) = acc.update.get match {
    +          case s: String => s.toLong
    +          case l: Long => l
    +          case o => throw new IllegalArgumentException(s"Unexpected: $o")
    +        }
    +      }
    +
    +      // TODO: storing metrics by task ID can lead to inaccurate metrics when speculation is on.
    --- End diff --
    
    I think it's more general than this. I'd say:
    
    Since we store metrics by task ID, in a way we'll double-count the stage metrics when there are multiple tasks for a given index, in particular if there is speculation or if there are multiple attempts for a task.
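
The double-counting described in this comment can be illustrated with a small sketch. `TaskUpdate` and `DoubleCountSketch` are hypothetical names used only for illustration and are not part of the PR. The sketch assumes each task attempt has its own task ID while sharing the partition index of the task it retries or speculates on, and that only one value per index should count toward the stage total.

```scala
// Hypothetical, simplified model of one task attempt's metric value.
final case class TaskUpdate(taskId: Long, index: Int, value: Long)

object DoubleCountSketch {
  // Keyed by task ID: every attempt gets its own entry, so all of them are summed.
  def sumByTaskId(updates: Seq[TaskUpdate]): Long =
    updates.map(u => u.taskId -> u.value).toMap.values.sum

  // Keyed by partition index: a later attempt for the same index replaces the earlier one.
  def sumByIndex(updates: Seq[TaskUpdate]): Long =
    updates.map(u => u.index -> u.value).toMap.values.sum

  def main(args: Array[String]): Unit = {
    val updates = Seq(
      TaskUpdate(taskId = 0L, index = 0, value = 10L),
      TaskUpdate(taskId = 1L, index = 1, value = 10L),
      // Speculative copy or retry of the task at index 1.
      TaskUpdate(taskId = 2L, index = 1, value = 10L))

    println(sumByTaskId(updates)) // prints 30
    println(sumByIndex(updates))  // prints 20
  }
}
```

Keying by index is only one possible fix and is shown here to make the difference visible; the PR itself leaves this as a TODO.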


---



[GitHub] spark issue #19681: [SPARK-20652][sql] Store SQL UI data in the new app stat...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19681
  
    **[Test build #83620 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83620/testReport)** for PR 19681 at commit [`bb7388b`](https://github.com/apache/spark/commit/bb7388b86d7adf8bbf209cf7748c319c4b8c0c77).


---



[GitHub] spark pull request #19681: [SPARK-20652][sql] Store SQL UI data in the new a...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19681#discussion_r157129912
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala ---
    @@ -118,309 +142,286 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext with JsonTest
           (id, accumulatorValue)
         }.toMap
     
    -    listener.onOtherEvent(SparkListenerSQLExecutionStart(
    +    bus.postToAll(SparkListenerSQLExecutionStart(
           executionId,
           "test",
           "test",
           df.queryExecution.toString,
           SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
           System.currentTimeMillis()))
     
    -    val executionUIData = listener.executionIdToData(0)
    -
    -    listener.onJobStart(SparkListenerJobStart(
    +    bus.postToAll(SparkListenerJobStart(
           jobId = 0,
           time = System.currentTimeMillis(),
           stageInfos = Seq(
             createStageInfo(0, 0),
             createStageInfo(1, 0)
           ),
           createProperties(executionId)))
    -    listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(0, 0)))
    +    bus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 0)))
     
    -    assert(listener.getExecutionMetrics(0).isEmpty)
    +    assert(store.executionMetrics(0).isEmpty)
     
    -    listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq(
    +    bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq(
           // (task id, stage id, stage attempt, accum updates)
    -      (0L, 0, 0, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo)),
    -      (1L, 0, 0, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo))
    +      (0L, 0, 0, createAccumulatorInfos(accumulatorUpdates)),
    +      (1L, 0, 0, createAccumulatorInfos(accumulatorUpdates))
         )))
     
    -    checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
    +    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
     
         // Driver accumulator updates don't belong to this execution should be filtered and no
         // exception will be thrown.
    -    listener.onOtherEvent(SparkListenerDriverAccumUpdates(0, Seq((999L, 2L))))
    -    checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
    +    bus.postToAll(SparkListenerDriverAccumUpdates(0, Seq((999L, 2L))))
     
    -    listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq(
    +    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
    +
    +    bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq(
           // (task id, stage id, stage attempt, accum updates)
    -      (0L, 0, 0, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo)),
    -      (1L, 0, 0,
    -        createTaskMetrics(accumulatorUpdates.mapValues(_ * 2)).accumulators().map(makeInfo))
    +      (0L, 0, 0, createAccumulatorInfos(accumulatorUpdates)),
    +      (1L, 0, 0, createAccumulatorInfos(accumulatorUpdates.mapValues(_ * 2)))
         )))
     
    -    checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 3))
    +    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 3))
     
         // Retrying a stage should reset the metrics
    -    listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(0, 1)))
    +    bus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 1)))
     
    -    listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq(
    +    bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq(
           // (task id, stage id, stage attempt, accum updates)
    -      (0L, 0, 1, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo)),
    -      (1L, 0, 1, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo))
    +      (0L, 0, 1, createAccumulatorInfos(accumulatorUpdates)),
    +      (1L, 0, 1, createAccumulatorInfos(accumulatorUpdates))
         )))
     
    -    checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
    +    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
     
         // Ignore the task end for the first attempt
    -    listener.onTaskEnd(SparkListenerTaskEnd(
    +    bus.postToAll(SparkListenerTaskEnd(
           stageId = 0,
           stageAttemptId = 0,
           taskType = "",
           reason = null,
    -      createTaskInfo(0, 0),
    -      createTaskMetrics(accumulatorUpdates.mapValues(_ * 100))))
    +      createTaskInfo(0, 0, accums = accumulatorUpdates.mapValues(_ * 100)),
    +      null))
     
    -    checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
    +    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
     
         // Finish two tasks
    -    listener.onTaskEnd(SparkListenerTaskEnd(
    +    bus.postToAll(SparkListenerTaskEnd(
           stageId = 0,
           stageAttemptId = 1,
           taskType = "",
           reason = null,
    -      createTaskInfo(0, 0),
    -      createTaskMetrics(accumulatorUpdates.mapValues(_ * 2))))
    -    listener.onTaskEnd(SparkListenerTaskEnd(
    +      createTaskInfo(0, 0, accums = accumulatorUpdates.mapValues(_ * 2)),
    +      null))
    +    bus.postToAll(SparkListenerTaskEnd(
           stageId = 0,
           stageAttemptId = 1,
           taskType = "",
           reason = null,
    -      createTaskInfo(1, 0),
    -      createTaskMetrics(accumulatorUpdates.mapValues(_ * 3))))
    +      createTaskInfo(1, 0, accums = accumulatorUpdates.mapValues(_ * 3)),
    +      null))
     
    -    checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 5))
    +    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 5))
     
         // Summit a new stage
    -    listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(1, 0)))
    +    bus.postToAll(SparkListenerStageSubmitted(createStageInfo(1, 0)))
     
    -    listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq(
    +    bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq(
           // (task id, stage id, stage attempt, accum updates)
    -      (0L, 1, 0, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo)),
    -      (1L, 1, 0, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo))
    +      (0L, 1, 0, createAccumulatorInfos(accumulatorUpdates)),
    +      (1L, 1, 0, createAccumulatorInfos(accumulatorUpdates))
         )))
     
    -    checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 7))
    +    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 7))
     
         // Finish two tasks
    -    listener.onTaskEnd(SparkListenerTaskEnd(
    +    bus.postToAll(SparkListenerTaskEnd(
           stageId = 1,
           stageAttemptId = 0,
           taskType = "",
           reason = null,
    -      createTaskInfo(0, 0),
    -      createTaskMetrics(accumulatorUpdates.mapValues(_ * 3))))
    -    listener.onTaskEnd(SparkListenerTaskEnd(
    +      createTaskInfo(0, 0, accums = accumulatorUpdates.mapValues(_ * 3)),
    +      null))
    +    bus.postToAll(SparkListenerTaskEnd(
           stageId = 1,
           stageAttemptId = 0,
           taskType = "",
           reason = null,
    -      createTaskInfo(1, 0),
    -      createTaskMetrics(accumulatorUpdates.mapValues(_ * 3))))
    +      createTaskInfo(1, 0, accums = accumulatorUpdates.mapValues(_ * 3)),
    +      null))
     
    -    checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 11))
    +    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 11))
     
    -    assert(executionUIData.runningJobs === Seq(0))
    -    assert(executionUIData.succeededJobs.isEmpty)
    -    assert(executionUIData.failedJobs.isEmpty)
    +    assertJobs(store.execution(0), running = Seq(0))
     
    -    listener.onJobEnd(SparkListenerJobEnd(
    +    bus.postToAll(SparkListenerJobEnd(
           jobId = 0,
           time = System.currentTimeMillis(),
           JobSucceeded
         ))
    -    listener.onOtherEvent(SparkListenerSQLExecutionEnd(
    +    bus.postToAll(SparkListenerSQLExecutionEnd(
           executionId, System.currentTimeMillis()))
     
    -    assert(executionUIData.runningJobs.isEmpty)
    -    assert(executionUIData.succeededJobs === Seq(0))
    -    assert(executionUIData.failedJobs.isEmpty)
    -
    -    checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 11))
    +    assertJobs(store.execution(0), completed = Seq(0))
    +    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 11))
       }
     
    -  test("onExecutionEnd happens before onJobEnd(JobSucceeded)") {
    -    val listener = new SQLListener(spark.sparkContext.conf)
    +  sqlStoreTest("onExecutionEnd happens before onJobEnd(JobSucceeded)") { (store, bus) =>
         val executionId = 0
         val df = createTestDataFrame
    -    listener.onOtherEvent(SparkListenerSQLExecutionStart(
    +    bus.postToAll(SparkListenerSQLExecutionStart(
           executionId,
           "test",
           "test",
           df.queryExecution.toString,
           SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
           System.currentTimeMillis()))
    -    listener.onJobStart(SparkListenerJobStart(
    +    bus.postToAll(SparkListenerJobStart(
           jobId = 0,
           time = System.currentTimeMillis(),
           stageInfos = Nil,
           createProperties(executionId)))
    -    listener.onOtherEvent(SparkListenerSQLExecutionEnd(
    +    bus.postToAll(SparkListenerSQLExecutionEnd(
           executionId, System.currentTimeMillis()))
    -    listener.onJobEnd(SparkListenerJobEnd(
    +    bus.postToAll(SparkListenerJobEnd(
           jobId = 0,
           time = System.currentTimeMillis(),
           JobSucceeded
         ))
     
    -    val executionUIData = listener.executionIdToData(0)
    -    assert(executionUIData.runningJobs.isEmpty)
    -    assert(executionUIData.succeededJobs === Seq(0))
    -    assert(executionUIData.failedJobs.isEmpty)
    +    assertJobs(store.execution(0), completed = Seq(0))
       }
     
    -  test("onExecutionEnd happens before multiple onJobEnd(JobSucceeded)s") {
    -    val listener = new SQLListener(spark.sparkContext.conf)
    +  sqlStoreTest("onExecutionEnd happens before multiple onJobEnd(JobSucceeded)s") { (store, bus) =>
         val executionId = 0
         val df = createTestDataFrame
    -    listener.onOtherEvent(SparkListenerSQLExecutionStart(
    +    bus.postToAll(SparkListenerSQLExecutionStart(
           executionId,
           "test",
           "test",
           df.queryExecution.toString,
           SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
           System.currentTimeMillis()))
    -    listener.onJobStart(SparkListenerJobStart(
    +    bus.postToAll(SparkListenerJobStart(
           jobId = 0,
           time = System.currentTimeMillis(),
           stageInfos = Nil,
           createProperties(executionId)))
    -    listener.onJobEnd(SparkListenerJobEnd(
    +    bus.postToAll(SparkListenerJobEnd(
             jobId = 0,
             time = System.currentTimeMillis(),
             JobSucceeded
         ))
     
    -    listener.onJobStart(SparkListenerJobStart(
    +    bus.postToAll(SparkListenerJobStart(
           jobId = 1,
           time = System.currentTimeMillis(),
           stageInfos = Nil,
           createProperties(executionId)))
    -    listener.onOtherEvent(SparkListenerSQLExecutionEnd(
    +    bus.postToAll(SparkListenerSQLExecutionEnd(
           executionId, System.currentTimeMillis()))
    -    listener.onJobEnd(SparkListenerJobEnd(
    +    bus.postToAll(SparkListenerJobEnd(
           jobId = 1,
           time = System.currentTimeMillis(),
           JobSucceeded
         ))
     
    -    val executionUIData = listener.executionIdToData(0)
    -    assert(executionUIData.runningJobs.isEmpty)
    -    assert(executionUIData.succeededJobs.sorted === Seq(0, 1))
    -    assert(executionUIData.failedJobs.isEmpty)
    +    assertJobs(store.execution(0), completed = Seq(0, 1))
       }
     
    -  test("onExecutionEnd happens before onJobEnd(JobFailed)") {
    -    val listener = new SQLListener(spark.sparkContext.conf)
    +  sqlStoreTest("onExecutionEnd happens before onJobEnd(JobFailed)") { (store, bus) =>
         val executionId = 0
         val df = createTestDataFrame
    -    listener.onOtherEvent(SparkListenerSQLExecutionStart(
    +    bus.postToAll(SparkListenerSQLExecutionStart(
           executionId,
           "test",
           "test",
           df.queryExecution.toString,
           SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
           System.currentTimeMillis()))
    -    listener.onJobStart(SparkListenerJobStart(
    +    bus.postToAll(SparkListenerJobStart(
           jobId = 0,
           time = System.currentTimeMillis(),
           stageInfos = Seq.empty,
           createProperties(executionId)))
    -    listener.onOtherEvent(SparkListenerSQLExecutionEnd(
    +    bus.postToAll(SparkListenerSQLExecutionEnd(
           executionId, System.currentTimeMillis()))
    -    listener.onJobEnd(SparkListenerJobEnd(
    +    bus.postToAll(SparkListenerJobEnd(
           jobId = 0,
           time = System.currentTimeMillis(),
           JobFailed(new RuntimeException("Oops"))
         ))
     
    -    val executionUIData = listener.executionIdToData(0)
    -    assert(executionUIData.runningJobs.isEmpty)
    -    assert(executionUIData.succeededJobs.isEmpty)
    -    assert(executionUIData.failedJobs === Seq(0))
    +    assertJobs(store.execution(0), failed = Seq(0))
       }
     
       test("SPARK-11126: no memory leak when running non SQL jobs") {
    -    val previousStageNumber = spark.sharedState.listener.stageIdToStageMetrics.size
    +    val previousStageNumber = statusStore.executionsList().size
         spark.sparkContext.parallelize(1 to 10).foreach(i => ())
         spark.sparkContext.listenerBus.waitUntilEmpty(10000)
    --- End diff --
    
    ditto, the listener is not attached to the Spark event bus.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #19681: [SPARK-20652][sql] Store SQL UI data in the new a...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19681#discussion_r149537039
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala ---
    @@ -0,0 +1,353 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.sql.execution.ui
    +
    +import java.util.Date
    +import java.util.concurrent.ConcurrentHashMap
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable.HashMap
    +
    +import org.apache.spark.{JobExecutionStatus, SparkConf}
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.scheduler._
    +import org.apache.spark.sql.execution.SQLExecution
    +import org.apache.spark.sql.execution.metric._
    +import org.apache.spark.sql.internal.StaticSQLConf._
    +import org.apache.spark.status.LiveEntity
    +import org.apache.spark.status.config._
    +import org.apache.spark.ui.SparkUI
    +import org.apache.spark.util.kvstore.KVStore
    +
    +private[sql] class SQLAppStatusListener(
    +    conf: SparkConf,
    +    kvstore: KVStore,
    +    live: Boolean,
    +    ui: Option[SparkUI] = None)
    +  extends SparkListener with Logging {
    +
    +  // How often to flush intermediate stage of a live execution to the store. When replaying logs,
    +  // never flush (only do the very last write).
    +  private val liveUpdatePeriodNs = if (live) conf.get(LIVE_ENTITY_UPDATE_PERIOD) else -1L
    +
    +  private val liveExecutions = new HashMap[Long, LiveExecutionData]()
    +  private val stageMetrics = new HashMap[Int, LiveStageMetrics]()
    +
    +  private var uiInitialized = false
    +
    +  override def onJobStart(event: SparkListenerJobStart): Unit = {
    +    val executionIdString = event.properties.getProperty(SQLExecution.EXECUTION_ID_KEY)
    +    if (executionIdString == null) {
    +      // This is not a job created by SQL
    +      return
    +    }
    +
    +    val executionId = executionIdString.toLong
    +    val jobId = event.jobId
    +    val exec = getOrCreateExecution(executionId)
    +
    +    // Record the accumulator IDs for the stages of this job, so that the code that keeps
    +    // track of the metrics knows which accumulators to look at.
    +    val accumIds = exec.metrics.map(_.accumulatorId).sorted.toList
    +    event.stageIds.foreach { id =>
    +      stageMetrics.put(id, new LiveStageMetrics(id, 0, accumIds.toArray, new ConcurrentHashMap()))
    +    }
    +
    +    exec.jobs = exec.jobs + (jobId -> JobExecutionStatus.RUNNING)
    +    exec.stages = event.stageIds
    +    update(exec)
    +  }
    +
    +  override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = {
    +    if (!isSQLStage(event.stageInfo.stageId)) {
    +      return
    +    }
    +
    +    // Reset the metrics tracking object for the new attempt.
    +    stageMetrics.get(event.stageInfo.stageId).foreach { metrics =>
    +      metrics.taskMetrics.clear()
    +      metrics.attemptId = event.stageInfo.attemptId
    +    }
    +  }
    +
    +  override def onJobEnd(event: SparkListenerJobEnd): Unit = {
    +    liveExecutions.values.foreach { exec =>
    +      if (exec.jobs.contains(event.jobId)) {
    +        val result = event.jobResult match {
    +          case JobSucceeded => JobExecutionStatus.SUCCEEDED
    +          case _ => JobExecutionStatus.FAILED
    +        }
    +        exec.jobs = exec.jobs + (event.jobId -> result)
    +        exec.endEvents += 1
    +        update(exec)
    +      }
    +    }
    +  }
    +
    +  override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = {
    +    event.accumUpdates.foreach { case (taskId, stageId, attemptId, accumUpdates) =>
    +      updateStageMetrics(stageId, attemptId, taskId, accumUpdates, false)
    +    }
    +  }
    +
    +  override def onTaskEnd(event: SparkListenerTaskEnd): Unit = {
    +    if (!isSQLStage(event.stageId)) {
    +      return
    +    }
    +
    +    val info = event.taskInfo
    +    // SPARK-20342. If processing events from a live application, use the task metrics info to
    +    // work around a race in the DAGScheduler. The metrics info does not contain accumulator info
    +    // when reading event logs in the SHS, so we have to rely on the accumulator in that case.
    +    val accums = if (live && event.taskMetrics != null) {
    +      event.taskMetrics.externalAccums.flatMap { a =>
    +        // This call may fail if the accumulator is gc'ed, so account for that.
    +        try {
    +          Some(a.toInfo(Some(a.value), None))
    +        } catch {
    +          case _: IllegalAccessError => None
    +        }
    +      }
    +    } else {
    +      info.accumulables
    +    }
    +    updateStageMetrics(event.stageId, event.stageAttemptId, info.taskId, accums,
    +      info.successful)
    +  }
    +
    +  def executionMetrics(executionId: Long): Map[Long, String] = synchronized {
    +    liveExecutions.get(executionId).map { exec =>
    +      if (exec.metricsValues != null) {
    +        exec.metricsValues
    +      } else {
    +        aggregateMetrics(exec)
    +      }
    +    }.getOrElse {
    +      throw new NoSuchElementException(s"execution $executionId not found")
    +    }
    +  }
    +
    +  private def aggregateMetrics(exec: LiveExecutionData): Map[Long, String] = synchronized {
    +    val metricIds = exec.metrics.map(_.accumulatorId).sorted
    +    val metricTypes = exec.metrics.map { m => (m.accumulatorId, m.metricType) }.toMap
    +    val metrics = exec.stages
    +      .flatMap(stageMetrics.get)
    +      .flatMap(_.taskMetrics.values().asScala)
    +      .flatMap { metrics => metrics.ids.zip(metrics.values) }
    +
    +    (metrics ++ exec.driverAccumUpdates.toSeq)
    +      .filter { case (id, _) => metricIds.contains(id) }
    +      .groupBy(_._1)
    +      .map { case (id, values) =>
    +        id -> SQLMetrics.stringValue(metricTypes(id), values.map(_._2).toSeq)
    +      }
    +  }
    +
    +  private def updateStageMetrics(
    +      stageId: Int,
    +      attemptId: Int,
    +      taskId: Long,
    +      accumUpdates: Seq[AccumulableInfo],
    +      succeeded: Boolean): Unit = {
    +    stageMetrics.get(stageId).foreach { metrics =>
    +      if (metrics.attemptId != attemptId || metrics.accumulatorIds.isEmpty) {
    +        return
    +      }
    +
    +      val oldTaskMetrics = metrics.taskMetrics.get(taskId)
    +      if (oldTaskMetrics != null && oldTaskMetrics.succeeded) {
    +        return
    +      }
    +
    +      val updates = accumUpdates
    +        .filter { acc => acc.update.isDefined && metrics.accumulatorIds.contains(acc.id) }
    +        .sortBy(_.id)
    +
    +      if (updates.isEmpty) {
    +        return
    +      }
    +
    +      val ids = new Array[Long](updates.size)
    +      val values = new Array[Long](updates.size)
    +      updates.zipWithIndex.foreach { case (acc, idx) =>
    +        ids(idx) = acc.id
    +        // In a live application, accumulators have Long values, but when reading from event
    +        // logs, they have String values. For now, assume all accumulators are Long and covert
    +        // accordingly.
    +        values(idx) = acc.update.get match {
    +          case s: String => s.toLong
    +          case l: Long => l
    +          case o => throw new IllegalArgumentException(s"Unexpected: $o")
    +        }
    +      }
    +
    +      metrics.taskMetrics.put(taskId, new LiveTaskMetrics(ids, values, succeeded))
    +    }
    +  }
    +
    +  private def onExecutionStart(event: SparkListenerSQLExecutionStart): Unit = {
    +    // Install the SQL tab in a live app if it hasn't been initialized yet.
    +    if (!uiInitialized) {
    +      ui.foreach { _ui =>
    +        new SQLTab(new SQLAppStatusStore(kvstore, Some(this)), _ui)
    +      }
    +      uiInitialized = true
    +    }
    +
    +    val SparkListenerSQLExecutionStart(executionId, description, details,
    +      physicalPlanDescription, sparkPlanInfo, time) = event
    +
    +    def toStoredNodes(nodes: Seq[SparkPlanGraphNode]): Seq[SparkPlanGraphNodeWrapper] = {
    +      nodes.map {
    +        case cluster: SparkPlanGraphCluster =>
    +          val storedCluster = new SparkPlanGraphClusterWrapper(
    +            cluster.id,
    +            cluster.name,
    +            cluster.desc,
    +            toStoredNodes(cluster.nodes),
    +            cluster.metrics)
    +          new SparkPlanGraphNodeWrapper(null, storedCluster)
    +
    +        case node =>
    +          new SparkPlanGraphNodeWrapper(node, null)
    +      }
    +    }
    +
    +    val planGraph = SparkPlanGraph(sparkPlanInfo)
    +    val sqlPlanMetrics = planGraph.allNodes.flatMap { node =>
    +      node.metrics.map { metric => (metric.accumulatorId, metric) }
    +    }.toMap.values.toList
    +
    +    val graphToStore = new SparkPlanGraphWrapper(
    +      executionId,
    +      toStoredNodes(planGraph.nodes),
    +      planGraph.edges)
    +    kvstore.write(graphToStore)
    +
    +    val exec = getOrCreateExecution(executionId)
    +    exec.description = description
    +    exec.details = details
    +    exec.physicalPlanDescription = physicalPlanDescription
    +    exec.metrics = sqlPlanMetrics
    +    exec.submissionTime = time
    +    update(exec)
    +  }
    +
    +  private def onExecutionEnd(event: SparkListenerSQLExecutionEnd): Unit = {
    +    val SparkListenerSQLExecutionEnd(executionId, time) = event
    +    liveExecutions.get(executionId).foreach { exec =>
    +      synchronized {
    +        exec.metricsValues = aggregateMetrics(exec)
    +
    +        // Remove stale LiveStageMetrics objects for stages that are not active anymore.
    +        val activeStages = liveExecutions.values.flatMap { other =>
    +          if (other != exec) other.stages else Nil
    +        }.toSet
    +        stageMetrics.retain { case (id, _) => activeStages.contains(id) }
    +
    +        exec.completionTime = Some(new Date(time))
    +        exec.endEvents += 1
    +
    +        update(exec)
    +      }
    +    }
    +  }
    +
    +  private def onDriverAccumUpdates(event: SparkListenerDriverAccumUpdates): Unit = {
    +    val SparkListenerDriverAccumUpdates(executionId, accumUpdates) = event
    +    liveExecutions.get(executionId).foreach { exec =>
    +      exec.driverAccumUpdates = accumUpdates.toMap
    +      update(exec)
    +    }
    +  }
    +
    +  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
    +    case e: SparkListenerSQLExecutionStart => onExecutionStart(e)
    +    case e: SparkListenerSQLExecutionEnd => onExecutionEnd(e)
    +    case e: SparkListenerDriverAccumUpdates => onDriverAccumUpdates(e)
    +    case _ => // Ignore
    +  }
    +
    +  private def getOrCreateExecution(executionId: Long): LiveExecutionData = {
    +    liveExecutions.getOrElseUpdate(executionId, new LiveExecutionData(executionId))
    +  }
    +
    +  private def update(exec: LiveExecutionData): Unit = {
    +    val now = System.nanoTime()
    +    if (exec.endEvents >= exec.jobs.size + 1) {
    +      liveExecutions.remove(exec.executionId)
    +      exec.write(kvstore, now)
    --- End diff --
    
    When you kill a job, isn't an end event generated?
    
    This is trying to mimic the behavior from the old listener:
    
    ```
          jobEnd.jobResult match {
            case JobSucceeded => executionUIData.jobs(jobId) = JobExecutionStatus.SUCCEEDED
            case JobFailed(_) => executionUIData.jobs(jobId) = JobExecutionStatus.FAILED
          }
          if (executionUIData.completionTime.nonEmpty && !executionUIData.hasRunningJobs) {
            // We are the last job of this execution, so mark the execution as finished. Note that
            // `onExecutionEnd` also does this, but currently that can be called before `onJobEnd`
            // since these are called on different threads.
            markExecutionFinished(executionId)
          }
    ```
    
    So in that case, if there's no end event for the job, the execution will not be marked as finished either.
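
The counting rule in the quoted `update` method (`exec.endEvents >= exec.jobs.size + 1`) can be illustrated with a minimal sketch. It assumes one end event per job plus one for the SQL execution itself, in any order. `EndEventSketch`, `Execution`, and the helper methods are hypothetical stand-ins written in plain Scala, not the classes from this PR.

```scala
object EndEventSketch {
  sealed trait Status
  case object Running extends Status
  case object Succeeded extends Status
  case object Failed extends Status

  // Hypothetical, simplified stand-in for the per-execution live data.
  final class Execution(val id: Long) {
    var jobs: Map[Int, Status] = Map.empty
    var endEvents: Int = 0

    // Finished once every job has ended AND the execution end event arrived.
    def isFinished: Boolean = endEvents >= jobs.size + 1
  }

  def jobStart(exec: Execution, jobId: Int): Unit = {
    val status: Status = Running
    exec.jobs = exec.jobs + (jobId -> status)
  }

  def jobEnd(exec: Execution, jobId: Int, ok: Boolean): Unit = {
    // Only jobs that belong to this execution count as end events.
    if (exec.jobs.contains(jobId)) {
      val status: Status = if (ok) Succeeded else Failed
      exec.jobs = exec.jobs + (jobId -> status)
      exec.endEvents += 1
    }
  }

  def executionEnd(exec: Execution): Unit = {
    exec.endEvents += 1
  }
}
```

In this sketch, an execution whose end event arrives before its job end event stays unfinished until the job end is seen, and a job that never reports an end leaves the execution unfinished indefinitely, which is the behavior described above.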


---



[GitHub] spark issue #19681: [SPARK-20652][sql] Store SQL UI data in the new app stat...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on the issue:

    https://github.com/apache/spark/pull/19681
  
    merged to master


---



[GitHub] spark pull request #19681: [SPARK-20652][sql] Store SQL UI data in the new a...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19681#discussion_r149780801
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala ---
    @@ -420,21 +428,19 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
         withTempPath { file =>
           // person creates a temporary view. get the DF before listing previous execution IDs
           val data = person.select('name)
    -      sparkContext.listenerBus.waitUntilEmpty(10000)
    --- End diff --
    
    don't you still need this `waitUntilEmpty`?


---



[GitHub] spark pull request #19681: [SPARK-20652][sql] Store SQL UI data in the new a...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19681#discussion_r157278467
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala ---
    @@ -36,13 +36,14 @@ import org.apache.spark.sql.catalyst.util.quietly
     import org.apache.spark.sql.execution.{LeafExecNode, QueryExecution, SparkPlanInfo, SQLExecution}
     import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
     import org.apache.spark.sql.test.SharedSQLContext
    -import org.apache.spark.ui.SparkUI
    +import org.apache.spark.status.config._
     import org.apache.spark.util.{AccumulatorMetadata, JsonProtocol, LongAccumulator}
    -
    +import org.apache.spark.util.kvstore.InMemoryStore
     
     class SQLListenerSuite extends SparkFunSuite with SharedSQLContext with JsonTestUtils {
       import testImplicits._
    -  import org.apache.spark.AccumulatorSuite.makeInfo
    +
    +  override protected def sparkConf = super.sparkConf.set(LIVE_ENTITY_UPDATE_PERIOD, 0L)
    --- End diff --
    
    Is that true? The suite extends `SharedSQLContext` (which extends `SharedSparkSession`) and `SQLTestUtils`, all of which are traits, not objects. (Unlike `TestHive` which does force sessions to be used across suites for hive tests.)
    
    There are also other suites that modify the conf (such as `HDFSMetadataLogSuite`).
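
The trait-versus-object point can be sketched in plain Scala with no Spark dependency. All names here (`ConfOverrideSketch`, `SharedSession`, the config keys) are hypothetical, and the sketch only shows the language-level behavior: a value built lazily inside a trait is created per instance, so it picks up whatever `conf` the concrete suite overrides. Whether Spark's test sessions are isolated in exactly this way is not something this sketch establishes.

```scala
object ConfOverrideSketch {
  // Hypothetical stand-in for a shared-session test trait.
  trait SharedSession {
    protected def conf: Map[String, String] = Map("app.name" -> "test")

    // Built lazily per instance, so it sees the overriding suite's conf.
    lazy val session: Map[String, String] = conf
  }

  // Uses the default configuration.
  class DefaultSuite extends SharedSession

  // Overrides one setting, in the spirit of the sparkConf override in the diff.
  class ZeroPeriodSuite extends SharedSession {
    override protected def conf: Map[String, String] =
      super.conf + ("live.update.period" -> "0")
  }
}
```

Had `SharedSession` been a singleton `object` instead, every suite would share one `session`, and a per-suite override like this would not be possible.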


---



[GitHub] spark pull request #19681: [SPARK-20652][sql] Store SQL UI data in the new a...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19681#discussion_r149767731
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala ---
    @@ -0,0 +1,172 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.ui
    +
    +import java.lang.{Long => JLong}
    +import java.util.Date
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable.ArrayBuffer
    +
    +import com.fasterxml.jackson.databind.annotation.JsonDeserialize
    +
    +import org.apache.spark.{JobExecutionStatus, SparkConf}
    +import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
    --- End diff --
    
    SparkListenerEvent is unused


---



[GitHub] spark issue #19681: [SPARK-20652][sql] Store SQL UI data in the new app stat...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19681
  
    Merged build finished. Test FAILed.


---



[GitHub] spark pull request #19681: [SPARK-20652][sql] Store SQL UI data in the new a...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19681#discussion_r149500473
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala ---
    @@ -0,0 +1,353 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.sql.execution.ui
    +
    +import java.util.Date
    +import java.util.concurrent.ConcurrentHashMap
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable.HashMap
    +
    +import org.apache.spark.{JobExecutionStatus, SparkConf}
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.scheduler._
    +import org.apache.spark.sql.execution.SQLExecution
    +import org.apache.spark.sql.execution.metric._
    +import org.apache.spark.sql.internal.StaticSQLConf._
    +import org.apache.spark.status.LiveEntity
    +import org.apache.spark.status.config._
    +import org.apache.spark.ui.SparkUI
    +import org.apache.spark.util.kvstore.KVStore
    +
    +private[sql] class SQLAppStatusListener(
    --- End diff --
    
    > SQLAppStatusStore.executionMetrics has logic to call the listener directly when the final metrics are not yet computed.
    
    ahh, that is the part I hadn't noticed.  thanks
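
A rough sketch of the fallback described in the quoted sentence: the store answers from persisted metrics when they exist and otherwise asks the live listener. `MetricsFallbackSketch`, `Listener`, and `Store` are hypothetical simplifications in plain Scala; the real `SQLAppStatusStore` reads from a `KVStore` and its exact logic may differ.

```scala
object MetricsFallbackSketch {
  // Hypothetical stand-in for the live listener's in-memory metrics.
  final class Listener {
    private var live = Map.empty[Long, Map[Long, String]]

    def record(executionId: Long, metrics: Map[Long, String]): Unit = synchronized {
      live = live + (executionId -> metrics)
    }

    def executionMetrics(executionId: Long): Map[Long, String] = synchronized {
      live.getOrElse(executionId,
        throw new NoSuchElementException(s"execution $executionId not found"))
    }
  }

  // Hypothetical stand-in for the store: persisted final metrics, optional listener.
  final class Store(
      stored: Map[Long, Map[Long, String]],
      listener: Option[Listener]) {

    def executionMetrics(executionId: Long): Map[Long, String] = {
      stored.get(executionId)
        // Final metrics not written yet: fall back to the live listener, if any.
        .orElse(listener.map(_.executionMetrics(executionId)))
        .getOrElse(Map.empty)
    }
  }
}
```

With no listener attached (as when replaying event logs in this sketch), a lookup for an execution without stored metrics simply returns an empty map.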


---



[GitHub] spark issue #19681: [SPARK-20652][sql] Store SQL UI data in the new app stat...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19681
  
    **[Test build #83855 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83855/testReport)** for PR 19681 at commit [`c068a5b`](https://github.com/apache/spark/commit/c068a5bb88e73532018d8f7a8dbbf96ffb6a93d8).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #19681: [SPARK-20652][sql] Store SQL UI data in the new a...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19681#discussion_r157129866
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala ---
    @@ -118,309 +142,286 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext with JsonTest
           (id, accumulatorValue)
         }.toMap
     
    -    listener.onOtherEvent(SparkListenerSQLExecutionStart(
    +    bus.postToAll(SparkListenerSQLExecutionStart(
           executionId,
           "test",
           "test",
           df.queryExecution.toString,
           SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
           System.currentTimeMillis()))
     
    -    val executionUIData = listener.executionIdToData(0)
    -
    -    listener.onJobStart(SparkListenerJobStart(
    +    bus.postToAll(SparkListenerJobStart(
           jobId = 0,
           time = System.currentTimeMillis(),
           stageInfos = Seq(
             createStageInfo(0, 0),
             createStageInfo(1, 0)
           ),
           createProperties(executionId)))
    -    listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(0, 0)))
    +    bus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 0)))
     
    -    assert(listener.getExecutionMetrics(0).isEmpty)
    +    assert(store.executionMetrics(0).isEmpty)
     
    -    listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq(
    +    bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq(
           // (task id, stage id, stage attempt, accum updates)
    -      (0L, 0, 0, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo)),
    -      (1L, 0, 0, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo))
    +      (0L, 0, 0, createAccumulatorInfos(accumulatorUpdates)),
    +      (1L, 0, 0, createAccumulatorInfos(accumulatorUpdates))
         )))
     
    -    checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
    +    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
     
         // Driver accumulator updates don't belong to this execution should be filtered and no
         // exception will be thrown.
    -    listener.onOtherEvent(SparkListenerDriverAccumUpdates(0, Seq((999L, 2L))))
    -    checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
    +    bus.postToAll(SparkListenerDriverAccumUpdates(0, Seq((999L, 2L))))
     
    -    listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq(
    +    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
    +
    +    bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq(
           // (task id, stage id, stage attempt, accum updates)
    -      (0L, 0, 0, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo)),
    -      (1L, 0, 0,
    -        createTaskMetrics(accumulatorUpdates.mapValues(_ * 2)).accumulators().map(makeInfo))
    +      (0L, 0, 0, createAccumulatorInfos(accumulatorUpdates)),
    +      (1L, 0, 0, createAccumulatorInfos(accumulatorUpdates.mapValues(_ * 2)))
         )))
     
    -    checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 3))
    +    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 3))
     
         // Retrying a stage should reset the metrics
    -    listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(0, 1)))
    +    bus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 1)))
     
    -    listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq(
    +    bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq(
           // (task id, stage id, stage attempt, accum updates)
    -      (0L, 0, 1, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo)),
    -      (1L, 0, 1, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo))
    +      (0L, 0, 1, createAccumulatorInfos(accumulatorUpdates)),
    +      (1L, 0, 1, createAccumulatorInfos(accumulatorUpdates))
         )))
     
    -    checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
    +    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
     
         // Ignore the task end for the first attempt
    -    listener.onTaskEnd(SparkListenerTaskEnd(
    +    bus.postToAll(SparkListenerTaskEnd(
           stageId = 0,
           stageAttemptId = 0,
           taskType = "",
           reason = null,
    -      createTaskInfo(0, 0),
    -      createTaskMetrics(accumulatorUpdates.mapValues(_ * 100))))
    +      createTaskInfo(0, 0, accums = accumulatorUpdates.mapValues(_ * 100)),
    +      null))
     
    -    checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
    +    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
     
         // Finish two tasks
    -    listener.onTaskEnd(SparkListenerTaskEnd(
    +    bus.postToAll(SparkListenerTaskEnd(
           stageId = 0,
           stageAttemptId = 1,
           taskType = "",
           reason = null,
    -      createTaskInfo(0, 0),
    -      createTaskMetrics(accumulatorUpdates.mapValues(_ * 2))))
    -    listener.onTaskEnd(SparkListenerTaskEnd(
    +      createTaskInfo(0, 0, accums = accumulatorUpdates.mapValues(_ * 2)),
    +      null))
    +    bus.postToAll(SparkListenerTaskEnd(
           stageId = 0,
           stageAttemptId = 1,
           taskType = "",
           reason = null,
    -      createTaskInfo(1, 0),
    -      createTaskMetrics(accumulatorUpdates.mapValues(_ * 3))))
    +      createTaskInfo(1, 0, accums = accumulatorUpdates.mapValues(_ * 3)),
    +      null))
     
    -    checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 5))
    +    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 5))
     
         // Summit a new stage
    -    listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(1, 0)))
    +    bus.postToAll(SparkListenerStageSubmitted(createStageInfo(1, 0)))
     
    -    listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq(
    +    bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq(
           // (task id, stage id, stage attempt, accum updates)
    -      (0L, 1, 0, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo)),
    -      (1L, 1, 0, createTaskMetrics(accumulatorUpdates).accumulators().map(makeInfo))
    +      (0L, 1, 0, createAccumulatorInfos(accumulatorUpdates)),
    +      (1L, 1, 0, createAccumulatorInfos(accumulatorUpdates))
         )))
     
    -    checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 7))
    +    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 7))
     
         // Finish two tasks
    -    listener.onTaskEnd(SparkListenerTaskEnd(
    +    bus.postToAll(SparkListenerTaskEnd(
           stageId = 1,
           stageAttemptId = 0,
           taskType = "",
           reason = null,
    -      createTaskInfo(0, 0),
    -      createTaskMetrics(accumulatorUpdates.mapValues(_ * 3))))
    -    listener.onTaskEnd(SparkListenerTaskEnd(
    +      createTaskInfo(0, 0, accums = accumulatorUpdates.mapValues(_ * 3)),
    +      null))
    +    bus.postToAll(SparkListenerTaskEnd(
           stageId = 1,
           stageAttemptId = 0,
           taskType = "",
           reason = null,
    -      createTaskInfo(1, 0),
    -      createTaskMetrics(accumulatorUpdates.mapValues(_ * 3))))
    +      createTaskInfo(1, 0, accums = accumulatorUpdates.mapValues(_ * 3)),
    +      null))
     
    -    checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 11))
    +    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 11))
     
    -    assert(executionUIData.runningJobs === Seq(0))
    -    assert(executionUIData.succeededJobs.isEmpty)
    -    assert(executionUIData.failedJobs.isEmpty)
    +    assertJobs(store.execution(0), running = Seq(0))
     
    -    listener.onJobEnd(SparkListenerJobEnd(
    +    bus.postToAll(SparkListenerJobEnd(
           jobId = 0,
           time = System.currentTimeMillis(),
           JobSucceeded
         ))
    -    listener.onOtherEvent(SparkListenerSQLExecutionEnd(
    +    bus.postToAll(SparkListenerSQLExecutionEnd(
           executionId, System.currentTimeMillis()))
     
    -    assert(executionUIData.runningJobs.isEmpty)
    -    assert(executionUIData.succeededJobs === Seq(0))
    -    assert(executionUIData.failedJobs.isEmpty)
    -
    -    checkAnswer(listener.getExecutionMetrics(0), accumulatorUpdates.mapValues(_ * 11))
    +    assertJobs(store.execution(0), completed = Seq(0))
    +    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 11))
       }
     
    -  test("onExecutionEnd happens before onJobEnd(JobSucceeded)") {
    -    val listener = new SQLListener(spark.sparkContext.conf)
    +  sqlStoreTest("onExecutionEnd happens before onJobEnd(JobSucceeded)") { (store, bus) =>
         val executionId = 0
         val df = createTestDataFrame
    -    listener.onOtherEvent(SparkListenerSQLExecutionStart(
    +    bus.postToAll(SparkListenerSQLExecutionStart(
           executionId,
           "test",
           "test",
           df.queryExecution.toString,
           SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
           System.currentTimeMillis()))
    -    listener.onJobStart(SparkListenerJobStart(
    +    bus.postToAll(SparkListenerJobStart(
           jobId = 0,
           time = System.currentTimeMillis(),
           stageInfos = Nil,
           createProperties(executionId)))
    -    listener.onOtherEvent(SparkListenerSQLExecutionEnd(
    +    bus.postToAll(SparkListenerSQLExecutionEnd(
           executionId, System.currentTimeMillis()))
    -    listener.onJobEnd(SparkListenerJobEnd(
    +    bus.postToAll(SparkListenerJobEnd(
           jobId = 0,
           time = System.currentTimeMillis(),
           JobSucceeded
         ))
     
    -    val executionUIData = listener.executionIdToData(0)
    -    assert(executionUIData.runningJobs.isEmpty)
    -    assert(executionUIData.succeededJobs === Seq(0))
    -    assert(executionUIData.failedJobs.isEmpty)
    +    assertJobs(store.execution(0), completed = Seq(0))
       }
     
    -  test("onExecutionEnd happens before multiple onJobEnd(JobSucceeded)s") {
    -    val listener = new SQLListener(spark.sparkContext.conf)
    +  sqlStoreTest("onExecutionEnd happens before multiple onJobEnd(JobSucceeded)s") { (store, bus) =>
         val executionId = 0
         val df = createTestDataFrame
    -    listener.onOtherEvent(SparkListenerSQLExecutionStart(
    +    bus.postToAll(SparkListenerSQLExecutionStart(
           executionId,
           "test",
           "test",
           df.queryExecution.toString,
           SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
           System.currentTimeMillis()))
    -    listener.onJobStart(SparkListenerJobStart(
    +    bus.postToAll(SparkListenerJobStart(
           jobId = 0,
           time = System.currentTimeMillis(),
           stageInfos = Nil,
           createProperties(executionId)))
    -    listener.onJobEnd(SparkListenerJobEnd(
    +    bus.postToAll(SparkListenerJobEnd(
             jobId = 0,
             time = System.currentTimeMillis(),
             JobSucceeded
         ))
     
    -    listener.onJobStart(SparkListenerJobStart(
    +    bus.postToAll(SparkListenerJobStart(
           jobId = 1,
           time = System.currentTimeMillis(),
           stageInfos = Nil,
           createProperties(executionId)))
    -    listener.onOtherEvent(SparkListenerSQLExecutionEnd(
    +    bus.postToAll(SparkListenerSQLExecutionEnd(
           executionId, System.currentTimeMillis()))
    -    listener.onJobEnd(SparkListenerJobEnd(
    +    bus.postToAll(SparkListenerJobEnd(
           jobId = 1,
           time = System.currentTimeMillis(),
           JobSucceeded
         ))
     
    -    val executionUIData = listener.executionIdToData(0)
    -    assert(executionUIData.runningJobs.isEmpty)
    -    assert(executionUIData.succeededJobs.sorted === Seq(0, 1))
    -    assert(executionUIData.failedJobs.isEmpty)
    +    assertJobs(store.execution(0), completed = Seq(0, 1))
       }
     
    -  test("onExecutionEnd happens before onJobEnd(JobFailed)") {
    -    val listener = new SQLListener(spark.sparkContext.conf)
    +  sqlStoreTest("onExecutionEnd happens before onJobEnd(JobFailed)") { (store, bus) =>
         val executionId = 0
         val df = createTestDataFrame
    -    listener.onOtherEvent(SparkListenerSQLExecutionStart(
    +    bus.postToAll(SparkListenerSQLExecutionStart(
           executionId,
           "test",
           "test",
           df.queryExecution.toString,
           SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
           System.currentTimeMillis()))
    -    listener.onJobStart(SparkListenerJobStart(
    +    bus.postToAll(SparkListenerJobStart(
           jobId = 0,
           time = System.currentTimeMillis(),
           stageInfos = Seq.empty,
           createProperties(executionId)))
    -    listener.onOtherEvent(SparkListenerSQLExecutionEnd(
    +    bus.postToAll(SparkListenerSQLExecutionEnd(
           executionId, System.currentTimeMillis()))
    -    listener.onJobEnd(SparkListenerJobEnd(
    +    bus.postToAll(SparkListenerJobEnd(
           jobId = 0,
           time = System.currentTimeMillis(),
           JobFailed(new RuntimeException("Oops"))
         ))
     
    -    val executionUIData = listener.executionIdToData(0)
    -    assert(executionUIData.runningJobs.isEmpty)
    -    assert(executionUIData.succeededJobs.isEmpty)
    -    assert(executionUIData.failedJobs === Seq(0))
    +    assertJobs(store.execution(0), failed = Seq(0))
       }
     
       test("SPARK-11126: no memory leak when running non SQL jobs") {
    -    val previousStageNumber = spark.sharedState.listener.stageIdToStageMetrics.size
    +    val previousStageNumber = statusStore.executionsList().size
         spark.sparkContext.parallelize(1 to 10).foreach(i => ())
         spark.sparkContext.listenerBus.waitUntilEmpty(10000)
         // listener should ignore the non SQL stage
    -    assert(spark.sharedState.listener.stageIdToStageMetrics.size == previousStageNumber)
    +    assert(statusStore.executionsList().size == previousStageNumber)
     
         spark.sparkContext.parallelize(1 to 10).toDF().foreach(i => ())
         spark.sparkContext.listenerBus.waitUntilEmpty(10000)
         // listener should save the SQL stage
    -    assert(spark.sharedState.listener.stageIdToStageMetrics.size == previousStageNumber + 1)
    -  }
    -
    -  test("SPARK-13055: history listener only tracks SQL metrics") {
    -    val listener = new SQLHistoryListener(sparkContext.conf, mock(classOf[SparkUI]))
    -    // We need to post other events for the listener to track our accumulators.
    -    // These are largely just boilerplate unrelated to what we're trying to test.
    -    val df = createTestDataFrame
    -    val executionStart = SparkListenerSQLExecutionStart(
    -      0, "", "", "", SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan), 0)
    -    val stageInfo = createStageInfo(0, 0)
    -    val jobStart = SparkListenerJobStart(0, 0, Seq(stageInfo), createProperties(0))
    -    val stageSubmitted = SparkListenerStageSubmitted(stageInfo)
    -    // This task has both accumulators that are SQL metrics and accumulators that are not.
    -    // The listener should only track the ones that are actually SQL metrics.
    -    val sqlMetric = SQLMetrics.createMetric(sparkContext, "beach umbrella")
    -    val nonSqlMetric = sparkContext.longAccumulator("baseball")
    -    val sqlMetricInfo = sqlMetric.toInfo(Some(sqlMetric.value), None)
    -    val nonSqlMetricInfo = nonSqlMetric.toInfo(Some(nonSqlMetric.value), None)
    -    val taskInfo = createTaskInfo(0, 0)
    -    taskInfo.setAccumulables(List(sqlMetricInfo, nonSqlMetricInfo))
    -    val taskEnd = SparkListenerTaskEnd(0, 0, "just-a-task", null, taskInfo, null)
    -    listener.onOtherEvent(executionStart)
    -    listener.onJobStart(jobStart)
    -    listener.onStageSubmitted(stageSubmitted)
    -    // Before SPARK-13055, this throws ClassCastException because the history listener would
    -    // assume that the accumulator value is of type Long, but this may not be true for
    -    // accumulators that are not SQL metrics.
    -    listener.onTaskEnd(taskEnd)
    -    val trackedAccums = listener.stageIdToStageMetrics.values.flatMap { stageMetrics =>
    -      stageMetrics.taskIdToMetricUpdates.values.flatMap(_.accumulatorUpdates)
    -    }
    -    // Listener tracks only SQL metrics, not other accumulators
    -    assert(trackedAccums.size === 1)
    -    assert(trackedAccums.head === ((sqlMetricInfo.id, sqlMetricInfo.update.get)))
    +    assert(statusStore.executionsList().size == previousStageNumber + 1)
       }
     
       test("driver side SQL metrics") {
    -    val listener = new SQLListener(spark.sparkContext.conf)
    -    val expectedAccumValue = 12345
    +    val oldCount = statusStore.executionsList().size
    +    val expectedAccumValue = 12345L
         val physicalPlan = MyPlan(sqlContext.sparkContext, expectedAccumValue)
    -    sqlContext.sparkContext.addSparkListener(listener)
    --- End diff --
    
    Does this really work? The listener is attached to a `ReplayBus`, not the real Spark event bus.
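
The concern in this comment can be illustrated with a minimal standalone sketch. It assumes nothing about Spark's actual bus classes: `SimpleBus`, `Listener`, and `RecordingListener` are hypothetical stand-ins, and the event is just a string. It only shows that a listener registered on one bus does not see events posted to a different bus.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-ins; these are not Spark's listener bus classes.
trait Listener { def onEvent(event: String): Unit }

class SimpleBus {
  private val listeners = ArrayBuffer.empty[Listener]
  def addListener(l: Listener): Unit = listeners += l
  // Deliver the event to every listener registered on this bus only.
  def postToAll(event: String): Unit = listeners.foreach(_.onEvent(event))
}

class RecordingListener extends Listener {
  val seen = ArrayBuffer.empty[String]
  override def onEvent(event: String): Unit = seen += event
}

object BusIsolationSketch {
  def main(args: Array[String]): Unit = {
    val liveBus = new SimpleBus   // plays the role of the application's event bus
    val replayBus = new SimpleBus // plays the role of a separate, test-only bus
    val listener = new RecordingListener
    replayBus.addListener(listener)

    liveBus.postToAll("SQLExecutionStart")   // not delivered: listener is not on this bus
    replayBus.postToAll("SQLExecutionStart") // delivered

    println(listener.seen.mkString(", "))
  }
}
```

Under these assumptions, a test that drives real queries would need its listener on the bus those queries post to, whereas a test that posts events by hand can use any bus it owns.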


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #19681: [SPARK-20652][sql] Store SQL UI data in the new app stat...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19681
  
    **[Test build #83658 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83658/testReport)** for PR 19681 at commit [`1a31665`](https://github.com/apache/spark/commit/1a31665ab6d3352dee3e15c87a697a7e655eb34c).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #19681: [SPARK-20652][sql] Store SQL UI data in the new a...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19681#discussion_r158326122
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala ---
    @@ -0,0 +1,366 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.sql.execution.ui
    +
    +import java.util.Date
    +import java.util.concurrent.ConcurrentHashMap
    +import java.util.function.Function
    +
    +import scala.collection.JavaConverters._
    +
    +import org.apache.spark.{JobExecutionStatus, SparkConf}
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.scheduler._
    +import org.apache.spark.sql.execution.SQLExecution
    +import org.apache.spark.sql.execution.metric._
    +import org.apache.spark.status.LiveEntity
    +import org.apache.spark.status.config._
    +import org.apache.spark.ui.SparkUI
    +import org.apache.spark.util.kvstore.KVStore
    +
    +private[sql] class SQLAppStatusListener(
    +    conf: SparkConf,
    +    kvstore: KVStore,
    +    live: Boolean,
    +    ui: Option[SparkUI] = None)
    +  extends SparkListener with Logging {
    +
    +  // How often to flush intermediate state of a live execution to the store. When replaying logs,
    +  // never flush (only do the very last write).
    +  private val liveUpdatePeriodNs = if (live) conf.get(LIVE_ENTITY_UPDATE_PERIOD) else -1L
    +
    +  // Live tracked data is needed by the SQL status store to calculate metrics for in-flight
    +  // executions; that means arbitrary threads may be querying these maps, so they need to be
    +  // thread-safe.
    +  private val liveExecutions = new ConcurrentHashMap[Long, LiveExecutionData]()
    +  private val stageMetrics = new ConcurrentHashMap[Int, LiveStageMetrics]()
    +
    +  private var uiInitialized = false
    +
    +  override def onJobStart(event: SparkListenerJobStart): Unit = {
    +    val executionIdString = event.properties.getProperty(SQLExecution.EXECUTION_ID_KEY)
    +    if (executionIdString == null) {
    +      // This is not a job created by SQL
    +      return
    +    }
    +
    +    val executionId = executionIdString.toLong
    +    val jobId = event.jobId
    +    val exec = getOrCreateExecution(executionId)
    +
    +    // Record the accumulator IDs for the stages of this job, so that the code that keeps
    +    // track of the metrics knows which accumulators to look at.
    +    val accumIds = exec.metrics.map(_.accumulatorId).sorted.toList
    +    event.stageIds.foreach { id =>
    +      stageMetrics.put(id, new LiveStageMetrics(id, 0, accumIds.toArray, new ConcurrentHashMap()))
    +    }
    +
    +    exec.jobs = exec.jobs + (jobId -> JobExecutionStatus.RUNNING)
    +    exec.stages = event.stageIds.toSet
    --- End diff --
    
    Oh, good catch. I can submit a fix for this.
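
The remark that prompted this reply is not quoted here, so the following is only one possible reading, labeled as an assumption: the last quoted line, `exec.stages = event.stageIds.toSet`, replaces the tracked stage set on every job start, which would drop earlier stages when one execution runs several jobs. The sketch below uses a hypothetical `ExecutionState` class (not the listener's real data type) to contrast replacing with accumulating.

```scala
// Hypothetical stand-in for per-execution state; not Spark's LiveExecutionData.
class ExecutionState {
  var stages: Set[Int] = Set.empty
}

object StageTrackingSketch {
  // Same shape as the assignment in the quoted diff: each job start replaces the set.
  def recordReplacing(exec: ExecutionState, stageIds: Seq[Int]): Unit =
    exec.stages = stageIds.toSet

  // Alternative (an assumption about the intended behavior): each job start adds to the set.
  def recordAccumulating(exec: ExecutionState, stageIds: Seq[Int]): Unit =
    exec.stages = exec.stages ++ stageIds

  def main(args: Array[String]): Unit = {
    val replaced = new ExecutionState
    recordReplacing(replaced, Seq(0, 1)) // first job of the execution
    recordReplacing(replaced, Seq(2))    // second job of the same execution

    val accumulated = new ExecutionState
    recordAccumulating(accumulated, Seq(0, 1))
    recordAccumulating(accumulated, Seq(2))

    println(replaced.stages.toList.sorted)    // List(2)
    println(accumulated.stages.toList.sorted) // List(0, 1, 2)
  }
}
```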


---



[GitHub] spark pull request #19681: [SPARK-20652][sql] Store SQL UI data in the new a...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19681#discussion_r157016366
  
    --- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
    @@ -455,9 +457,14 @@ class SparkContext(config: SparkConf) extends Logging {
             // For tests, do not enable the UI
             None
           }
    -    // Bind the UI before starting the task scheduler to communicate
    -    // the bound port to the cluster manager properly
    -    _ui.foreach(_.bind())
    +    _ui.foreach { ui =>
    +      // Load any plugins that might want to modify the UI.
    +      AppStatusPlugin.loadPlugins().foreach(_.setupUI(ui))
    --- End diff --
    
    Let's continue the discussion on the other PR.


---



[GitHub] spark issue #19681: [SPARK-20652][sql] Store SQL UI data in the new app stat...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19681
  
    **[Test build #83525 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83525/testReport)** for PR 19681 at commit [`c4af460`](https://github.com/apache/spark/commit/c4af460bc995a5021a26edc5e9709d5d81f1c09b).


---



[GitHub] spark pull request #19681: [SPARK-20652][sql] Store SQL UI data in the new a...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19681#discussion_r157130281
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala ---
    @@ -36,13 +36,14 @@ import org.apache.spark.sql.catalyst.util.quietly
     import org.apache.spark.sql.execution.{LeafExecNode, QueryExecution, SparkPlanInfo, SQLExecution}
     import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
     import org.apache.spark.sql.test.SharedSQLContext
    -import org.apache.spark.ui.SparkUI
    +import org.apache.spark.status.config._
     import org.apache.spark.util.{AccumulatorMetadata, JsonProtocol, LongAccumulator}
    -
    +import org.apache.spark.util.kvstore.InMemoryStore
     
     class SQLListenerSuite extends SparkFunSuite with SharedSQLContext with JsonTestUtils {
       import testImplicits._
    -  import org.apache.spark.AccumulatorSuite.makeInfo
    +
    +  override protected def sparkConf = super.sparkConf.set(LIVE_ENTITY_UPDATE_PERIOD, 0L)
    --- End diff --
    
    The Spark context is shared across all test suites, so we should set this conf to 0 only in this suite.
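
The override in the quoted diff follows the `super.sparkConf.set(...)` pattern. A minimal standalone sketch of that pattern is below; `SimpleConf`, `SharedContextLike`, the suite classes, and the key strings are all hypothetical stand-ins, not Spark's `SparkConf`, `SharedSQLContext`, or real configuration keys. It shows only that the extra setting is visible to the suite that declares the override, provided each suite builds its own configuration through the hook.

```scala
// Hypothetical immutable configuration; not Spark's SparkConf.
final case class SimpleConf(settings: Map[String, String] = Map.empty) {
  def set(key: String, value: String): SimpleConf = copy(settings = settings + (key -> value))
  def get(key: String): Option[String] = settings.get(key)
}

// Hypothetical base trait: each suite builds its configuration through this hook.
trait SharedContextLike {
  protected def sparkConf: SimpleConf = SimpleConf().set("example.app.name", "test")
  def effectiveConf: SimpleConf = sparkConf
}

// Only this suite layers the extra setting on top of the base configuration.
class ListenerSuiteLike extends SharedContextLike {
  override protected def sparkConf: SimpleConf =
    super.sparkConf.set("example.liveUpdate.period", "0")
}

class OtherSuiteLike extends SharedContextLike

object ConfOverrideSketch {
  def main(args: Array[String]): Unit = {
    println(new ListenerSuiteLike().effectiveConf.get("example.liveUpdate.period")) // Some(0)
    println(new OtherSuiteLike().effectiveConf.get("example.liveUpdate.period"))    // None
  }
}
```

Whether the setting leaks into other suites therefore depends on whether the context built from the configuration is reused across suites, which is the question raised in the comment.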


---



[GitHub] spark issue #19681: [SPARK-20652][sql] Store SQL UI data in the new app stat...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19681
  
    **[Test build #83570 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83570/testReport)** for PR 19681 at commit [`197dd8f`](https://github.com/apache/spark/commit/197dd8fe645d3672c6e0c0ac0f52144a84b91dc5).


---



[GitHub] spark issue #19681: [SPARK-20652][sql] Store SQL UI data in the new app stat...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19681
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/83585/


---



[GitHub] spark issue #19681: [SPARK-20652][sql] Store SQL UI data in the new app stat...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19681
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #19681: [SPARK-20652][sql] Store SQL UI data in the new a...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19681#discussion_r157331270
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala ---
    @@ -36,13 +36,14 @@ import org.apache.spark.sql.catalyst.util.quietly
     import org.apache.spark.sql.execution.{LeafExecNode, QueryExecution, SparkPlanInfo, SQLExecution}
     import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
     import org.apache.spark.sql.test.SharedSQLContext
    -import org.apache.spark.ui.SparkUI
    +import org.apache.spark.status.config._
     import org.apache.spark.util.{AccumulatorMetadata, JsonProtocol, LongAccumulator}
    -
    +import org.apache.spark.util.kvstore.InMemoryStore
     
     class SQLListenerSuite extends SparkFunSuite with SharedSQLContext with JsonTestUtils {
       import testImplicits._
    -  import org.apache.spark.AccumulatorSuite.makeInfo
    +
    +  override protected def sparkConf = super.sparkConf.set(LIVE_ENTITY_UPDATE_PERIOD, 0L)
    --- End diff --
    
    Ah, you are right: it's only shared in the Hive tests.


---



[GitHub] spark pull request #19681: [SPARK-20652][sql] Store SQL UI data in the new a...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19681#discussion_r149458539
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala ---
    @@ -36,13 +36,14 @@ import org.apache.spark.sql.catalyst.util.quietly
     import org.apache.spark.sql.execution.{LeafExecNode, QueryExecution, SparkPlanInfo, SQLExecution}
     import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
     import org.apache.spark.sql.test.SharedSQLContext
    -import org.apache.spark.ui.SparkUI
    +import org.apache.spark.status.config._
     import org.apache.spark.util.{AccumulatorMetadata, JsonProtocol, LongAccumulator}
    -
    +import org.apache.spark.util.kvstore.InMemoryStore
     
     class SQLListenerSuite extends SparkFunSuite with SharedSQLContext with JsonTestUtils {
    --- End diff --
    
    Rename the test suite to `SQLAppStatusListenerSuite`.


---



[GitHub] spark issue #19681: [SPARK-20652][sql] Store SQL UI data in the new app stat...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19681
  
    **[Test build #83525 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83525/testReport)** for PR 19681 at commit [`c4af460`](https://github.com/apache/spark/commit/c4af460bc995a5021a26edc5e9709d5d81f1c09b).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #19681: [SPARK-20652][sql] Store SQL UI data in the new app stat...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on the issue:

    https://github.com/apache/spark/pull/19681
  
    LGTM.
    
    The merge conflict still needs to be fixed, though.


---



[GitHub] spark issue #19681: [SPARK-20652][sql] Store SQL UI data in the new app stat...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19681
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/83609/


---



[GitHub] spark issue #19681: [SPARK-20652][sql] Store SQL UI data in the new app stat...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19681
  
    **[Test build #83585 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83585/testReport)** for PR 19681 at commit [`ecf293b`](https://github.com/apache/spark/commit/ecf293b31fa1b5250f484d6b2f09373e7057bbc3).


---



[GitHub] spark issue #19681: [SPARK-20652][sql] Store SQL UI data in the new app stat...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19681
  
    **[Test build #83585 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83585/testReport)** for PR 19681 at commit [`ecf293b`](https://github.com/apache/spark/commit/ecf293b31fa1b5250f484d6b2f09373e7057bbc3).
     * This patch **fails due to an unknown error code, -9**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #19681: [SPARK-20652][sql] Store SQL UI data in the new app stat...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on the issue:

    https://github.com/apache/spark/pull/19681
  
    For context:
    
    * Project link: https://issues.apache.org/jira/browse/SPARK-18085
    * Upcoming PRs that build on this code: https://github.com/vanzin/spark/pulls
    
    Note I took this PR out of its original order in my repo to speed up reviews. That means it is not exactly the same code as in there - I'll need to clean up the `FsHistoryProvider` code in a separate change.


---



[GitHub] spark issue #19681: [SPARK-20652][sql] Store SQL UI data in the new app stat...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on the issue:

    https://github.com/apache/spark/pull/19681
  
    retest this please


---



[GitHub] spark issue #19681: [SPARK-20652][sql] Store SQL UI data in the new app stat...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19681
  
    **[Test build #83609 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83609/testReport)** for PR 19681 at commit [`a42f418`](https://github.com/apache/spark/commit/a42f418bc5a84260f8b02b6db4e03847c764f362).


---



[GitHub] spark pull request #19681: [SPARK-20652][sql] Store SQL UI data in the new a...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19681#discussion_r149466906
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala ---
    @@ -36,13 +36,14 @@ import org.apache.spark.sql.catalyst.util.quietly
     import org.apache.spark.sql.execution.{LeafExecNode, QueryExecution, SparkPlanInfo, SQLExecution}
     import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
     import org.apache.spark.sql.test.SharedSQLContext
    -import org.apache.spark.ui.SparkUI
    +import org.apache.spark.status.config._
     import org.apache.spark.util.{AccumulatorMetadata, JsonProtocol, LongAccumulator}
    -
    +import org.apache.spark.util.kvstore.InMemoryStore
     
     class SQLListenerSuite extends SparkFunSuite with SharedSQLContext with JsonTestUtils {
    --- End diff --
    
    Actually this suite has a mix of tests for the listener and for stuff that's not related to the listener, which would belong in `SQLAppStatusListenerSuite`. My original changes broke this into two different suites, but I chose to postpone that to reduce the size of the diff for now (and also to make the diff a little easier to read).


---



[GitHub] spark issue #19681: [SPARK-20652][sql] Store SQL UI data in the new app stat...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19681
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/83802/
    Test PASSed.


---



[GitHub] spark issue #19681: [SPARK-20652][sql] Store SQL UI data in the new app stat...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19681
  
    **[Test build #83620 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83620/testReport)** for PR 19681 at commit [`bb7388b`](https://github.com/apache/spark/commit/bb7388b86d7adf8bbf209cf7748c319c4b8c0c77).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #19681: [SPARK-20652][sql] Store SQL UI data in the new app stat...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19681
  
    Merged build finished. Test FAILed.


---



[GitHub] spark issue #19681: [SPARK-20652][sql] Store SQL UI data in the new app stat...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on the issue:

    https://github.com/apache/spark/pull/19681
  
    @vanzin merge conflict since I merged the Job & stage page change


---



[GitHub] spark issue #19681: [SPARK-20652][sql] Store SQL UI data in the new app stat...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19681
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/83570/
    Test FAILed.


---



[GitHub] spark pull request #19681: [SPARK-20652][sql] Store SQL UI data in the new a...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19681#discussion_r149578074
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala ---
    @@ -0,0 +1,353 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.sql.execution.ui
    +
    +import java.util.Date
    +import java.util.concurrent.ConcurrentHashMap
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable.HashMap
    +
    +import org.apache.spark.{JobExecutionStatus, SparkConf}
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.scheduler._
    +import org.apache.spark.sql.execution.SQLExecution
    +import org.apache.spark.sql.execution.metric._
    +import org.apache.spark.sql.internal.StaticSQLConf._
    +import org.apache.spark.status.LiveEntity
    +import org.apache.spark.status.config._
    +import org.apache.spark.ui.SparkUI
    +import org.apache.spark.util.kvstore.KVStore
    +
    +private[sql] class SQLAppStatusListener(
    +    conf: SparkConf,
    +    kvstore: KVStore,
    +    live: Boolean,
    +    ui: Option[SparkUI] = None)
    +  extends SparkListener with Logging {
    +
    +  // How often to flush intermediate stage of a live execution to the store. When replaying logs,
    +  // never flush (only do the very last write).
    +  private val liveUpdatePeriodNs = if (live) conf.get(LIVE_ENTITY_UPDATE_PERIOD) else -1L
    +
    +  private val liveExecutions = new HashMap[Long, LiveExecutionData]()
    +  private val stageMetrics = new HashMap[Int, LiveStageMetrics]()
    +
    +  private var uiInitialized = false
    +
    +  override def onJobStart(event: SparkListenerJobStart): Unit = {
    +    val executionIdString = event.properties.getProperty(SQLExecution.EXECUTION_ID_KEY)
    +    if (executionIdString == null) {
    +      // This is not a job created by SQL
    +      return
    +    }
    +
    +    val executionId = executionIdString.toLong
    +    val jobId = event.jobId
    +    val exec = getOrCreateExecution(executionId)
    +
    +    // Record the accumulator IDs for the stages of this job, so that the code that keeps
    +    // track of the metrics knows which accumulators to look at.
    +    val accumIds = exec.metrics.map(_.accumulatorId).sorted.toList
    +    event.stageIds.foreach { id =>
    +      stageMetrics.put(id, new LiveStageMetrics(id, 0, accumIds.toArray, new ConcurrentHashMap()))
    +    }
    +
    +    exec.jobs = exec.jobs + (jobId -> JobExecutionStatus.RUNNING)
    +    exec.stages = event.stageIds
    +    update(exec)
    +  }
    +
    +  override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = {
    +    if (!isSQLStage(event.stageInfo.stageId)) {
    +      return
    +    }
    +
    +    // Reset the metrics tracking object for the new attempt.
    +    stageMetrics.get(event.stageInfo.stageId).foreach { metrics =>
    +      metrics.taskMetrics.clear()
    +      metrics.attemptId = event.stageInfo.attemptId
    +    }
    +  }
    +
    +  override def onJobEnd(event: SparkListenerJobEnd): Unit = {
    +    liveExecutions.values.foreach { exec =>
    +      if (exec.jobs.contains(event.jobId)) {
    +        val result = event.jobResult match {
    +          case JobSucceeded => JobExecutionStatus.SUCCEEDED
    +          case _ => JobExecutionStatus.FAILED
    +        }
    +        exec.jobs = exec.jobs + (event.jobId -> result)
    +        exec.endEvents += 1
    +        update(exec)
    +      }
    +    }
    +  }
    +
    +  override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = {
    +    event.accumUpdates.foreach { case (taskId, stageId, attemptId, accumUpdates) =>
    +      updateStageMetrics(stageId, attemptId, taskId, accumUpdates, false)
    +    }
    +  }
    +
    +  override def onTaskEnd(event: SparkListenerTaskEnd): Unit = {
    +    if (!isSQLStage(event.stageId)) {
    +      return
    +    }
    +
    +    val info = event.taskInfo
    +    // SPARK-20342. If processing events from a live application, use the task metrics info to
    +    // work around a race in the DAGScheduler. The metrics info does not contain accumulator info
    +    // when reading event logs in the SHS, so we have to rely on the accumulator in that case.
    +    val accums = if (live && event.taskMetrics != null) {
    +      event.taskMetrics.externalAccums.flatMap { a =>
    +        // This call may fail if the accumulator is gc'ed, so account for that.
    +        try {
    +          Some(a.toInfo(Some(a.value), None))
    +        } catch {
    +          case _: IllegalAccessError => None
    +        }
    +      }
    +    } else {
    +      info.accumulables
    +    }
    +    updateStageMetrics(event.stageId, event.stageAttemptId, info.taskId, accums,
    +      info.successful)
    +  }
    +
    +  def executionMetrics(executionId: Long): Map[Long, String] = synchronized {
    +    liveExecutions.get(executionId).map { exec =>
    +      if (exec.metricsValues != null) {
    +        exec.metricsValues
    +      } else {
    +        aggregateMetrics(exec)
    +      }
    +    }.getOrElse {
    +      throw new NoSuchElementException(s"execution $executionId not found")
    +    }
    +  }
    +
    +  private def aggregateMetrics(exec: LiveExecutionData): Map[Long, String] = synchronized {
    +    val metricIds = exec.metrics.map(_.accumulatorId).sorted
    +    val metricTypes = exec.metrics.map { m => (m.accumulatorId, m.metricType) }.toMap
    +    val metrics = exec.stages
    +      .flatMap(stageMetrics.get)
    +      .flatMap(_.taskMetrics.values().asScala)
    +      .flatMap { metrics => metrics.ids.zip(metrics.values) }
    +
    +    (metrics ++ exec.driverAccumUpdates.toSeq)
    +      .filter { case (id, _) => metricIds.contains(id) }
    +      .groupBy(_._1)
    +      .map { case (id, values) =>
    +        id -> SQLMetrics.stringValue(metricTypes(id), values.map(_._2).toSeq)
    +      }
    +  }
    +
    +  private def updateStageMetrics(
    +      stageId: Int,
    +      attemptId: Int,
    +      taskId: Long,
    +      accumUpdates: Seq[AccumulableInfo],
    +      succeeded: Boolean): Unit = {
    +    stageMetrics.get(stageId).foreach { metrics =>
    +      if (metrics.attemptId != attemptId || metrics.accumulatorIds.isEmpty) {
    +        return
    +      }
    +
    +      val oldTaskMetrics = metrics.taskMetrics.get(taskId)
    +      if (oldTaskMetrics != null && oldTaskMetrics.succeeded) {
    +        return
    +      }
    +
    +      val updates = accumUpdates
    +        .filter { acc => acc.update.isDefined && metrics.accumulatorIds.contains(acc.id) }
    +        .sortBy(_.id)
    +
    +      if (updates.isEmpty) {
    +        return
    +      }
    +
    +      val ids = new Array[Long](updates.size)
    +      val values = new Array[Long](updates.size)
    +      updates.zipWithIndex.foreach { case (acc, idx) =>
    +        ids(idx) = acc.id
    +        // In a live application, accumulators have Long values, but when reading from event
    +        // logs, they have String values. For now, assume all accumulators are Long and convert
    +        // accordingly.
    +        values(idx) = acc.update.get match {
    +          case s: String => s.toLong
    +          case l: Long => l
    +          case o => throw new IllegalArgumentException(s"Unexpected: $o")
    +        }
    +      }
    +
    +      metrics.taskMetrics.put(taskId, new LiveTaskMetrics(ids, values, succeeded))
    +    }
    +  }
    +
    +  private def onExecutionStart(event: SparkListenerSQLExecutionStart): Unit = {
    +    // Install the SQL tab in a live app if it hasn't been initialized yet.
    +    if (!uiInitialized) {
    +      ui.foreach { _ui =>
    +        new SQLTab(new SQLAppStatusStore(kvstore, Some(this)), _ui)
    +      }
    +      uiInitialized = true
    +    }
    +
    +    val SparkListenerSQLExecutionStart(executionId, description, details,
    +      physicalPlanDescription, sparkPlanInfo, time) = event
    +
    +    def toStoredNodes(nodes: Seq[SparkPlanGraphNode]): Seq[SparkPlanGraphNodeWrapper] = {
    +      nodes.map {
    +        case cluster: SparkPlanGraphCluster =>
    +          val storedCluster = new SparkPlanGraphClusterWrapper(
    +            cluster.id,
    +            cluster.name,
    +            cluster.desc,
    +            toStoredNodes(cluster.nodes),
    +            cluster.metrics)
    +          new SparkPlanGraphNodeWrapper(null, storedCluster)
    +
    +        case node =>
    +          new SparkPlanGraphNodeWrapper(node, null)
    +      }
    +    }
    +
    +    val planGraph = SparkPlanGraph(sparkPlanInfo)
    +    val sqlPlanMetrics = planGraph.allNodes.flatMap { node =>
    +      node.metrics.map { metric => (metric.accumulatorId, metric) }
    +    }.toMap.values.toList
    +
    +    val graphToStore = new SparkPlanGraphWrapper(
    +      executionId,
    +      toStoredNodes(planGraph.nodes),
    +      planGraph.edges)
    +    kvstore.write(graphToStore)
    +
    +    val exec = getOrCreateExecution(executionId)
    +    exec.description = description
    +    exec.details = details
    +    exec.physicalPlanDescription = physicalPlanDescription
    +    exec.metrics = sqlPlanMetrics
    +    exec.submissionTime = time
    +    update(exec)
    +  }
    +
    +  private def onExecutionEnd(event: SparkListenerSQLExecutionEnd): Unit = {
    +    val SparkListenerSQLExecutionEnd(executionId, time) = event
    +    liveExecutions.get(executionId).foreach { exec =>
    +      synchronized {
    +        exec.metricsValues = aggregateMetrics(exec)
    +
    +        // Remove stale LiveStageMetrics objects for stages that are not active anymore.
    +        val activeStages = liveExecutions.values.flatMap { other =>
    +          if (other != exec) other.stages else Nil
    +        }.toSet
    +        stageMetrics.retain { case (id, _) => activeStages.contains(id) }
    +
    +        exec.completionTime = Some(new Date(time))
    +        exec.endEvents += 1
    +
    +        update(exec)
    +      }
    +    }
    +  }
    +
    +  private def onDriverAccumUpdates(event: SparkListenerDriverAccumUpdates): Unit = {
    +    val SparkListenerDriverAccumUpdates(executionId, accumUpdates) = event
    +    liveExecutions.get(executionId).foreach { exec =>
    +      exec.driverAccumUpdates = accumUpdates.toMap
    +      update(exec)
    +    }
    +  }
    +
    +  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
    +    case e: SparkListenerSQLExecutionStart => onExecutionStart(e)
    +    case e: SparkListenerSQLExecutionEnd => onExecutionEnd(e)
    +    case e: SparkListenerDriverAccumUpdates => onDriverAccumUpdates(e)
    +    case _ => // Ignore
    +  }
    +
    +  private def getOrCreateExecution(executionId: Long): LiveExecutionData = {
    +    liveExecutions.getOrElseUpdate(executionId, new LiveExecutionData(executionId))
    +  }
    +
    +  private def update(exec: LiveExecutionData): Unit = {
    +    val now = System.nanoTime()
    +    if (exec.endEvents >= exec.jobs.size + 1) {
    +      liveExecutions.remove(exec.executionId)
    +      exec.write(kvstore, now)
    --- End diff --
    
    Sorry, I meant if the _application_ is killed, so that the event log just ends abruptly. I think the old history server code would still show you the updated metrics for all the tasks that had completed. But it seems that after this change, the history server won't show anything for any jobs that hadn't completed.
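
To make the concern concrete, here is a minimal sketch of a replay-side tracker that only writes on end events, plus one possible remedy. Everything in it is hypothetical: `ReplayTracker`, the `write` callback, and the `flushAll` hook are illustrative names and are not taken from the PR.

```scala
import scala.collection.mutable

// Hypothetical replay-side tracker. `write` stands in for a store write.
final class ReplayTracker(write: (Long, String) => Unit) {
  // Executions that have started but not yet seen an end event.
  private val inFlight = mutable.LinkedHashMap.empty[Long, String]

  def onStart(executionId: Long): Unit = {
    inFlight.update(executionId, "RUNNING")
  }

  def onEnd(executionId: Long): Unit = {
    inFlight.remove(executionId)
    write(executionId, "COMPLETED")
  }

  // Assumed hook, called once the event log is exhausted. Without it, executions
  // that never saw an end event (killed application) are never written.
  def flushAll(): Unit = {
    inFlight.foreach { case (id, state) => write(id, state) }
    inFlight.clear()
  }
}
```

If the replay code called something like `flushAll()` after the last event, executions left in flight by a truncated log would still reach the store with whatever state they had.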


---



[GitHub] spark pull request #19681: [SPARK-20652][sql] Store SQL UI data in the new a...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19681#discussion_r149466466
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala ---
    @@ -0,0 +1,353 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.sql.execution.ui
    +
    +import java.util.Date
    +import java.util.concurrent.ConcurrentHashMap
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable.HashMap
    +
    +import org.apache.spark.{JobExecutionStatus, SparkConf}
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.scheduler._
    +import org.apache.spark.sql.execution.SQLExecution
    +import org.apache.spark.sql.execution.metric._
    +import org.apache.spark.sql.internal.StaticSQLConf._
    +import org.apache.spark.status.LiveEntity
    +import org.apache.spark.status.config._
    +import org.apache.spark.ui.SparkUI
    +import org.apache.spark.util.kvstore.KVStore
    +
    +private[sql] class SQLAppStatusListener(
    --- End diff --
    
    The live UI is updated from the info in `LiveStageMetrics`, which is not written to the store. That's kept in memory while executions are running, and aggregated into the final metrics view when the execution finishes (see `aggregateMetrics`).
    
    `SQLAppStatusStore.executionMetrics` has logic to call the listener directly when the final metrics are not yet computed.
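
As a rough illustration of that lookup order (stored final values first, then the live listener, then nothing), here is a self-contained sketch. The types `StoredExecution`, `LiveMetricsSource`, and `MetricsView` are simplified stand-ins invented for this example, and the "aggregation" is reduced to a plain sum; only the order of the fallbacks mirrors the description above.

```scala
import scala.collection.concurrent.TrieMap

// Hypothetical stand-in for the stored execution data; metricValues is null until
// the execution ends and the final values are written.
final case class StoredExecution(id: Long, metricValues: Map[Long, String])

// Hypothetical stand-in for the listener: raw values are kept in memory only.
final class LiveMetricsSource {
  private val live = TrieMap.empty[Long, Map[Long, Seq[Long]]]

  def record(executionId: Long, accumId: Long, value: Long): Unit = {
    val current = live.getOrElse(executionId, Map.empty[Long, Seq[Long]])
    val values = current.getOrElse(accumId, Seq.empty[Long]) :+ value
    live.put(executionId, current.updated(accumId, values))
  }

  // Aggregate on demand; here the aggregate is just a sum rendered as a string.
  def executionMetrics(executionId: Long): Map[Long, String] = {
    val raw = live.getOrElse(executionId,
      throw new NoSuchElementException(s"execution $executionId not found"))
    raw.map { case (id, values) => id -> values.sum.toString }
  }
}

final class MetricsView(
    store: Map[Long, StoredExecution],
    listener: Option[LiveMetricsSource]) {

  def executionMetrics(executionId: Long): Map[Long, String] = {
    val exec = store(executionId)
    Option(exec.metricValues)                                // 1. final values, if written
      .orElse(listener.map(_.executionMetrics(executionId))) // 2. else ask the live listener
      .getOrElse(Map.empty)                                  // 3. no listener available
  }
}
```

With no listener attached (the third case), an execution without final values simply yields an empty map.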


---



[GitHub] spark issue #19681: [SPARK-20652][sql] Store SQL UI data in the new app stat...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on the issue:

    https://github.com/apache/spark/pull/19681
  
    >> Are there any tests for the SQL UI in the history server?
    
    >No, because those tests are in core. I want to add some SHS integration tests in a separate module after this work goes in, covering core, SQL and the disk store, so that we can detect incompatibilities when they happen. But I haven't really written anything yet.
    
    OK -- is there a jira for adding those tests?


---



[GitHub] spark issue #19681: [SPARK-20652][sql] Store SQL UI data in the new app stat...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19681
  
    **[Test build #83609 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83609/testReport)** for PR 19681 at commit [`a42f418`](https://github.com/apache/spark/commit/a42f418bc5a84260f8b02b6db4e03847c764f362).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #19681: [SPARK-20652][sql] Store SQL UI data in the new a...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19681#discussion_r150091458
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala ---
    @@ -0,0 +1,172 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.ui
    +
    +import java.lang.{Long => JLong}
    +import java.util.Date
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable.ArrayBuffer
    +
    +import com.fasterxml.jackson.databind.annotation.JsonDeserialize
    +
    +import org.apache.spark.{JobExecutionStatus, SparkConf}
    +import org.apache.spark.scheduler.SparkListener
    +import org.apache.spark.status.AppStatusPlugin
    +import org.apache.spark.status.KVUtils.KVIndexParam
    +import org.apache.spark.ui.SparkUI
    +import org.apache.spark.util.Utils
    +import org.apache.spark.util.kvstore.KVStore
    +
    +/**
    + * Provides a view of a KVStore with methods that make it easy to query SQL-specific state. There's
    + * no state kept in this class, so it's ok to have multiple instances of it in an application.
    + */
    +private[sql] class SQLAppStatusStore(
    +    store: KVStore,
    +    listener: Option[SQLAppStatusListener] = None) {
    +
    +  def executionsList(): Seq[SQLExecutionUIData] = {
    +    store.view(classOf[SQLExecutionUIData]).asScala.toSeq
    +  }
    +
    +  def execution(executionId: Long): Option[SQLExecutionUIData] = {
    +    try {
    +      Some(store.read(classOf[SQLExecutionUIData], executionId))
    +    } catch {
    +      case _: NoSuchElementException => None
    +    }
    +  }
    +
    +  def executionsCount(): Long = {
    +    store.count(classOf[SQLExecutionUIData])
    +  }
    +
    +  def executionMetrics(executionId: Long): Map[Long, String] = {
    +    val exec = store.read(classOf[SQLExecutionUIData], executionId)
    +    Option(exec.metricValues)
    +      .orElse(listener.map(_.executionMetrics(executionId)))
    +      .getOrElse(Map())
    --- End diff --
    
    Is there a race here when the execution ends?
    
    - T1 (UI thread, calling this method): the execution hasn't ended, so `exec.metricValues` is null
    - T2 (listener): the execution ends and is dropped from `liveExecutions`
    - T1: `_.executionMetrics(executionId)` throws an exception because the execution has been dropped from `liveExecutions`
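
One way the interleaving above could be tolerated is to treat the exception as a signal that the execution just ended and re-read the stored values. The sketch below is hypothetical and not the PR's code: `RaceTolerantLookup`, `readStored`, and `readLive` are illustrative names, and it assumes the listener writes the final values to the store before removing the live entry.

```scala
// Hypothetical helper. `readStored` returns the final metrics if they have been
// written (None otherwise); `readLive` throws NoSuchElementException once the
// listener has dropped the execution.
object RaceTolerantLookup {
  def executionMetrics(
      readStored: () => Option[Map[Long, String]],
      readLive: () => Map[Long, String]): Map[Long, String] = {
    readStored().getOrElse {
      try {
        readLive()
      } catch {
        // The execution ended between the two reads. Under the stated assumption,
        // the final values are now in the store, so read them again.
        case _: NoSuchElementException =>
          readStored().getOrElse(Map.empty[Long, String])
      }
    }
  }
}
```

If the write happens after the removal instead, the second read can still come back empty, so the ordering on the listener side matters as much as the retry.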


---



[GitHub] spark issue #19681: [SPARK-20652][sql] Store SQL UI data in the new app stat...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19681
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #19681: [SPARK-20652][sql] Store SQL UI data in the new a...

Posted by carsonwang <gi...@git.apache.org>.
Github user carsonwang commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19681#discussion_r158226032
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala ---
    @@ -0,0 +1,366 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.sql.execution.ui
    +
    +import java.util.Date
    +import java.util.concurrent.ConcurrentHashMap
    +import java.util.function.Function
    +
    +import scala.collection.JavaConverters._
    +
    +import org.apache.spark.{JobExecutionStatus, SparkConf}
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.scheduler._
    +import org.apache.spark.sql.execution.SQLExecution
    +import org.apache.spark.sql.execution.metric._
    +import org.apache.spark.status.LiveEntity
    +import org.apache.spark.status.config._
    +import org.apache.spark.ui.SparkUI
    +import org.apache.spark.util.kvstore.KVStore
    +
    +private[sql] class SQLAppStatusListener(
    +    conf: SparkConf,
    +    kvstore: KVStore,
    +    live: Boolean,
    +    ui: Option[SparkUI] = None)
    +  extends SparkListener with Logging {
    +
    +  // How often to flush intermediate state of a live execution to the store. When replaying logs,
    +  // never flush (only do the very last write).
    +  private val liveUpdatePeriodNs = if (live) conf.get(LIVE_ENTITY_UPDATE_PERIOD) else -1L
    +
    +  // Live tracked data is needed by the SQL status store to calculate metrics for in-flight
    +  // executions; that means arbitrary threads may be querying these maps, so they need to be
    +  // thread-safe.
    +  private val liveExecutions = new ConcurrentHashMap[Long, LiveExecutionData]()
    +  private val stageMetrics = new ConcurrentHashMap[Int, LiveStageMetrics]()
    +
    +  private var uiInitialized = false
    +
    +  override def onJobStart(event: SparkListenerJobStart): Unit = {
    +    val executionIdString = event.properties.getProperty(SQLExecution.EXECUTION_ID_KEY)
    +    if (executionIdString == null) {
    +      // This is not a job created by SQL
    +      return
    +    }
    +
    +    val executionId = executionIdString.toLong
    +    val jobId = event.jobId
    +    val exec = getOrCreateExecution(executionId)
    +
    +    // Record the accumulator IDs for the stages of this job, so that the code that keeps
    +    // track of the metrics knows which accumulators to look at.
    +    val accumIds = exec.metrics.map(_.accumulatorId).sorted.toList
    +    event.stageIds.foreach { id =>
    +      stageMetrics.put(id, new LiveStageMetrics(id, 0, accumIds.toArray, new ConcurrentHashMap()))
    +    }
    +
    +    exec.jobs = exec.jobs + (jobId -> JobExecutionStatus.RUNNING)
    +    exec.stages = event.stageIds.toSet
    --- End diff --
    
    @vanzin, shall we add these stageIds to the existing stageIds? Otherwise we will lose the stageIds from previous jobs.
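
A small sketch of the difference between the two behaviours, for an execution that runs more than one job. `ExecutionStages` is a hypothetical, simplified stand-in for the live execution entity, not the class from the PR.

```scala
// Hypothetical, simplified stand-in for the per-execution state.
final class ExecutionStages {
  var stages: Set[Int] = Set.empty

  // Overwrites: only the most recent job's stages survive.
  def onJobStartReplacing(stageIds: Seq[Int]): Unit = {
    stages = stageIds.toSet
  }

  // Accumulates: stages from every job of the execution are kept.
  def onJobStartAccumulating(stageIds: Seq[Int]): Unit = {
    stages = stages ++ stageIds
  }
}
```

With two jobs covering stages (0, 1) and (2), the replacing variant ends up tracking only stage 2, while the accumulating variant tracks all three.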


---



[GitHub] spark pull request #19681: [SPARK-20652][sql] Store SQL UI data in the new a...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19681#discussion_r149578181
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala ---
    @@ -40,7 +40,7 @@ private[sql] class SQLAppStatusListener(
         ui: Option[SparkUI] = None)
       extends SparkListener with Logging {
     
    -  // How often to flush intermediate statge of a live execution to the store. When replaying logs,
    +  // How often to flush intermediate stage of a live execution to the store. When replaying logs,
    --- End diff --
    
    err, was this supposed to be "state"?


---



[GitHub] spark pull request #19681: [SPARK-20652][sql] Store SQL UI data in the new a...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19681#discussion_r156987695
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala ---
    @@ -0,0 +1,179 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.ui
    +
    +import java.lang.{Long => JLong}
    +import java.util.Date
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable.ArrayBuffer
    +
    +import com.fasterxml.jackson.databind.annotation.JsonDeserialize
    +
    +import org.apache.spark.{JobExecutionStatus, SparkConf}
    +import org.apache.spark.scheduler.SparkListener
    +import org.apache.spark.status.AppStatusPlugin
    +import org.apache.spark.status.KVUtils.KVIndexParam
    +import org.apache.spark.ui.SparkUI
    +import org.apache.spark.util.Utils
    +import org.apache.spark.util.kvstore.KVStore
    +
    +/**
    + * Provides a view of a KVStore with methods that make it easy to query SQL-specific state. There's
    + * no state kept in this class, so it's ok to have multiple instances of it in an application.
    + */
    +private[sql] class SQLAppStatusStore(
    +    store: KVStore,
    +    listener: Option[SQLAppStatusListener] = None) {
    +
    +  def executionsList(): Seq[SQLExecutionUIData] = {
    +    store.view(classOf[SQLExecutionUIData]).asScala.toSeq
    +  }
    +
    +  def execution(executionId: Long): Option[SQLExecutionUIData] = {
    +    try {
    +      Some(store.read(classOf[SQLExecutionUIData], executionId))
    +    } catch {
    +      case _: NoSuchElementException => None
    +    }
    +  }
    +
    +  def executionsCount(): Long = {
    +    store.count(classOf[SQLExecutionUIData])
    +  }
    +
    +  def executionMetrics(executionId: Long): Map[Long, String] = {
    +    def metricsFromStore(): Option[Map[Long, String]] = {
    +      val exec = store.read(classOf[SQLExecutionUIData], executionId)
    +      Option(exec.metricValues)
    +    }
    +
    +    metricsFromStore()
    +      .orElse(listener.flatMap(_.liveExecutionMetrics(executionId)))
    +      // Try a second time in case the execution finished while this method is trying to
    +      // get the metrics.
    +      .orElse(metricsFromStore())
    +      .getOrElse(Map())
    +  }
    +
    +  def planGraph(executionId: Long): SparkPlanGraph = {
    +    store.read(classOf[SparkPlanGraphWrapper], executionId).toSparkPlanGraph()
    +  }
    +
    +}
    +
    +/**
    + * An AppStatusPlugin for handling the SQL UI and listeners.
    + */
    +private[sql] class SQLAppStatusPlugin extends AppStatusPlugin {
    +
    +  override def setupListeners(
    +      conf: SparkConf,
    +      store: KVStore,
    +      addListenerFn: SparkListener => Unit,
    +      live: Boolean): Unit = {
    +    // For live applications, the listener is installed in [[setupUI]]. This also avoids adding
    +    // the listener when the UI is disabled. Force installation during testing, though.
    +    if (!live || Utils.isTesting) {
    +      val listener = new SQLAppStatusListener(conf, store, live, None)
    +      addListenerFn(listener)
    +    }
    +  }
    +
    +  override def setupUI(ui: SparkUI): Unit = {
    --- End diff --
    
    Do we have a clear rule about when `setupListeners` is called and when `setupUI` is called?
    
    Here we register `SQLAppStatusListener` in both `setupListeners` and `setupUI`; will it be registered twice?
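
One way to reason about this question is to tabulate when each hook would register a listener. The sketch below is a toy model, not the PR's code: `registersInSetupListeners` copies the condition visible in the quoted `setupListeners`, while `registersInSetupUI` encodes an assumption (the `setupUI` body is cut off in the quote) that it installs a listener only for live applications with the UI enabled. All names in the object are hypothetical.

```scala
object RegistrationModel {
  // Mirrors the condition in the quoted setupListeners:
  // register when replaying logs, or when running under test.
  def registersInSetupListeners(live: Boolean, testing: Boolean): Boolean =
    !live || testing

  // ASSUMPTION: setupUI installs a listener only for live apps whose UI is enabled.
  // The real body of setupUI is not shown in the quoted diff.
  def registersInSetupUI(live: Boolean, uiEnabled: Boolean): Boolean =
    live && uiEnabled

  // Number of listener instances that would be registered for a given configuration.
  def registrations(live: Boolean, testing: Boolean, uiEnabled: Boolean): Int =
    Seq(
      registersInSetupListeners(live, testing),
      registersInSetupUI(live, uiEnabled)
    ).count(identity)
}
```

Under that assumption the two hooks overlap only when a live application has its UI enabled while running under test; a replayed application or a normal live application would get a single listener.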


---



[GitHub] spark issue #19681: [SPARK-20652][sql] Store SQL UI data in the new app stat...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on the issue:

    https://github.com/apache/spark/pull/19681
  
    retest this please


---



[GitHub] spark pull request #19681: [SPARK-20652][sql] Store SQL UI data in the new a...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19681#discussion_r149579972
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala ---
    @@ -0,0 +1,353 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.sql.execution.ui
    +
    +import java.util.Date
    +import java.util.concurrent.ConcurrentHashMap
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable.HashMap
    +
    +import org.apache.spark.{JobExecutionStatus, SparkConf}
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.scheduler._
    +import org.apache.spark.sql.execution.SQLExecution
    +import org.apache.spark.sql.execution.metric._
    +import org.apache.spark.sql.internal.StaticSQLConf._
    +import org.apache.spark.status.LiveEntity
    +import org.apache.spark.status.config._
    +import org.apache.spark.ui.SparkUI
    +import org.apache.spark.util.kvstore.KVStore
    +
    +private[sql] class SQLAppStatusListener(
    +    conf: SparkConf,
    +    kvstore: KVStore,
    +    live: Boolean,
    +    ui: Option[SparkUI] = None)
    +  extends SparkListener with Logging {
    +
    +  // How often to flush intermediate stage of a live execution to the store. When replaying logs,
    +  // never flush (only do the very last write).
    +  private val liveUpdatePeriodNs = if (live) conf.get(LIVE_ENTITY_UPDATE_PERIOD) else -1L
    +
    +  private val liveExecutions = new HashMap[Long, LiveExecutionData]()
    +  private val stageMetrics = new HashMap[Int, LiveStageMetrics]()
    +
    +  private var uiInitialized = false
    +
    +  override def onJobStart(event: SparkListenerJobStart): Unit = {
    +    val executionIdString = event.properties.getProperty(SQLExecution.EXECUTION_ID_KEY)
    +    if (executionIdString == null) {
    +      // This is not a job created by SQL
    +      return
    +    }
    +
    +    val executionId = executionIdString.toLong
    +    val jobId = event.jobId
    +    val exec = getOrCreateExecution(executionId)
    +
    +    // Record the accumulator IDs for the stages of this job, so that the code that keeps
    +    // track of the metrics knows which accumulators to look at.
    +    val accumIds = exec.metrics.map(_.accumulatorId).sorted.toList
    +    event.stageIds.foreach { id =>
    +      stageMetrics.put(id, new LiveStageMetrics(id, 0, accumIds.toArray, new ConcurrentHashMap()))
    +    }
    +
    +    exec.jobs = exec.jobs + (jobId -> JobExecutionStatus.RUNNING)
    +    exec.stages = event.stageIds
    +    update(exec)
    +  }
    +
    +  override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = {
    +    if (!isSQLStage(event.stageInfo.stageId)) {
    +      return
    +    }
    +
    +    // Reset the metrics tracking object for the new attempt.
    +    stageMetrics.get(event.stageInfo.stageId).foreach { metrics =>
    +      metrics.taskMetrics.clear()
    +      metrics.attemptId = event.stageInfo.attemptId
    +    }
    +  }
    +
    +  override def onJobEnd(event: SparkListenerJobEnd): Unit = {
    +    liveExecutions.values.foreach { exec =>
    +      if (exec.jobs.contains(event.jobId)) {
    +        val result = event.jobResult match {
    +          case JobSucceeded => JobExecutionStatus.SUCCEEDED
    +          case _ => JobExecutionStatus.FAILED
    +        }
    +        exec.jobs = exec.jobs + (event.jobId -> result)
    +        exec.endEvents += 1
    +        update(exec)
    +      }
    +    }
    +  }
    +
    +  override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = {
    +    event.accumUpdates.foreach { case (taskId, stageId, attemptId, accumUpdates) =>
    +      updateStageMetrics(stageId, attemptId, taskId, accumUpdates, false)
    +    }
    +  }
    +
    +  override def onTaskEnd(event: SparkListenerTaskEnd): Unit = {
    +    if (!isSQLStage(event.stageId)) {
    +      return
    +    }
    +
    +    val info = event.taskInfo
    +    // SPARK-20342. If processing events from a live application, use the task metrics info to
    +    // work around a race in the DAGScheduler. The metrics info does not contain accumulator info
    +    // when reading event logs in the SHS, so we have to rely on the accumulator in that case.
    +    val accums = if (live && event.taskMetrics != null) {
    +      event.taskMetrics.externalAccums.flatMap { a =>
    +        // This call may fail if the accumulator is gc'ed, so account for that.
    +        try {
    +          Some(a.toInfo(Some(a.value), None))
    +        } catch {
    +          case _: IllegalAccessError => None
    +        }
    +      }
    +    } else {
    +      info.accumulables
    +    }
    +    updateStageMetrics(event.stageId, event.stageAttemptId, info.taskId, accums,
    +      info.successful)
    +  }
    +
    +  def executionMetrics(executionId: Long): Map[Long, String] = synchronized {
    +    liveExecutions.get(executionId).map { exec =>
    +      if (exec.metricsValues != null) {
    +        exec.metricsValues
    +      } else {
    +        aggregateMetrics(exec)
    +      }
    +    }.getOrElse {
    +      throw new NoSuchElementException(s"execution $executionId not found")
    +    }
    +  }
    +
    +  private def aggregateMetrics(exec: LiveExecutionData): Map[Long, String] = synchronized {
    +    val metricIds = exec.metrics.map(_.accumulatorId).sorted
    +    val metricTypes = exec.metrics.map { m => (m.accumulatorId, m.metricType) }.toMap
    +    val metrics = exec.stages
    +      .flatMap(stageMetrics.get)
    +      .flatMap(_.taskMetrics.values().asScala)
    +      .flatMap { metrics => metrics.ids.zip(metrics.values) }
    +
    +    (metrics ++ exec.driverAccumUpdates.toSeq)
    +      .filter { case (id, _) => metricIds.contains(id) }
    +      .groupBy(_._1)
    +      .map { case (id, values) =>
    +        id -> SQLMetrics.stringValue(metricTypes(id), values.map(_._2).toSeq)
    +      }
    +  }
    +
    +  private def updateStageMetrics(
    +      stageId: Int,
    +      attemptId: Int,
    +      taskId: Long,
    +      accumUpdates: Seq[AccumulableInfo],
    +      succeeded: Boolean): Unit = {
    +    stageMetrics.get(stageId).foreach { metrics =>
    +      if (metrics.attemptId != attemptId || metrics.accumulatorIds.isEmpty) {
    +        return
    +      }
    +
    +      val oldTaskMetrics = metrics.taskMetrics.get(taskId)
    +      if (oldTaskMetrics != null && oldTaskMetrics.succeeded) {
    +        return
    +      }
    +
    +      val updates = accumUpdates
    +        .filter { acc => acc.update.isDefined && metrics.accumulatorIds.contains(acc.id) }
    +        .sortBy(_.id)
    +
    +      if (updates.isEmpty) {
    +        return
    +      }
    +
    +      val ids = new Array[Long](updates.size)
    +      val values = new Array[Long](updates.size)
    +      updates.zipWithIndex.foreach { case (acc, idx) =>
    +        ids(idx) = acc.id
    +        // In a live application, accumulators have Long values, but when reading from event
    +        // logs, they have String values. For now, assume all accumulators are Long and convert
    +        // accordingly.
    +        values(idx) = acc.update.get match {
    +          case s: String => s.toLong
    +          case l: Long => l
    +          case o => throw new IllegalArgumentException(s"Unexpected: $o")
    +        }
    +      }
    +
    +      metrics.taskMetrics.put(taskId, new LiveTaskMetrics(ids, values, succeeded))
    +    }
    +  }
    +
    +  private def onExecutionStart(event: SparkListenerSQLExecutionStart): Unit = {
    +    // Install the SQL tab in a live app if it hasn't been initialized yet.
    +    if (!uiInitialized) {
    +      ui.foreach { _ui =>
    +        new SQLTab(new SQLAppStatusStore(kvstore, Some(this)), _ui)
    +      }
    +      uiInitialized = true
    +    }
    +
    +    val SparkListenerSQLExecutionStart(executionId, description, details,
    +      physicalPlanDescription, sparkPlanInfo, time) = event
    +
    +    def toStoredNodes(nodes: Seq[SparkPlanGraphNode]): Seq[SparkPlanGraphNodeWrapper] = {
    +      nodes.map {
    +        case cluster: SparkPlanGraphCluster =>
    +          val storedCluster = new SparkPlanGraphClusterWrapper(
    +            cluster.id,
    +            cluster.name,
    +            cluster.desc,
    +            toStoredNodes(cluster.nodes),
    +            cluster.metrics)
    +          new SparkPlanGraphNodeWrapper(null, storedCluster)
    +
    +        case node =>
    +          new SparkPlanGraphNodeWrapper(node, null)
    +      }
    +    }
    +
    +    val planGraph = SparkPlanGraph(sparkPlanInfo)
    +    val sqlPlanMetrics = planGraph.allNodes.flatMap { node =>
    +      node.metrics.map { metric => (metric.accumulatorId, metric) }
    +    }.toMap.values.toList
    +
    +    val graphToStore = new SparkPlanGraphWrapper(
    +      executionId,
    +      toStoredNodes(planGraph.nodes),
    +      planGraph.edges)
    +    kvstore.write(graphToStore)
    +
    +    val exec = getOrCreateExecution(executionId)
    +    exec.description = description
    +    exec.details = details
    +    exec.physicalPlanDescription = physicalPlanDescription
    +    exec.metrics = sqlPlanMetrics
    +    exec.submissionTime = time
    +    update(exec)
    +  }
    +
    +  private def onExecutionEnd(event: SparkListenerSQLExecutionEnd): Unit = {
    +    val SparkListenerSQLExecutionEnd(executionId, time) = event
    +    liveExecutions.get(executionId).foreach { exec =>
    +      synchronized {
    +        exec.metricsValues = aggregateMetrics(exec)
    +
    +        // Remove stale LiveStageMetrics objects for stages that are not active anymore.
    +        val activeStages = liveExecutions.values.flatMap { other =>
    +          if (other != exec) other.stages else Nil
    +        }.toSet
    +        stageMetrics.retain { case (id, _) => activeStages.contains(id) }
    +
    +        exec.completionTime = Some(new Date(time))
    +        exec.endEvents += 1
    +
    +        update(exec)
    +      }
    +    }
    +  }
    +
    +  private def onDriverAccumUpdates(event: SparkListenerDriverAccumUpdates): Unit = {
    +    val SparkListenerDriverAccumUpdates(executionId, accumUpdates) = event
    +    liveExecutions.get(executionId).foreach { exec =>
    +      exec.driverAccumUpdates = accumUpdates.toMap
    +      update(exec)
    +    }
    +  }
    +
    +  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
    +    case e: SparkListenerSQLExecutionStart => onExecutionStart(e)
    +    case e: SparkListenerSQLExecutionEnd => onExecutionEnd(e)
    +    case e: SparkListenerDriverAccumUpdates => onDriverAccumUpdates(e)
    +    case _ => // Ignore
    +  }
    +
    +  private def getOrCreateExecution(executionId: Long): LiveExecutionData = {
    +    liveExecutions.getOrElseUpdate(executionId, new LiveExecutionData(executionId))
    +  }
    +
    +  private def update(exec: LiveExecutionData): Unit = {
    +    val now = System.nanoTime()
    +    if (exec.endEvents >= exec.jobs.size + 1) {
    +      liveExecutions.remove(exec.executionId)
    +      exec.write(kvstore, now)
    --- End diff --
    
    That's true; I've actually fixed that in the M6 code:
    https://github.com/vanzin/spark/pull/51/files#diff-a74d84702d8d47d5269e96740a55a3caR56
    
    It's not very easy to fix here without writing throw-away code to propagate some "close()" call to this listener.


---



[GitHub] spark pull request #19681: [SPARK-20652][sql] Store SQL UI data in the new a...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19681#discussion_r149778794
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala ---
    @@ -89,398 +83,3 @@ private class LongLongTupleConverter extends Converter[(Object, Object), (Long,
         typeFactory.constructSimpleType(classOf[(_, _)], classOf[(_, _)], Array(longType, longType))
       }
     }
    -
    -class SQLHistoryListenerFactory extends SparkHistoryListenerFactory {
    -
    -  override def createListeners(conf: SparkConf, sparkUI: SparkUI): Seq[SparkListener] = {
    -    List(new SQLHistoryListener(conf, sparkUI))
    -  }
    -}
    -
    -class SQLListener(conf: SparkConf) extends SparkListener with Logging {
    -
    -  private val retainedExecutions = conf.getInt("spark.sql.ui.retainedExecutions", 1000)
    -
    -  private val activeExecutions = mutable.HashMap[Long, SQLExecutionUIData]()
    -
    -  // Old data in the following fields must be removed in "trimExecutionsIfNecessary".
    -  // If adding new fields, make sure "trimExecutionsIfNecessary" can clean up old data
    -  private val _executionIdToData = mutable.HashMap[Long, SQLExecutionUIData]()
    -
    -  /**
    -   * Maintain the relation between job id and execution id so that we can get the execution id in
    -   * the "onJobEnd" method.
    -   */
    -  private val _jobIdToExecutionId = mutable.HashMap[Long, Long]()
    -
    -  private val _stageIdToStageMetrics = mutable.HashMap[Long, SQLStageMetrics]()
    -
    -  private val failedExecutions = mutable.ListBuffer[SQLExecutionUIData]()
    -
    -  private val completedExecutions = mutable.ListBuffer[SQLExecutionUIData]()
    -
    -  def executionIdToData: Map[Long, SQLExecutionUIData] = synchronized {
    -    _executionIdToData.toMap
    -  }
    -
    -  def jobIdToExecutionId: Map[Long, Long] = synchronized {
    -    _jobIdToExecutionId.toMap
    -  }
    -
    -  def stageIdToStageMetrics: Map[Long, SQLStageMetrics] = synchronized {
    -    _stageIdToStageMetrics.toMap
    -  }
    -
    -  private def trimExecutionsIfNecessary(
    -      executions: mutable.ListBuffer[SQLExecutionUIData]): Unit = {
    -    if (executions.size > retainedExecutions) {
    -      val toRemove = math.max(retainedExecutions / 10, 1)
    -      executions.take(toRemove).foreach { execution =>
    -        for (executionUIData <- _executionIdToData.remove(execution.executionId)) {
    -          for (jobId <- executionUIData.jobs.keys) {
    -            _jobIdToExecutionId.remove(jobId)
    -          }
    -          for (stageId <- executionUIData.stages) {
    -            _stageIdToStageMetrics.remove(stageId)
    -          }
    -        }
    -      }
    -      executions.trimStart(toRemove)
    -    }
    -  }
    -
    -  override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
    -    val executionIdString = jobStart.properties.getProperty(SQLExecution.EXECUTION_ID_KEY)
    -    if (executionIdString == null) {
    -      // This is not a job created by SQL
    -      return
    -    }
    -    val executionId = executionIdString.toLong
    -    val jobId = jobStart.jobId
    -    val stageIds = jobStart.stageIds
    -
    -    synchronized {
    -      activeExecutions.get(executionId).foreach { executionUIData =>
    -        executionUIData.jobs(jobId) = JobExecutionStatus.RUNNING
    -        executionUIData.stages ++= stageIds
    -        stageIds.foreach(stageId =>
    -          _stageIdToStageMetrics(stageId) = new SQLStageMetrics(stageAttemptId = 0))
    -        _jobIdToExecutionId(jobId) = executionId
    -      }
    -    }
    -  }
    -
    -  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = synchronized {
    -    val jobId = jobEnd.jobId
    -    for (executionId <- _jobIdToExecutionId.get(jobId);
    -         executionUIData <- _executionIdToData.get(executionId)) {
    -      jobEnd.jobResult match {
    -        case JobSucceeded => executionUIData.jobs(jobId) = JobExecutionStatus.SUCCEEDED
    -        case JobFailed(_) => executionUIData.jobs(jobId) = JobExecutionStatus.FAILED
    -      }
    -      if (executionUIData.completionTime.nonEmpty && !executionUIData.hasRunningJobs) {
    -        // We are the last job of this execution, so mark the execution as finished. Note that
    -        // `onExecutionEnd` also does this, but currently that can be called before `onJobEnd`
    -        // since these are called on different threads.
    -        markExecutionFinished(executionId)
    -      }
    -    }
    -  }
    -
    -  override def onExecutorMetricsUpdate(
    -      executorMetricsUpdate: SparkListenerExecutorMetricsUpdate): Unit = synchronized {
    -    for ((taskId, stageId, stageAttemptID, accumUpdates) <- executorMetricsUpdate.accumUpdates) {
    -      updateTaskAccumulatorValues(taskId, stageId, stageAttemptID, accumUpdates, finishTask = false)
    -    }
    -  }
    -
    -  override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = synchronized {
    -    val stageId = stageSubmitted.stageInfo.stageId
    -    val stageAttemptId = stageSubmitted.stageInfo.attemptId
    -    // Always override metrics for old stage attempt
    -    if (_stageIdToStageMetrics.contains(stageId)) {
    -      _stageIdToStageMetrics(stageId) = new SQLStageMetrics(stageAttemptId)
    -    } else {
    -      // If a stage belongs to some SQL execution, its stageId will be put in "onJobStart".
    -      // Since "_stageIdToStageMetrics" doesn't contain it, it must not belong to any SQL execution.
    -      // So we can ignore it. Otherwise, this may lead to memory leaks (SPARK-11126).
    -    }
    -  }
    -
    -  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized {
    -    if (taskEnd.taskMetrics != null) {
    -      updateTaskAccumulatorValues(
    -        taskEnd.taskInfo.taskId,
    -        taskEnd.stageId,
    -        taskEnd.stageAttemptId,
    -        taskEnd.taskMetrics.externalAccums.map(a => a.toInfo(Some(a.value), None)),
    -        finishTask = true)
    -    }
    -  }
    -
    -  /**
    -   * Update the accumulator values of a task with the latest metrics for this task. This is called
    -   * every time we receive an executor heartbeat or when a task finishes.
    -   */
    -  protected def updateTaskAccumulatorValues(
    -      taskId: Long,
    -      stageId: Int,
    -      stageAttemptID: Int,
    -      _accumulatorUpdates: Seq[AccumulableInfo],
    -      finishTask: Boolean): Unit = {
    -    val accumulatorUpdates =
    -      _accumulatorUpdates.filter(_.update.isDefined).map(accum => (accum.id, accum.update.get))
    -
    -    _stageIdToStageMetrics.get(stageId) match {
    -      case Some(stageMetrics) =>
    -        if (stageAttemptID < stageMetrics.stageAttemptId) {
    -          // A task of an old stage attempt. Because a new stage is submitted, we can ignore it.
    -        } else if (stageAttemptID > stageMetrics.stageAttemptId) {
    -          logWarning(s"A task should not have a higher stageAttemptID ($stageAttemptID) then " +
    -            s"what we have seen (${stageMetrics.stageAttemptId})")
    -        } else {
    -          // TODO We don't know the attemptId. Currently, what we can do is overriding the
    --- End diff --
    
    This comment was hard to make sense of (you should never have two tasks with the same taskId, even with speculation), but I think there is something here that may still be worth mentioning. Metrics are aggregated across all attempts for a given task (aka "index"), even speculative ones, both before and after your change; I'd mention that in a comment.
    
    (The index is available in onTaskStart / End if we wanted to de-duplicate.)
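
A rough sketch of what de-duplicating by task index could look like, compared with aggregating over every attempt. The `Attempt` record and both helpers are hypothetical, and a plain sum stands in for whatever aggregation the real metric type uses; none of this is taken from the PR.

```scala
object TaskDedup {
  // Hypothetical record of one task attempt's value for a single metric.
  final case class Attempt(taskId: Long, index: Int, succeeded: Boolean, value: Long)

  // Sums over every attempt, speculative or not (the behaviour described above).
  def sumAllAttempts(attempts: Seq[Attempt]): Long =
    attempts.map(_.value).sum

  // Keeps one attempt per task index, preferring a successful one
  // and otherwise the most recently seen, then sums.
  def sumPerIndex(attempts: Seq[Attempt]): Long =
    attempts
      .groupBy(_.index)
      .values
      .map { as => as.find(_.succeeded).getOrElse(as.last).value }
      .sum
}
```

For example, with a successful attempt (value 5) and a speculative attempt (value 3) for index 0, plus one attempt (value 7) for index 1, summing all attempts gives 15 while the per-index version gives 12.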


---



[GitHub] spark issue #19681: [SPARK-20652][sql] Store SQL UI data in the new app stat...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19681
  
    **[Test build #83603 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83603/testReport)** for PR 19681 at commit [`ecf293b`](https://github.com/apache/spark/commit/ecf293b31fa1b5250f484d6b2f09373e7057bbc3).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #19681: [SPARK-20652][sql] Store SQL UI data in the new app stat...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19681
  
    **[Test build #83567 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83567/testReport)** for PR 19681 at commit [`46695ed`](https://github.com/apache/spark/commit/46695edece81057986e7a0b7f87a0eacbe0b0fca).


---



[GitHub] spark issue #19681: [SPARK-20652][sql] Store SQL UI data in the new app stat...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19681
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #19681: [SPARK-20652][sql] Store SQL UI data in the new app stat...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/19681
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/83521/
    Test PASSed.


---



[GitHub] spark issue #19681: [SPARK-20652][sql] Store SQL UI data in the new app stat...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/19681
  
    **[Test build #83521 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/83521/testReport)** for PR 19681 at commit [`ccd5adc`](https://github.com/apache/spark/commit/ccd5adc1d6273b92fd6c9a0d4817451a5acb566a).


---



[GitHub] spark pull request #19681: [SPARK-20652][sql] Store SQL UI data in the new a...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19681#discussion_r149767574
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala ---
    @@ -0,0 +1,353 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.sql.execution.ui
    +
    +import java.util.Date
    +import java.util.concurrent.ConcurrentHashMap
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable.HashMap
    +
    +import org.apache.spark.{JobExecutionStatus, SparkConf}
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.scheduler._
    +import org.apache.spark.sql.execution.SQLExecution
    +import org.apache.spark.sql.execution.metric._
    +import org.apache.spark.sql.internal.StaticSQLConf._
    --- End diff --
    
    unused import


---



[GitHub] spark issue #19681: [SPARK-20652][sql] Store SQL UI data in the new app stat...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on the issue:

    https://github.com/apache/spark/pull/19681
  
    > Are there any tests for the SQL UI in the history server?
    
    No, because those tests are in `core`. I want to add some SHS integration tests in a separate module after this work goes in, covering core, SQL and the disk store, so that we can detect incompatibilities when they happen. But I haven't really written anything yet.


---
