You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Harish (JIRA)" <ji...@apache.org> on 2016/11/08 04:26:58 UTC

[jira] [Commented] (SPARK-17463) Serialization of accumulators in heartbeats is not thread-safe

    [ https://issues.apache.org/jira/browse/SPARK-17463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15646453#comment-15646453 ] 

Harish commented on SPARK-17463:
--------------------------------

I was able to figure out the issue, its  not related to this bug. 

> Serialization of accumulators in heartbeats is not thread-safe
> --------------------------------------------------------------
>
>                 Key: SPARK-17463
>                 URL: https://issues.apache.org/jira/browse/SPARK-17463
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.0.0
>            Reporter: Josh Rosen
>            Assignee: Shixiong Zhu
>            Priority: Critical
>             Fix For: 2.0.1, 2.1.0
>
>
> Check out the following {{ConcurrentModificationException}}:
> {code}
> 16/09/06 16:10:29 WARN NettyRpcEndpointRef: Error sending message [message = Heartbeat(2,[Lscala.Tuple2;@66e7b6e7,BlockManagerId(2, HOST, 57743))] in 1 attempts
> org.apache.spark.SparkException: Exception thrown in awaitResult
>     at org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:77)
>     at org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:75)
>     at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33)
>     at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
>     at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
>     at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
>     at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:83)
>     at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:102)
>     at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$reportHeartBeat(Executor.scala:518)
>     at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply$mcV$sp(Executor.scala:547)
>     at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:547)
>     at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:547)
>     at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1862)
>     at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:547)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.util.ConcurrentModificationException
>     at java.util.ArrayList.writeObject(ArrayList.java:766)
>     at sun.reflect.GeneratedMethodAccessor20.invoke(Unknown Source)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:497)
>     at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>     at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>     at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>     at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
>     at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
>     at org.apache.spark.rpc.netty.NettyRpcEnv.serialize(NettyRpcEnv.scala:253)
>     at org.apache.spark.rpc.netty.NettyRpcEnv.ask(NettyRpcEnv.scala:227)
>     at org.apache.spark.rpc.netty.NettyRpcEndpointRef.ask(NettyRpcEnv.scala:508)
>     at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:101)
>     ... 13 more
> {code}
> Even though accumulators aren't thread-safe they can be concurrently read while serializing executor heartbeats and modified while tasks are running, leading to ConcurrentModificationException errors (thereby leading to missing heartbeats) or leading to inconsistent data (since individual fields of a multi-field object might be serialized at different points in time, leading to inconsistencies in accumulators like LongAccum).
> This seems like a pretty serious issue but I'm not sure what's the best way to fix this. An obvious fix would be to properly synchronize all accesses to the fields of our accumulators and to synchronize the writeObject and writeKryo methods, but this may have an adverse performance impact



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org