You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Sunil Rangwani (JIRA)" <ji...@apache.org> on 2016/12/30 14:52:58 UTC

[jira] [Commented] (SPARK-17463) Serialization of accumulators in heartbeats is not thread-safe

    [ https://issues.apache.org/jira/browse/SPARK-17463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15787809#comment-15787809 ] 

Sunil Rangwani commented on SPARK-17463:
----------------------------------------

Hi 
I get the same error with Spark 2.0.2 on EMR 5.2.0
I am using the CollectionAccumulator and I get the same stack trace. Hard to say from the stack trace what is the root cause. 

{noformat}
16/12/30 13:49:11 WARN NettyRpcEndpointRef: Error sending message [message = Heartbeat(1,[Lscala.Tuple2;@63f6e6d6,BlockManagerId(1, <Hostname snipped>, 45163))] in 3 attempts
org.apache.spark.SparkException: Exception thrown in awaitResult
	at org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:77)
	at org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:75)
	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
	at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
	at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
	at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:83)
	at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:102)
	at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$reportHeartBeat(Executor.scala:518)
	at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply$mcV$sp(Executor.scala:547)
	at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:547)
	at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:547)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1953)
	at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:547)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.ConcurrentModificationException
	at java.util.ArrayList.writeObject(ArrayList.java:766)
	at sun.reflect.GeneratedMethodAccessor55.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
	at java.io.ObjectOutputStream.defaultWriteObject(ObjectOutputStream.java:441)
	at java.util.Collections$SynchronizedCollection.writeObject(Collections.java:2081)
	at sun.reflect.GeneratedMethodAccessor54.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
	at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
	at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
	at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
	at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
	at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
	at org.apache.spark.rpc.netty.NettyRpcEnv.serialize(NettyRpcEnv.scala:253)
	at org.apache.spark.rpc.netty.NettyRpcEnv.ask(NettyRpcEnv.scala:227)
	at org.apache.spark.rpc.netty.NettyRpcEndpointRef.ask(NettyRpcEnv.scala:508)
	at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:101)
	... 13 more
{noformat}

> Serialization of accumulators in heartbeats is not thread-safe
> --------------------------------------------------------------
>
>                 Key: SPARK-17463
>                 URL: https://issues.apache.org/jira/browse/SPARK-17463
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.0.0
>            Reporter: Josh Rosen
>            Assignee: Shixiong Zhu
>            Priority: Critical
>             Fix For: 2.0.1, 2.1.0
>
>
> Check out the following {{ConcurrentModificationException}}:
> {code}
> 16/09/06 16:10:29 WARN NettyRpcEndpointRef: Error sending message [message = Heartbeat(2,[Lscala.Tuple2;@66e7b6e7,BlockManagerId(2, HOST, 57743))] in 1 attempts
> org.apache.spark.SparkException: Exception thrown in awaitResult
>     at org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:77)
>     at org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:75)
>     at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33)
>     at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
>     at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
>     at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
>     at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:83)
>     at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:102)
>     at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$reportHeartBeat(Executor.scala:518)
>     at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply$mcV$sp(Executor.scala:547)
>     at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:547)
>     at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:547)
>     at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1862)
>     at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:547)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.util.ConcurrentModificationException
>     at java.util.ArrayList.writeObject(ArrayList.java:766)
>     at sun.reflect.GeneratedMethodAccessor20.invoke(Unknown Source)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:497)
>     at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>     at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>     at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>     at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
>     at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
>     at org.apache.spark.rpc.netty.NettyRpcEnv.serialize(NettyRpcEnv.scala:253)
>     at org.apache.spark.rpc.netty.NettyRpcEnv.ask(NettyRpcEnv.scala:227)
>     at org.apache.spark.rpc.netty.NettyRpcEndpointRef.ask(NettyRpcEnv.scala:508)
>     at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:101)
>     ... 13 more
> {code}
> Even though accumulators aren't thread-safe they can be concurrently read while serializing executor heartbeats and modified while tasks are running, leading to ConcurrentModificationException errors (thereby leading to missing heartbeats) or leading to inconsistent data (since individual fields of a multi-field object might be serialized at different points in time, leading to inconsistencies in accumulators like LongAccum).
> This seems like a pretty serious issue but I'm not sure what's the best way to fix this. An obvious fix would be to properly synchronize all accesses to the fields of our accumulators and to synchronize the writeObject and writeKryo methods, but this may have an adverse performance impact



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org