Posted to issues@spark.apache.org by "Sunil Rangwani (JIRA)" <ji...@apache.org> on 2017/01/01 18:20:58 UTC

[jira] [Commented] (SPARK-17463) Serialization of accumulators in heartbeats is not thread-safe

    [ https://issues.apache.org/jira/browse/SPARK-17463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15791469#comment-15791469 ] 

Sunil Rangwani commented on SPARK-17463:
----------------------------------------

Hi [~zsxwing]

Below is how I am using the collection accumulator:

{code}
import org.apache.spark.util.CollectionAccumulator
import scala.collection.JavaConverters._

// val spark: SparkSession
// val dataFrame: DataFrame
val updatedRecordKeysAcc = spark.sparkContext.collectionAccumulator[String]

// Runs on the executors: every record adds its key to the accumulator
dataFrame.toJSON.foreach(processRecord(_, updatedRecordKeysAcc))

// Runs on the driver after the job has finished
val updatedRecordKeys: String = updatedRecordKeysAcc.value.asScala.mkString("\n")
// write `updatedRecordKeys` to an S3 file using aws-sdk...

def processRecord(recordJSON: String, updatedRecordKeysAcc: CollectionAccumulator[String]): Unit = {
  // do stuff with recordJSON...
  val recordKey: String = getKeyFromJSON(recordJSON) // getKeyFromJSON: my own helper, not shown here
  updatedRecordKeysAcc.add(recordKey)
}
{code}

It works with smaller datasets (tens of thousands of records), but when I run it on a dataset of tens of millions of records, it fails with the exception above. I tried scaling the cluster up to 5 or 6 nodes, but it still fails with the same error and the cluster remains underutilized.
It also works fine when I comment out the line {noformat}updatedRecordKeysAcc.add(recordKey){noformat}

> Serialization of accumulators in heartbeats is not thread-safe
> --------------------------------------------------------------
>
>                 Key: SPARK-17463
>                 URL: https://issues.apache.org/jira/browse/SPARK-17463
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.0.0
>            Reporter: Josh Rosen
>            Assignee: Shixiong Zhu
>            Priority: Critical
>             Fix For: 2.0.1, 2.1.0
>
>
> Check out the following {{ConcurrentModificationException}}:
> {code}
> 16/09/06 16:10:29 WARN NettyRpcEndpointRef: Error sending message [message = Heartbeat(2,[Lscala.Tuple2;@66e7b6e7,BlockManagerId(2, HOST, 57743))] in 1 attempts
> org.apache.spark.SparkException: Exception thrown in awaitResult
>     at org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:77)
>     at org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:75)
>     at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33)
>     at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
>     at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
>     at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
>     at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:83)
>     at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:102)
>     at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$reportHeartBeat(Executor.scala:518)
>     at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply$mcV$sp(Executor.scala:547)
>     at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:547)
>     at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:547)
>     at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1862)
>     at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:547)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.util.ConcurrentModificationException
>     at java.util.ArrayList.writeObject(ArrayList.java:766)
>     at sun.reflect.GeneratedMethodAccessor20.invoke(Unknown Source)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:497)
>     at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>     at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>     at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>     at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
>     at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
>     at org.apache.spark.rpc.netty.NettyRpcEnv.serialize(NettyRpcEnv.scala:253)
>     at org.apache.spark.rpc.netty.NettyRpcEnv.ask(NettyRpcEnv.scala:227)
>     at org.apache.spark.rpc.netty.NettyRpcEndpointRef.ask(NettyRpcEnv.scala:508)
>     at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:101)
>     ... 13 more
> {code}
> Even though accumulators aren't thread-safe, they can be read concurrently while executor heartbeats are being serialized and modified while tasks are running. This leads either to ConcurrentModificationException errors (and therefore to missing heartbeats) or to inconsistent data, since individual fields of a multi-field object might be serialized at different points in time (causing inconsistencies in accumulators like LongAccum).
> This seems like a pretty serious issue, but I'm not sure what the best way to fix it is. An obvious fix would be to properly synchronize all accesses to the fields of our accumulators and to synchronize the writeObject and writeKryo methods, but this may have an adverse performance impact.
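
The "obvious fix" described in the issue can be sketched roughly as follows. This is only an illustration under stated assumptions, not Spark's actual implementation or the patch that was merged: {{SafeCollectionAccumulator}}, {{serialize}} and {{simulate}} are hypothetical names made up for this example. Every access to the backing list, including Java serialization through {{writeObject}}, takes the same lock, so the serializing (heartbeat) thread should never see the list in the middle of an {{add}}.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import java.util.{ArrayList, ConcurrentModificationException}

object SynchronizedAccumulatorSketch {

  // Hypothetical stand-in for a collection accumulator; this is NOT Spark's class.
  class SafeCollectionAccumulator[T] extends Serializable {
    private val list = new ArrayList[T]()

    // Every access to the backing list takes the lock on `this`.
    def add(v: T): Unit = this.synchronized { list.add(v) }
    def size: Int = this.synchronized { list.size() }

    // Java serialization hook: hold the same lock while the fields are
    // written, so add() cannot run in the middle of ArrayList.writeObject.
    private def writeObject(out: ObjectOutputStream): Unit = this.synchronized {
      out.defaultWriteObject()
    }
  }

  // Plain Java serialization into a byte array.
  def serialize(obj: AnyRef): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(obj) finally out.close()
    bytes.toByteArray
  }

  // One thread plays the running task (calls add), while the calling thread
  // plays the heartbeat (serializes repeatedly).
  // Returns how many serializations threw ConcurrentModificationException.
  def simulate(adds: Int, heartbeats: Int): Int = {
    val acc = new SafeCollectionAccumulator[String]
    val task = new Thread(new Runnable {
      override def run(): Unit = (1 to adds).foreach(i => acc.add(s"key-$i"))
    })
    task.start()
    val failures = (1 to heartbeats).count { _ =>
      try { serialize(acc); false }
      catch { case _: ConcurrentModificationException => true }
    }
    task.join()
    failures
  }
}
```

Calling {{SynchronizedAccumulatorSketch.simulate(100000, 50)}} should report zero failed serializations, because {{add}} and {{writeObject}} never overlap. The trade-off is the one the issue description already mentions: every {{add}} now pays for taking a lock, and a task blocks for as long as a serialization of a large list is in progress.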
