You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "DENG FEI (JIRA)" <ji...@apache.org> on 2018/09/14 07:48:00 UTC

[jira] [Updated] (SPARK-25429) SparkListenerBus inefficient due to 'LiveStageMetrics#accumulatorIds:Array[Long]' data structure

     [ https://issues.apache.org/jira/browse/SPARK-25429?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

DENG FEI updated SPARK-25429:
-----------------------------
    Description: 
{code:java}
/**
 * Records the accumulator updates reported for one task into the live metrics of its stage.
 *
 * Nothing is recorded when `stageMetrics` has no entry for `stageId`, when the tracked stage
 * attempt differs from `attemptId`, when the stage has no accumulator IDs, when metrics already
 * stored for `taskId` are flagged as succeeded, or when no update survives the filtering step.
 *
 * @param stageId key used to look up the stage's entry in `stageMetrics`
 * @param attemptId stage attempt the updates belong to; must equal the tracked attempt
 * @param taskId key under which the task's metrics are stored
 * @param accumUpdates accumulator updates reported for the task
 * @param succeeded flag stored alongside the task's metrics; once a stored entry has it set,
 *                  later updates for the same `taskId` are ignored
 * @throws IllegalArgumentException if an update value is neither a `String` nor a `Long`
 * @throws NumberFormatException if a `String` update value cannot be parsed as a `Long`
 */
private def updateStageMetrics(
      stageId: Int,
      attemptId: Int,
      taskId: Long,
      accumUpdates: Seq[AccumulableInfo],
      succeeded: Boolean): Unit = {
    // Option(...) turns a null lookup result into None, so a stage without an entry is skipped.
    // NOTE: every `return` below sits inside this closure, so it is a non-local return that
    // exits updateStageMetrics itself rather than just the lambda.
    Option(stageMetrics.get(stageId)).foreach { metrics =>
      // Ignore updates for a different stage attempt, or for a stage with no accumulator IDs.
      if (metrics.attemptId != attemptId || metrics.accumulatorIds.isEmpty) {
        return
      }

      // Do not overwrite metrics of a task that was already recorded as succeeded.
      val oldTaskMetrics = metrics.taskMetrics.get(taskId)
      if (oldTaskMetrics != null && oldTaskMetrics.succeeded) {
        return
      }

      // Keep only updates that carry a value and whose ID belongs to this stage, ordered by ID.
      // NOTE(review): per the issue title, accumulatorIds is an Array[Long], so `contains` is a
      // linear scan repeated for every element of accumUpdates.
      val updates = accumUpdates
        .filter { acc => acc.update.isDefined && metrics.accumulatorIds.contains(acc.id) }
        .sortBy(_.id)

      if (updates.isEmpty) {
        return
      }

      // Parallel arrays: ids(i) is the accumulator ID whose value is values(i).
      val ids = new Array[Long](updates.size)
      val values = new Array[Long](updates.size)
      updates.zipWithIndex.foreach { case (acc, idx) =>
        ids(idx) = acc.id
        // In a live application, accumulators have Long values, but when reading from event
        // logs, they have String values. For now, assume all accumulators are Long and convert
        // accordingly.
        // `.get` cannot fail here: the filter above kept only updates with a defined value.
        values(idx) = acc.update.get match {
          case s: String => s.toLong
          case l: Long => l
          case o => throw new IllegalArgumentException(s"Unexpected: $o")
        }
      }

      // TODO: storing metrics by task ID can cause metrics for the same task index to be
      // counted multiple times, for example due to speculation or re-attempts.
      metrics.taskMetrics.put(taskId, new LiveTaskMetrics(ids, values, succeeded))
    }
  }
{code}

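'metrics.accumulatorIds.contains(acc.id)' is a linear scan, because 'LiveStageMetrics#accumulatorIds' is an Array[Long]. If a large SQL application generates many accumulators, this check makes the SparkListenerBus inefficient.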



> SparkListenerBus inefficient due to 'LiveStageMetrics#accumulatorIds:Array[Long]' data structure
> ------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-25429
>                 URL: https://issues.apache.org/jira/browse/SPARK-25429
>             Project: Spark
>          Issue Type: Improvement
>          Components: Spark Core
>    Affects Versions: 2.3.1
>            Reporter: DENG FEI
>            Priority: Major
>
> {code:java}
> private def updateStageMetrics(
>       stageId: Int,
>       attemptId: Int,
>       taskId: Long,
>       accumUpdates: Seq[AccumulableInfo],
>       succeeded: Boolean): Unit = {
>     Option(stageMetrics.get(stageId)).foreach { metrics =>
>       if (metrics.attemptId != attemptId || metrics.accumulatorIds.isEmpty) {
>         return
>       }
>       val oldTaskMetrics = metrics.taskMetrics.get(taskId)
>       if (oldTaskMetrics != null && oldTaskMetrics.succeeded) {
>         return
>       }
>       val updates = accumUpdates
>         .filter { acc => acc.update.isDefined && metrics.accumulatorIds.contains(acc.id) }
>         .sortBy(_.id)
>       if (updates.isEmpty) {
>         return
>       }
>       val ids = new Array[Long](updates.size)
>       val values = new Array[Long](updates.size)
>       updates.zipWithIndex.foreach { case (acc, idx) =>
>         ids(idx) = acc.id
>         // In a live application, accumulators have Long values, but when reading from event
>         // logs, they have String values. For now, assume all accumulators are Long and covert
>         // accordingly.
>         values(idx) = acc.update.get match {
>           case s: String => s.toLong
>           case l: Long => l
>           case o => throw new IllegalArgumentException(s"Unexpected: $o")
>         }
>       }
>       // TODO: storing metrics by task ID can cause metrics for the same task index to be
>       // counted multiple times, for example due to speculation or re-attempts.
>       metrics.taskMetrics.put(taskId, new LiveTaskMetrics(ids, values, succeeded))
>     }
>   }
> {code}
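> 'metrics.accumulatorIds.contains(acc.id)' is a linear scan, because 'LiveStageMetrics#accumulatorIds' is an Array[Long]. If a large SQL application generates many accumulators, this check makes the SparkListenerBus inefficient.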



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org