Posted to user@spark.apache.org by B Li <on...@gmail.com> on 2017/07/10 19:56:18 UTC

Runtime exception with AccumulatorV2 on Spark 2.2/2.1.1

Hi community,

I'm using a custom AccumulatorV2 via the Java API:

MyAccumulatorV2 accum = new MyAccumulatorV2();
jsc.sc().register(accum, "MyAccumulator");
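
For reference, a minimal AccumulatorV2 subclass via the Java API looks
roughly like this (a simplified sketch of the idea, not my actual class; it
just sums longs):

import org.apache.spark.util.AccumulatorV2;

// Minimal custom accumulator: sums Long values.
public class MyAccumulatorV2 extends AccumulatorV2<Long, Long> {
    private long sum = 0L;

    @Override
    public boolean isZero() { return sum == 0L; }

    @Override
    public AccumulatorV2<Long, Long> copy() {
        MyAccumulatorV2 c = new MyAccumulatorV2();
        c.sum = this.sum;
        return c;
    }

    @Override
    public void reset() { sum = 0L; }

    @Override
    public void add(Long v) { sum += v; }

    @Override
    public void merge(AccumulatorV2<Long, Long> other) { sum += other.value(); }

    @Override
    public Long value() { return sum; }
}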


I got a runtime exception on an executor (standalone cluster mode):

java.lang.UnsupportedOperationException: Accumulator must be registered before send to executor
	at org.apache.spark.util.AccumulatorV2.writeReplace(AccumulatorV2.scala:164)
	at sun.reflect.GeneratedMethodAccessor3.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at java.io.ObjectStreamClass.invokeWriteReplace(ObjectStreamClass.java:1118)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1136)
	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
	at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
	at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
	at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:372)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1055)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1029)
	at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:969)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1029)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:760)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:748)



This seems weird, since I did register the accumulator. After digging into
the problem, I found that readObject() of
org.apache.spark.util.AccumulatorV2 had set 'atDriverSide' to true on an
executor while 'isRegistered' stayed false, and that combination
(atDriverSide=true, isRegistered=false) is exactly what makes writeReplace()
of org.apache.spark.util.AccumulatorV2 throw when it is called later.
Judging by its name, 'atDriverSide' should never be true on an executor.
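My reading of the relevant logic, paraphrased as Java-style pseudocode
(simplified, not the actual Scala source; the registration call is only
sketched as a comment):

// writeReplace(), invoked whenever the accumulator is serialized:
//     if (atDriverSide) {
//         if (!isRegistered)
//             throw new UnsupportedOperationException(
//                 "Accumulator must be registered before send to executor");
//         ...
//     }
//
// readObject(), invoked whenever the accumulator is deserialized:
//     if (atDriverSide) {
//         atDriverSide = false;  // normal path: shipped from driver to executor
//         /* register with the running TaskContext */
//     } else {
//         atDriverSide = true;   // <- runs on an executor when a copy that was
//                                //    already serialized once is read back
//     }

The following is the stack when 'atDriverSide' was set to true in readObject():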

	at org.apache.spark.util.AccumulatorV2.readObject(AccumulatorV2.scala:187)
	at sun.reflect.GeneratedMethodAccessor1.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2136)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
	at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
	at org.apache.spark.serializer.DeserializationStream.readValue(Serializer.scala:158)
	at org.apache.spark.serializer.DeserializationStream$$anon$2.getNext(Serializer.scala:188)
	at org.apache.spark.serializer.DeserializationStream$$anon$2.getNext(Serializer.scala:185)
	at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:438)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:438)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1793)
	at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1158)
	at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1158)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2062)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:748)

Here is the stack from another case where the same exception was triggered:

	at org.apache.spark.util.AccumulatorV2.readObject(AccumulatorV2.scala:187)
	at sun.reflect.GeneratedMethodAccessor1.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2136)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
	at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
	at org.apache.spark.serializer.DeserializationStream.readValue(Serializer.scala:158)
	at org.apache.spark.serializer.DeserializationStream$$anon$2.getNext(Serializer.scala:188)
	at org.apache.spark.serializer.DeserializationStream$$anon$2.getNext(Serializer.scala:185)
	at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:438)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:438)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:371)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1055)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1029)
	at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:969)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1029)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:760)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:748)
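
If it helps, both readObject stacks show the accumulator being deserialized
out of cached record data while a block is being read back or re-serialized
(RDD.getOrCompute / BlockManager.getOrElseUpdate /
MemoryStore.putIteratorAsBytes). A pattern along these lines matches the
shape of the stacks (a hypothetical sketch, not my actual job; the Record
class, the data, and the MEMORY_ONLY_SER storage level are guesses):

import java.io.Serializable;
import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.storage.StorageLevel;

public class Repro {
    // Records hold a reference to the accumulator, so the accumulator is
    // serialized into, and deserialized out of, the cached blocks.
    static class Record implements Serializable {
        final MyAccumulatorV2 acc;
        Record(MyAccumulatorV2 acc) { this.acc = acc; }
    }

    public static void main(String[] args) {
        JavaSparkContext jsc = new JavaSparkContext(new SparkConf().setAppName("repro"));
        MyAccumulatorV2 accum = new MyAccumulatorV2();
        jsc.sc().register(accum, "MyAccumulator");

        JavaRDD<Record> a = jsc.parallelize(Arrays.asList(1, 2, 3))
                .map(i -> new Record(accum))
                .persist(StorageLevel.MEMORY_ONLY_SER());
        a.count();  // 'a' is computed and cached in serialized form

        JavaRDD<Record> b = a.map(r -> r).persist(StorageLevel.MEMORY_ONLY_SER());
        b.count();  // reading 'a' back deserializes the accumulator (readObject
                    // flips atDriverSide to true on the executor); caching 'b'
                    // re-serializes it, and writeReplace throws
    }
}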


Has anyone encountered the same problem? Did I do anything wrong, or is this
a bug in Spark? Thanks.



