Posted to issues@spark.apache.org by "Adrian Tanase (JIRA)" <ji...@apache.org> on 2015/05/26 14:56:18 UTC

[jira] [Commented] (SPARK-5206) Accumulators are not re-registered during recovering from checkpoint

    [ https://issues.apache.org/jira/browse/SPARK-5206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14559071#comment-14559071 ] 

Adrian Tanase commented on SPARK-5206:
--------------------------------------

[~tdas] - are there any plans to make accumulators re-register automatically at checkpoint restore? I think this would be a valuable fix even if the values are reset to "zero".
For example, we're using them to aggregate counters that are pushed to OpenTSDB after every micro-batch and then reset to their initial value, so that OpenTSDB sees additive counters rather than rate metrics.
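Roughly, the pattern looks like this (a minimal sketch, assuming an existing {{StreamingContext}} {{ssc}} and input {{DStream}} {{stream}}; {{pushToOpenTSDB}} is a placeholder for our actual metrics client):

{code:scala}
// Placeholder for the real OpenTSDB client call.
def pushToOpenTSDB(metric: String, value: Long): Unit = { /* HTTP put to the TSD */ }

val eventCounter = ssc.sparkContext.accumulator(0L, "events-per-batch")

stream.foreachRDD { rdd =>
  rdd.foreach(_ => eventCounter += 1L)          // accumulate on the executors
  pushToOpenTSDB("events", eventCounter.value)  // read the per-batch total on the driver
  eventCounter.setValue(0L)                     // reset so OpenTSDB sees additive counters
}
{code}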

I implemented the above suggestion (sketched below, after this list) and it works - but it also has a number of downsides that make it impractical for a codebase that relies heavily on metrics and counters implemented as accumulators:
- it seems to limit accumulator usage to output operations or {{transform}}-ed {{DStreams}}
- it prevents using accumulators inside transformations (simple ones like {{map}} as well as stateful ones like {{updateStateByKey}})
- one has to wrap all the DStream transformations in {{foreachRDD}} or {{transform}}, which heavily alters the semantics of the streaming job and obscures the business logic - something that goes against Spark's mantra of ease of use
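
For reference, here is roughly what that implementation looks like - essentially the lazily-initialized singleton pattern from the streaming programming guide (object and accumulator names are illustrative):

{code:scala}
import org.apache.spark.{Accumulator, SparkContext}

object BatchCounter {
  @volatile private var instance: Accumulator[Long] = null

  // Lazily create (and thereby register) the accumulator on first use,
  // which also happens again after a restart from checkpoint.
  def getInstance(sc: SparkContext): Accumulator[Long] = {
    if (instance == null) {
      synchronized {
        if (instance == null) {
          instance = sc.accumulator(0L, "batch-counter")
        }
      }
    }
    instance
  }
}

// The accumulator has to be fetched inside an output operation; fetching it
// at DStream-definition time is exactly what breaks on recovery.
stream.foreachRDD { rdd =>
  val counter = BatchCounter.getInstance(rdd.sparkContext)
  rdd.foreach(_ => counter += 1L)
}
{code}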

Is there another approach I'm missing that would simplify the above pattern by calling {{getInstance}} only once per streaming context?

Thanks!

> Accumulators are not re-registered during recovering from checkpoint
> --------------------------------------------------------------------
>
>                 Key: SPARK-5206
>                 URL: https://issues.apache.org/jira/browse/SPARK-5206
>             Project: Spark
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 1.1.0
>            Reporter: vincent ye
>
> I got the following exception while my streaming application restarted from a crash and recovered from its checkpoint:
> 15/01/12 10:31:06 sparkDriver-akka.actor.default-dispatcher-4 ERROR scheduler.DAGScheduler: Failed to update accumulators for ShuffleMapTask(41, 4)
> java.util.NoSuchElementException: key not found: 1
> 	at scala.collection.MapLike$class.default(MapLike.scala:228)
> 	at scala.collection.AbstractMap.default(Map.scala:58)
> 	at scala.collection.mutable.HashMap.apply(HashMap.scala:64)
> 	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$1.apply(DAGScheduler.scala:939)
> 	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskCompletion$1.apply(DAGScheduler.scala:938)
> 	at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
> 	at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
> 	at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
> 	at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
> 	at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
> 	at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:938)
> 	at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1388)
> 	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
> 	at akka.actor.ActorCell.invoke(ActorCell.scala:456)
> 	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
> 	at akka.dispatch.Mailbox.run(Mailbox.scala:219)
> 	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
> 	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> 	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> 	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> 	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> I guess that an Accumulator is registered to the singleton Accumulators object at line 58 of org.apache.spark.Accumulable:
> Accumulators.register(this, true)
> This code needs to be executed in the driver exactly once, but when the application is recovered from a checkpoint it is not executed again. So when the driver reaches org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion (DAGScheduler.scala:938), it can't find the Accumulator, because it was never re-registered during the recovery.
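> To illustrate the recovery path (a sketch with placeholder setup, not the actual application): {{StreamingContext.getOrCreate}} only runs the creating function when no checkpoint exists, so an accumulator created inside it is never re-registered on restart:
> {code:scala}
> import org.apache.spark.SparkConf
> import org.apache.spark.streaming.{Seconds, StreamingContext}
>
> val checkpointDir = "..." // checkpoint directory on a reliable filesystem
>
> def createContext(): StreamingContext = {
>   val conf = new SparkConf().setAppName("example")
>   val ssc = new StreamingContext(conf, Seconds(10))
>   val counter = ssc.sparkContext.accumulator(0L) // Accumulators.register(this, true) runs here
>   // ... build the DStream graph that uses counter ...
>   ssc.checkpoint(checkpointDir)
>   ssc
> }
>
> // On a fresh start createContext() runs and the accumulator is registered.
> // On recovery the context is deserialized from the checkpoint and
> // createContext() is skipped, so the accumulator is never re-registered.
> val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
> {code}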


