Posted to dev@spark.apache.org by ShaneTian <ti...@ict.ac.cn> on 2016/12/22 02:36:10 UTC

Got an Accumulator error after adding some task metrics in Spark 2.0.2

Hello all,

I've tried to add some task metrics to
org.apache.spark.executor.ShuffleReadMetrics.scala in Spark 2.0.2, following
the format of the existing metrics (a sketch of my changes is at the end of
this mail). When I submit an application, I get these errors:

ERROR TaskSetManager: Failed to serialize task 0, not attempting to retry it.
java.lang.UnsupportedOperationException: Accumulator must be registered before send to executor
    at org.apache.spark.util.AccumulatorV2.writeReplace(AccumulatorV2.scala:158)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeWriteReplace(ObjectStreamClass.java:1118)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1136)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
    at org.apache.spark.scheduler.Task$.serializeWithDependencies(Task.scala:231)
    at org.apache.spark.scheduler.TaskSetManager.resourceOffer(TaskSetManager.scala:458)
    ...
16/12/21 16:16:42 ERROR TaskSchedulerImpl: Resource offer failed, task set TaskSet_0 was not serializable
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Failed to serialize task 0, not attempting to retry it. Exception during serialization: java.lang.UnsupportedOperationException: Accumulator must be registered before send to executor
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1454)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1442)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1441)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1441)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1667)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1622)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1611)
    ...
16/12/21 16:16:42 ERROR LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerBlockManagerAdded(1482308202401,BlockManagerId(7, 172.18.11.3, 42715),12409896960)
16/12/21 16:16:42 ERROR LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerBlockManagerAdded(1482308202445,BlockManagerId(5, 172.18.11.121, 41654),12409896960)
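For context, the check that throws here looks roughly like this in the Spark
2.0.x sources (paraphrased from AccumulatorV2.scala, the frame at the top of
the trace above; not my change):

// Called by Java serialization when the driver serializes a task.
// It refuses to ship any accumulator that was never registered with
// the AccumulatorContext on the driver.
final protected def writeReplace(): Any = {
  if (atDriverSide) {
    if (!isRegistered) {
      throw new UnsupportedOperationException(
        "Accumulator must be registered before send to executor")
    }
    val copyAcc = copyAndReset()
    assert(copyAcc.isZero, "copyAndReset must return a zero value copy")
    copyAcc.metadata = metadata
    copyAcc
  } else {
    this
  }
}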

So it seems that the accumulators backing my new task metrics are not
registered before being shipped to the executors, even though I did add the
new metrics to the nameToAccums map in TaskMetrics.scala:

private[spark] lazy val nameToAccums = LinkedHashMap(
  ...
  // added by txh
  shuffleRead.SHUFFLE_READ_TIME -> shuffleReadMetrics._shuffleReadTime,
  shuffleRead.SHUFFLE_MERGE_TIME -> shuffleReadMetrics._shuffleMergeTime,
  shuffleRead.SHUFFLE_MERGE_MEMORY -> shuffleReadMetrics._shuffleMergeMemory,
  shuffleRead.SHUFFLE_USE_MEMORY -> shuffleReadMetrics._shuffleUseMemory,
  ...
)
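As far as I can tell, registration of the internal accumulators is driven by
nameToAccums, roughly like this (paraphrased from TaskMetrics.scala in
2.0.x; not my change):

// internalAccums is derived from nameToAccums, so new entries in the map
// should be picked up automatically.
@transient private[spark] lazy val internalAccums: Seq[AccumulatorV2[_, _]] =
  nameToAccums.values.toIndexedSeq

// Called on the driver for each new stage attempt; registers every
// internal accumulator with the SparkContext before tasks are serialized.
private[spark] def register(sc: SparkContext): Unit = {
  nameToAccums.foreach {
    case (name, acc) => acc.register(sc, name = Some(name), countFailedValues = true)
  }
}

Since internalAccums is derived from nameToAccums, I expected my new entries
to be registered together with the built-in ones.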

What else do I need to add so that these new metrics get registered? Thanks
very much.
My full ShuffleReadMetrics.scala is attached here:
<http://apache-spark-developers-list.1001551.n3.nabble.com/file/n20330/ShuffleReadMetrics.scala>
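In case the attachment does not come through, here is a sketch of what I
added to ShuffleReadMetrics, mirroring the existing LongAccumulator-backed
metrics such as _remoteBlocksFetched (the field and method names are my own):

// My additions to org.apache.spark.executor.ShuffleReadMetrics:
private[executor] val _shuffleReadTime = new LongAccumulator
private[executor] val _shuffleMergeTime = new LongAccumulator
private[executor] val _shuffleMergeMemory = new LongAccumulator
private[executor] val _shuffleUseMemory = new LongAccumulator

// Read-side getters, like the existing ones (e.g. remoteBlocksFetched).
def shuffleReadTime: Long = _shuffleReadTime.sum
def shuffleMergeTime: Long = _shuffleMergeTime.sum
def shuffleMergeMemory: Long = _shuffleMergeMemory.sum
def shuffleUseMemory: Long = _shuffleUseMemory.sum

// Increment helpers called from the shuffle read path.
private[spark] def incShuffleReadTime(v: Long): Unit = _shuffleReadTime.add(v)
private[spark] def incShuffleMergeTime(v: Long): Unit = _shuffleMergeTime.add(v)
private[spark] def incShuffleMergeMemory(v: Long): Unit = _shuffleMergeMemory.add(v)
private[spark] def incShuffleUseMemory(v: Long): Unit = _shuffleUseMemory.add(v)

// And the matching name constants in InternalAccumulator.shuffleRead:
val SHUFFLE_READ_TIME = SHUFFLE_READ_METRICS_PREFIX + "shuffleReadTime"
val SHUFFLE_MERGE_TIME = SHUFFLE_READ_METRICS_PREFIX + "shuffleMergeTime"
val SHUFFLE_MERGE_MEMORY = SHUFFLE_READ_METRICS_PREFIX + "shuffleMergeMemory"
val SHUFFLE_USE_MEMORY = SHUFFLE_READ_METRICS_PREFIX + "shuffleUseMemory"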



