Posted to issues@spark.apache.org by "Josh Rosen (JIRA)" <ji...@apache.org> on 2014/10/06 21:29:34 UTC

[jira] [Commented] (SPARK-2546) Configuration object thread safety issue

    [ https://issues.apache.org/jira/browse/SPARK-2546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14160790#comment-14160790 ] 

Josh Rosen commented on SPARK-2546:
-----------------------------------

I've decided to go with the cloning approach, since this seems simplest and safest.

It looks like SparkContext has a public {{hadoopConfiguration}} {{val}} that holds a re-used Configuration object.  This may have been purposely exposed to allow users to set Hadoop configuration properties (see how it's mentioned in docs/storage-openstack-swift.md; the Spark EC2 instructions also mention using this attribute to set S3 credentials).  This object is used as the default Hadoop configuration in the {{newAPIHadoopRDD}} and {{saveAsHadoop*}} methods; it's also read in many other places inside of Spark.

While SPARK-2585 addressed sharing of the Configuration objects in executors, it seems that we still might face races in the driver if multiple threads are sharing a SparkContext and one thread mutates the shared configuration while another thread submits a job that reads it.

This seems like a tricky problem to fix.  I don't think that we can change {{SparkContext.hadoopConfiguration}} to return a copy of the configuration object, since it seems that the shared / mutating semantics are required by some existing code.  At the same time, we can't simply clone the return value before using it in our internal driver-side code, since a) we can't lock out writers/mutators while performing the clone() and b) the change in semantics might break existing user code.  Essentially, I don't think that there's anything that we can do that's guaranteed to be safe once a Configuration has been exposed to multiple threads; we need to perform the cloning before the object has been shared.
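
To make the "clone before sharing" point concrete, here is a minimal Java sketch.  It is only an illustration under stated assumptions: {{SimpleConf}}, {{snapshotForJob}} and the key {{fs.example.key}} are hypothetical names, and {{SimpleConf}} is a stand-in for a mutable, non-thread-safe configuration object, not Hadoop's actual Configuration class or Spark's actual code.  The copy is taken while only the calling thread holds a reference to the original, so later mutations of the original cannot race with readers of the copy.

```java
import java.util.HashMap;
import java.util.Map;

// Illustration only: SimpleConf is a hypothetical stand-in for a mutable,
// non-thread-safe configuration object (it is NOT Hadoop's Configuration).
class SnapshotBeforeSharing {

    static final class SimpleConf {
        private final Map<String, String> props = new HashMap<>();

        SimpleConf() {}

        // Copy constructor: the copy gets its own backing map.
        SimpleConf(SimpleConf other) {
            props.putAll(other.props);
        }

        void set(String key, String value) { props.put(key, value); }

        String get(String key) { return props.get(key); }
    }

    // Hypothetical helper: takes a private snapshot on the calling thread,
    // before the snapshot is handed to any other thread.
    static SimpleConf snapshotForJob(SimpleConf userConf) {
        return new SimpleConf(userConf);
    }

    public static void main(String[] args) {
        SimpleConf userConf = new SimpleConf();
        userConf.set("fs.example.key", "v1");

        // Snapshot taken while userConf is still confined to this thread.
        SimpleConf jobConf = snapshotForJob(userConf);

        // A later mutation of the original does not affect the snapshot.
        userConf.set("fs.example.key", "v2");

        System.out.println(jobConf.get("fs.example.key"));  // prints "v1"
        System.out.println(userConf.get("fs.example.key")); // prints "v2"
    }
}
```

Note that this only helps if the copy really is made before any other thread can mutate the original; copying an object that is already being mutated concurrently has the same race as reading it directly.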

> Configuration object thread safety issue
> ----------------------------------------
>
>                 Key: SPARK-2546
>                 URL: https://issues.apache.org/jira/browse/SPARK-2546
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 0.9.1
>            Reporter: Andrew Ash
>            Assignee: Josh Rosen
>            Priority: Critical
>
> // observed in 0.9.1 but expected to exist in 1.0.1 as well
> This ticket is copy-pasted from a thread on the dev@ list:
> {quote}
> We discovered a very interesting bug in Spark 0.9.1 at work last week: the way Spark uses the Hadoop Configuration object is prone to thread safety issues.  I believe it still applies in Spark 1.0.1 as well.  Let me explain:
> Observations
>  - Was running a relatively simple job (read from Avro files, do a map, do another map, write back to Avro files)
>  - 412 of 413 tasks completed, but the last task was hung in RUNNING state
>  - The 412 successful tasks completed in median time 3.4s
>  - The last hung task didn't finish even in 20 hours
>  - The executor with the hung task was responsible for 100% of one core of CPU usage
>  - Jstack of the executor attached (relevant thread pasted below)
> Diagnosis
> After doing some code spelunking, we determined the issue was concurrent use of a Configuration object for each task on an executor.  In Hadoop each task runs in its own JVM, but in Spark multiple tasks can run in the same JVM, so the single-threaded access assumptions of the Configuration object no longer hold in Spark.
> The specific issue is that the AvroRecordReader actually _modifies_ the JobConf it's given when it's instantiated!  It adds a key for the RPC protocol engine in the process of connecting to the Hadoop FileSystem.  When many tasks start at the same time (like at the start of a job), many tasks are adding this configuration item to the one Configuration object at once.  Internally Configuration uses a java.util.HashMap, which isn't thread-safe.  The post below is an excellent explanation of what happens when multiple threads insert into a HashMap at the same time.
> http://mailinator.blogspot.com/2009/06/beautiful-race-condition.html
> The gist is that you have a thread following a cycle of linked list nodes indefinitely.  This exactly matches our observations of the 100% CPU core and also the final location in the stack trace.
> So it seems the way Spark shares a Configuration object between task threads in an executor is incorrect.  We need some way to prevent concurrent access to a single Configuration object.
> Proposed fix
> We can clone the JobConf object in HadoopRDD.getJobConf() so each task gets its own JobConf object (and thus Configuration object).  The optimization of broadcasting the Configuration object across the cluster can remain, but on the other side I think it needs to be cloned for each task to allow for concurrent access.  I'm not sure of the performance implications, but the comments suggest that the Configuration object is ~10KB so I would expect a clone on the object to be relatively speedy.
> Has this been observed before?  Does my suggested fix make sense?  I'd be happy to file a Jira ticket and continue discussion there for the right way to fix.
> Thanks!
> Andrew
> P.S.  For others seeing this issue, our temporary workaround is to enable spark.speculation, which retries failed (or hung) tasks on other machines.
> {noformat}
> "Executor task launch worker-6" daemon prio=10 tid=0x00007f91f01fe000 nid=0x54b1 runnable [0x00007f92d74f1000]
>    java.lang.Thread.State: RUNNABLE
>     at java.util.HashMap.transfer(HashMap.java:601)
>     at java.util.HashMap.resize(HashMap.java:581)
>     at java.util.HashMap.addEntry(HashMap.java:879)
>     at java.util.HashMap.put(HashMap.java:505)
>     at org.apache.hadoop.conf.Configuration.set(Configuration.java:803)
>     at org.apache.hadoop.conf.Configuration.set(Configuration.java:783)
>     at org.apache.hadoop.conf.Configuration.setClass(Configuration.java:1662)
>     at org.apache.hadoop.ipc.RPC.setProtocolEngine(RPC.java:193)
>     at org.apache.hadoop.hdfs.NameNodeProxies.createNNProxyWithClientProtocol(NameNodeProxies.java:343)
>     at org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:168)
>     at org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:129)
>     at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:436)
>     at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:403)
>     at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:125)
>     at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2262)
>     at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:86)
>     at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2296)
>     at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2278)
>     at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:316)
>     at org.apache.hadoop.fs.Path.getFileSystem(Path.java:194)
>     at org.apache.avro.mapred.FsInput.<init>(FsInput.java:37)
>     at org.apache.avro.mapred.AvroRecordReader.<init>(AvroRecordReader.java:43)
>     at org.apache.avro.mapred.AvroInputFormat.getRecordReader(AvroInputFormat.java:52)
>     at org.apache.spark.rdd.HadoopRDD$$anon$1.<init>(HadoopRDD.scala:156)
>     at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:149)
>     at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:64)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
>     at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
>     at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
>     at org.apache.spark.scheduler.Task.run(Task.scala:53)
>     at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:211)
>     at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
>     at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:41)
>     at java.security.AccessController.doPrivileged(Native Method)
>     at javax.security.auth.Subject.doAs(Subject.java:415)
>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1408)
>     at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:41)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:176)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>     at java.lang.Thread.run(Thread.java:745)
> {noformat}
> {quote}
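
The per-task cloning proposed in the quoted description can be sketched roughly as follows.  This is a hedged illustration, not the actual HadoopRDD change: a plain {{java.util.HashMap}} stands in for the broadcast Configuration, and {{cloneForTask}}, {{runTasks}}, {{CLONE_LOCK}} and the key {{rpc.engine.example}} are hypothetical names.  Each task copies the shared map under a lock (an assumption made here so that copies never overlap each other) and then mutates only its private copy, so no two threads ever insert into the same HashMap.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Illustration only: a HashMap stands in for the shared (broadcast) configuration.
class PerTaskConfClone {

    // Hypothetical lock so that copies of the shared map never overlap.
    private static final Object CLONE_LOCK = new Object();

    // Each task calls this to obtain a private, independently mutable copy.
    static Map<String, String> cloneForTask(Map<String, String> shared) {
        synchronized (CLONE_LOCK) {
            return new HashMap<>(shared);
        }
    }

    // Runs numTasks tasks on a small thread pool and returns how many of them
    // observed their own write in their private copy.
    static int runTasks(Map<String, String> shared, int numTasks) {
        ExecutorService pool = Executors.newFixedThreadPool(8);
        try {
            List<Future<Boolean>> results = new ArrayList<>();
            for (int i = 0; i < numTasks; i++) {
                final String value = "task-" + i;
                results.add(pool.submit(() -> {
                    Map<String, String> conf = cloneForTask(shared);
                    // The mutation touches only this task's copy.
                    conf.put("rpc.engine.example", value);
                    return value.equals(conf.get("rpc.engine.example"));
                }));
            }
            int ok = 0;
            for (Future<Boolean> f : results) {
                if (f.get()) {
                    ok++;
                }
            }
            return ok;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        Map<String, String> shared = new HashMap<>();
        shared.put("fs.default.example", "hdfs://example");
        int ok = runTasks(shared, 413);
        // The shared map still holds only its original entry.
        System.out.println(ok + " tasks finished; shared size = " + shared.size());
    }
}
```

The trade-off is one copy per task; the quoted description estimates the Configuration at ~10KB, so the copy is expected to be cheap relative to task run time.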


