You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "DENG FEI (JIRA)" <ji...@apache.org> on 2018/09/14 07:48:00 UTC
[jira] [Updated] (SPARK-25429) SparkListenerBus inefficient due to
'LiveStageMetrics#accumulatorIds:Array[Long]' data structure
[ https://issues.apache.org/jira/browse/SPARK-25429?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
DENG FEI updated SPARK-25429:
-----------------------------
Description:
{code:java}
private def updateStageMetrics(
stageId: Int,
attemptId: Int,
taskId: Long,
accumUpdates: Seq[AccumulableInfo],
succeeded: Boolean): Unit = {
Option(stageMetrics.get(stageId)).foreach { metrics =>
if (metrics.attemptId != attemptId || metrics.accumulatorIds.isEmpty) {
return
}
val oldTaskMetrics = metrics.taskMetrics.get(taskId)
if (oldTaskMetrics != null && oldTaskMetrics.succeeded) {
return
}
val updates = accumUpdates
.filter { acc => acc.update.isDefined && metrics.accumulatorIds.contains(acc.id) }
.sortBy(_.id)
if (updates.isEmpty) {
return
}
val ids = new Array[Long](updates.size)
val values = new Array[Long](updates.size)
updates.zipWithIndex.foreach { case (acc, idx) =>
ids(idx) = acc.id
// In a live application, accumulators have Long values, but when reading from event
// logs, they have String values. For now, assume all accumulators are Long and covert
// accordingly.
values(idx) = acc.update.get match {
case s: String => s.toLong
case l: Long => l
case o => throw new IllegalArgumentException(s"Unexpected: $o")
}
}
// TODO: storing metrics by task ID can cause metrics for the same task index to be
// counted multiple times, for example due to speculation or re-attempts.
metrics.taskMetrics.put(taskId, new LiveTaskMetrics(ids, values, succeeded))
}
}
{code}
'metrics.accumulatorIds.contains(acc.id)', if large SQL application generated many accumulator,
> SparkListenerBus inefficient due to 'LiveStageMetrics#accumulatorIds:Array[Long]' data structure
> ------------------------------------------------------------------------------------------------
>
> Key: SPARK-25429
> URL: https://issues.apache.org/jira/browse/SPARK-25429
> Project: Spark
> Issue Type: Improvement
> Components: Spark Core
> Affects Versions: 2.3.1
> Reporter: DENG FEI
> Priority: Major
>
> {code:java}
> private def updateStageMetrics(
> stageId: Int,
> attemptId: Int,
> taskId: Long,
> accumUpdates: Seq[AccumulableInfo],
> succeeded: Boolean): Unit = {
> Option(stageMetrics.get(stageId)).foreach { metrics =>
> if (metrics.attemptId != attemptId || metrics.accumulatorIds.isEmpty) {
> return
> }
> val oldTaskMetrics = metrics.taskMetrics.get(taskId)
> if (oldTaskMetrics != null && oldTaskMetrics.succeeded) {
> return
> }
> val updates = accumUpdates
> .filter { acc => acc.update.isDefined && metrics.accumulatorIds.contains(acc.id) }
> .sortBy(_.id)
> if (updates.isEmpty) {
> return
> }
> val ids = new Array[Long](updates.size)
> val values = new Array[Long](updates.size)
> updates.zipWithIndex.foreach { case (acc, idx) =>
> ids(idx) = acc.id
> // In a live application, accumulators have Long values, but when reading from event
> // logs, they have String values. For now, assume all accumulators are Long and covert
> // accordingly.
> values(idx) = acc.update.get match {
> case s: String => s.toLong
> case l: Long => l
> case o => throw new IllegalArgumentException(s"Unexpected: $o")
> }
> }
> // TODO: storing metrics by task ID can cause metrics for the same task index to be
> // counted multiple times, for example due to speculation or re-attempts.
> metrics.taskMetrics.put(taskId, new LiveTaskMetrics(ids, values, succeeded))
> }
> }
> {code}
> 'metrics.accumulatorIds.contains(acc.id)', if large SQL application generated many accumulator,
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org