Posted to dev@spark.apache.org by Andrew Ash <an...@andrewash.com> on 2014/07/15 07:22:53 UTC

Hadoop's Configuration object isn't threadsafe

Hi Spark devs,

We discovered a very interesting bug at work last week in Spark 0.9.1: the
way Spark uses the Hadoop Configuration object is prone to thread-safety
issues.  I believe it still applies in Spark 1.0.1 as well.  Let me explain:


*Observations*

   - Was running a relatively simple job (read from Avro files, do a map,
   do another map, write back to Avro files)
   - 412 of 413 tasks completed, but the last task was hung in RUNNING state
   - The 412 successful tasks completed in median time 3.4s
   - The last hung task didn't finish even in 20 hours
   - The executor with the hung task was responsible for 100% of one core
   of CPU usage
   - Jstack of the executor attached (relevant thread pasted below)


*Diagnosis*

After doing some code spelunking, we determined the issue was concurrent
use of a single Configuration object by the tasks on an executor.  In Hadoop
each task runs in its own JVM, but in Spark multiple tasks can run in the
same JVM, so the single-threaded access assumptions of the Configuration
object no longer hold in Spark.

The specific issue is that the AvroRecordReader actually _modifies_ the
JobConf it's given when it's instantiated!  It adds a key for the RPC
protocol engine in the process of connecting to the Hadoop FileSystem.
When many tasks start at the same time (like at the start of a job), many
tasks are adding this configuration item to the one Configuration object at
once.  Internally Configuration uses a java.util.HashMap, which isn't
threadsafe.  The post below is an excellent explanation of what happens
when multiple threads insert into a HashMap at the same time:

http://mailinator.blogspot.com/2009/06/beautiful-race-condition.html

The gist is that you have a thread following a cycle of linked list nodes
indefinitely.  This exactly matches our observations of the 100% CPU core
and also the final location in the stack trace.
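
If anyone wants to see the failure mode outside of Spark, here's a minimal
sketch (illustrative only, not Spark code) that hammers one java.util.HashMap
from several threads the way concurrent tasks hammer the shared Configuration.
On the JDK 7-era HashMap an unlucky run corrupts the table during resize and a
writer can spin at 100% CPU, just like in the jstack below:

    import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}

    object HashMapRace {
      def main(args: Array[String]): Unit = {
        val map = new java.util.HashMap[String, String]()   // not thread-safe
        val pool = Executors.newFixedThreadPool(8)
        val start = new CountDownLatch(1)

        // Eight "tasks" mutate the same map at once, like executor task
        // threads all calling Configuration.set() on one shared JobConf.
        for (t <- 0 until 8) {
          pool.submit(new Runnable {
            def run(): Unit = {
              start.await()
              for (i <- 0 until 100000) map.put(s"key-$t-$i", "value")
            }
          })
        }
        start.countDown()
        pool.shutdown()

        // On a bad interleaving the pool never drains: a writer is stuck
        // chasing a cycle of entries inside HashMap.transfer().
        val drained = pool.awaitTermination(30, TimeUnit.SECONDS)
        println(s"all writers finished cleanly: $drained")
      }
    }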

So it seems the way Spark shares a Configuration object between task
threads in an executor is incorrect.  We need some way to prevent
concurrent access to a single Configuration object.


*Proposed fix*

We can clone the JobConf object in HadoopRDD.getJobConf() so each task gets
its own JobConf object (and thus its own Configuration object).  The
optimization of broadcasting the Configuration object across the cluster can
remain, but on the executor side I think it needs to be cloned for each task
to allow concurrent access.  I'm not sure of the performance implications,
but the comments suggest that the Configuration object is ~10KB, so I would
expect cloning it to be relatively speedy.
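
To make that concrete, here is roughly the shape of the change I have in mind.
This is a sketch of the idea rather than a patch against HadoopRDD, and the
helper name is mine:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.mapred.JobConf

    // Sketch of the per-task cloning idea (not the exact HadoopRDD code):
    // whatever shared Configuration the broadcast delivers, copy-construct a
    // private JobConf from it before handing it to the record reader.
    def jobConfForTask(sharedConf: Configuration): JobConf = {
      // JobConf(Configuration) copies the entries, so this task's mutations
      // (e.g. AvroRecordReader setting the RPC engine) stay local to it.
      new JobConf(sharedConf)
    }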

Has this been observed before?  Does my suggested fix make sense?  I'd be
happy to file a Jira ticket and continue the discussion there about the
right way to fix it.


Thanks!
Andrew


P.S.  For others seeing this issue, our temporary workaround is to enable
spark.speculation, which retries failed (or hung) tasks on other machines.
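
If it's useful, this is how we're turning it on, shown here via SparkConf
(the same keys can also go through your usual config mechanism); the tuning
values are just examples, not recommendations:

    import org.apache.spark.{SparkConf, SparkContext}

    // Workaround only: speculative execution re-launches straggler tasks on
    // other executors, so a task hung on the corrupted HashMap gets retried.
    val conf = new SparkConf()
      .setAppName("avro-job")
      .set("spark.speculation", "true")
      // Optional knobs; defaults may differ by version.
      .set("spark.speculation.interval", "100")    // ms between checks
      .set("spark.speculation.quantile", "0.75")   // fraction of tasks done before checking
    val sc = new SparkContext(conf)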



"Executor task launch worker-6" daemon prio=10 tid=0x00007f91f01fe000
nid=0x54b1 runnable [0x00007f92d74f1000]
   java.lang.Thread.State: RUNNABLE
    at java.util.HashMap.transfer(HashMap.java:601)
    at java.util.HashMap.resize(HashMap.java:581)
    at java.util.HashMap.addEntry(HashMap.java:879)
    at java.util.HashMap.put(HashMap.java:505)
    at org.apache.hadoop.conf.Configuration.set(Configuration.java:803)
    at org.apache.hadoop.conf.Configuration.set(Configuration.java:783)
    at org.apache.hadoop.conf.Configuration.setClass(Configuration.java:1662)
    at org.apache.hadoop.ipc.RPC.setProtocolEngine(RPC.java:193)
    at org.apache.hadoop.hdfs.NameNodeProxies.createNNProxyWithClientProtocol(NameNodeProxies.java:343)
    at org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:168)
    at org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:129)
    at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:436)
    at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:403)
    at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:125)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2262)
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:86)
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2296)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2278)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:316)
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:194)
    at org.apache.avro.mapred.FsInput.<init>(FsInput.java:37)
    at org.apache.avro.mapred.AvroRecordReader.<init>(AvroRecordReader.java:43)
    at org.apache.avro.mapred.AvroInputFormat.getRecordReader(AvroInputFormat.java:52)
    at org.apache.spark.rdd.HadoopRDD$$anon$1.<init>(HadoopRDD.scala:156)
    at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:149)
    at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:64)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
    at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
    at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
    at org.apache.spark.scheduler.Task.run(Task.scala:53)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:211)
    at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
    at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:41)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1408)
    at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:41)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:176)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

Re: Hadoop's Configuration object isn't threadsafe

Posted by Andrew Ash <an...@andrewash.com>.
Sounds good -- I added comments to the ticket.

Since SPARK-2521 is scheduled for a 1.1.0 release and we can work around
with spark.speculation, I don't personally see a need for a 1.0.2 backport.

Thanks for looking through this issue!



Re: Hadoop's Configuration object isn't threadsafe

Posted by Patrick Wendell <pw...@gmail.com>.
Hey Andrew,

I think you are correct, and a follow-up to SPARK-2521 will end up
fixing this. The design of SPARK-2521 automatically broadcasts RDD
data in tasks and the approach creates a new copy of the RDD and
associated data for each task. A natural follow-up to that patch is to
stop handling the jobConf separately (since we will now broadcast all
referents of the RDD itself) and just have it broadcasted with the
RDD. I'm not sure if Reynold plans to include this in SPARK-2521 or
afterwards, but it's likely we'd do that soon.
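
(To spell out why that fixes the race: tasks would get the jobConf by
deserializing the broadcast RDD data, and deserialization hands every task
its own object graph. A toy illustration of that property, nothing
Spark-specific:)

    import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

    // Toy illustration: deserializing the same bytes twice yields two
    // independent copies, so per-task deserialization of the broadcast data
    // means per-task private (safely mutable) configuration objects.
    val original = new java.util.HashMap[String, String]()
    original.put("some.key", "some.value")

    val bytes = {
      val bos = new ByteArrayOutputStream()
      val oos = new ObjectOutputStream(bos)
      oos.writeObject(original)
      oos.close()
      bos.toByteArray
    }

    def copyFromBytes(): java.util.HashMap[String, String] = {
      val ois = new ObjectInputStream(new ByteArrayInputStream(bytes))
      ois.readObject().asInstanceOf[java.util.HashMap[String, String]]
    }

    val taskCopy1 = copyFromBytes()
    val taskCopy2 = copyFromBytes()
    println(taskCopy1 eq taskCopy2)  // false: each task mutates only its own copy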

- Patrick


Re: Hadoop's Configuration object isn't threadsafe

Posted by Andrew Ash <an...@andrewash.com>.
Hi Patrick, thanks for taking a look.  I filed it as
https://issues.apache.org/jira/browse/SPARK-2546

Would you recommend I pursue the cloned Configuration object approach now
and send in a PR?

Reynold's recent announcement of the broadcast RDD object patch may also
have implications for the right path forward here.  I'm not sure I fully
understand the implications though:
https://github.com/apache/spark/pull/1452

"Once this is committed, we can also remove the JobConf broadcast in
HadoopRDD."

Thanks!
Andrew



Re: Hadoop's Configuration object isn't threadsafe

Posted by Patrick Wendell <pw...@gmail.com>.
Hey Andrew,

Cloning the conf might be a good/simple fix for this particular
problem. It's definitely worth looking into.

There are a few things we can probably do in Spark to deal with
non-thread-safety inside of the Hadoop FileSystem and Configuration
classes. One thing we can do in general is to add barriers around the
locations where we knowingly access Hadoop FileSystem and
Configuration state from multiple threads (e.g. during our own calls
to getRecordReader in this case). But this will only deal with
"writer/writer" conflicts, where we have multiple calls mutating the same
object at the same time. It won't deal with "reader/writer" conflicts, where
some of our initialization code touches state that is needed during
normal execution of other tasks.
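
To illustrate the "barrier" idea, here is a sketch only, not the exact
HadoopRDD code; the helper is hypothetical:

    import org.apache.hadoop.mapred.{InputFormat, InputSplit, JobConf, RecordReader, Reporter}

    // Sketch of a writer/writer barrier: serialize the call that we know
    // mutates the shared JobConf, so two tasks on one executor can't modify
    // it at the same time.  (Hypothetical helper, not current Spark code.)
    def openReaderSafely[K, V](
        fmt: InputFormat[K, V],
        split: InputSplit,
        sharedConf: JobConf): RecordReader[K, V] = {
      sharedConf.synchronized {
        fmt.getRecordReader(split, sharedConf, Reporter.NULL)
      }
    }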

- Patrick


Re: Hadoop's Configuration object isn't threadsafe

Posted by Andrew Ash <an...@andrewash.com>.
Hi Shengzhe,

Even if we did make Configuration threadsafe, it'd take quite some time for
that to trickle down to a Hadoop release that we could actually rely on
Spark users having installed.  I agree we should consider whether making
Configuration threadsafe is something that Hadoop should do, but for the
short term I think Spark needs to be able to handle the common scenario of
Configuration only being safe for single-threaded use.

Thanks!
Andrew



Re: Hadoop's Configuration object isn't threadsafe

Posted by yao <ya...@gmail.com>.
Good catch, Andrew. In addition to your proposed solution, would it be
possible to fix the Configuration class itself and make it thread-safe? The
fix could be as simple as swapping the internal HashMap for a
ConcurrentHashMap, but I am not sure whether we can push that change
upstream (would the Hadoop folks accept it? It seems they never expected a
Configuration object to be accessed by multiple threads).
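
Just to make the idea concrete, a minimal sketch of what I mean is below.
The class and field names are made up for illustration; this is not
Hadoop's actual internal structure, only the kind of change I have in mind:

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    // Illustrative stand-in for the unsynchronized map that
    // Configuration.set() is updating in the stack trace above.
    public class ThreadSafeProps {
        // A ConcurrentHashMap never corrupts its internal structure under
        // concurrent puts, so the endless loop seen in HashMap.transfer()
        // cannot happen, even with many task threads writing at once.
        private final Map<String, String> props = new ConcurrentHashMap<>();

        public void set(String name, String value) {
            props.put(name, value);
        }

        public String get(String name) {
            return props.get(name);
        }
    }

That alone would not give Configuration full thread-safety semantics, but it
would at least rule out the infinite-loop failure mode Andrew hit.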

-Shengzhe


On Mon, Jul 14, 2014 at 10:22 PM, Andrew Ash <an...@andrewash.com> wrote:

> Hi Spark devs,
>
> We discovered a very interesting bug in Spark at work last week in Spark
> 0.9.1 — that the way Spark uses the Hadoop Configuration object is prone to
> thread safety issues.  I believe it still applies in Spark 1.0.1 as well.
>  Let me explain:
>
>
> *Observations*
>
>    - Was running a relatively simple job (read from Avro files, do a map,
>    do another map, write back to Avro files)
>    - 412 of 413 tasks completed, but the last task was hung in RUNNING
>    state
>    - The 412 successful tasks completed in median time 3.4s
>    - The last hung task didn't finish even in 20 hours
>    - The executor with the hung task was responsible for 100% of one core
>    of CPU usage
>    - Jstack of the executor attached (relevant thread pasted below)
>
>
> *Diagnosis*
>
> After doing some code spelunking, we determined the issue was concurrent
> use of a Configuration object for each task on an executor.  In Hadoop each
> task runs in its own JVM, but in Spark multiple tasks can run in the same
> JVM, so the single-threaded access assumptions of the Configuration object
> no longer hold in Spark.
>
> The specific issue is that the AvroRecordReader actually _modifies_ the
> JobConf it's given when it's instantiated!  It adds a key for the RPC
> protocol engine in the process of connecting to the Hadoop FileSystem.
>  When many tasks start at the same time (like at the start of a job), many
> tasks are adding this configuration item to the one Configuration object at
> once.  Internally Configuration uses a java.lang.HashMap, which isn't
> threadsafe… The below post is an excellent explanation of what happens in
> the situation where multiple threads insert into a HashMap at the same time.
>
> http://mailinator.blogspot.com/2009/06/beautiful-race-condition.html
>
> The gist is that you have a thread following a cycle of linked list nodes
> indefinitely.  This exactly matches our observations of the 100% CPU core
> and also the final location in the stack trace.
>
> So it seems the way Spark shares a Configuration object between task
> threads in an executor is incorrect.  We need some way to prevent
> concurrent access to a single Configuration object.
>
>
> *Proposed fix*
>
> We can clone the JobConf object in HadoopRDD.getJobConf() so each task
> gets its own JobConf object (and thus Configuration object).  The
> optimization of broadcasting the Configuration object across the cluster
> can remain, but on the other side I think it needs to be cloned for each
> task to allow for concurrent access.  I'm not sure the performance
> implications, but the comments suggest that the Configuration object is
> ~10KB so I would expect a clone on the object to be relatively speedy.
>
> Has this been observed before?  Does my suggested fix make sense?  I'd be
> happy to file a Jira ticket and continue discussion there for the right way
> to fix.
>
>
> Thanks!
> Andrew
>
>
> P.S.  For others seeing this issue, our temporary workaround is to enable
> spark.speculation, which retries failed (or hung) tasks on other machines.
>
>
>
> "Executor task launch worker-6" daemon prio=10 tid=0x00007f91f01fe000
> nid=0x54b1 runnable [0x00007f92d74f1000]
>    java.lang.Thread.State: RUNNABLE
>     at java.util.HashMap.transfer(HashMap.java:601)
>     at java.util.HashMap.resize(HashMap.java:581)
>     at java.util.HashMap.addEntry(HashMap.java:879)
>     at java.util.HashMap.put(HashMap.java:505)
>     at org.apache.hadoop.conf.Configuration.set(Configuration.java:803)
>     at org.apache.hadoop.conf.Configuration.set(Configuration.java:783)
>     at
> org.apache.hadoop.conf.Configuration.setClass(Configuration.java:1662)
>     at org.apache.hadoop.ipc.RPC.setProtocolEngine(RPC.java:193)
>     at
> org.apache.hadoop.hdfs.NameNodeProxies.createNNProxyWithClientProtocol(NameNodeProxies.java:343)
>     at
> org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:168)
>     at
> org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:129)
>     at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:436)
>     at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:403)
>     at
> org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:125)
>     at
> org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2262)
>     at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:86)
>     at
> org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2296)
>     at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2278)
>     at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:316)
>     at org.apache.hadoop.fs.Path.getFileSystem(Path.java:194)
>     at org.apache.avro.mapred.FsInput.<init>(FsInput.java:37)
>     at
> org.apache.avro.mapred.AvroRecordReader.<init>(AvroRecordReader.java:43)
>     at
> org.apache.avro.mapred.AvroInputFormat.getRecordReader(AvroInputFormat.java:52)
>     at org.apache.spark.rdd.HadoopRDD$$anon$1.<init>(HadoopRDD.scala:156)
>     at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:149)
>     at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:64)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
>     at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
>     at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
>     at org.apache.spark.scheduler.Task.run(Task.scala:53)
>     at
> org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:211)
>     at
> org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
>     at
> org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:41)
>     at java.security.AccessController.doPrivileged(Native Method)
>     at javax.security.auth.Subject.doAs(Subject.java:415)
>     at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1408)
>     at
> org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:41)
>     at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:176)
>     at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>     at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>     at java.lang.Thread.run(Thread.java:745)
>
>

Fwd: Hadoop's Configuration object isn't threadsafe

Posted by Fabian Hueske <fh...@apache.org>.
Hi folks,

I'm forwarding this mail from the Spark dev list as it might be an issue for
Flink as well.
Flink also runs multiple parallel tasks in one JVM and allows the use of
Hadoop InputFormats, OutputFormats, and soon also Map and Reduce tasks.
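
For us, the per-task clone that Andrew proposes could look roughly like the
sketch below (illustration only, not actual Flink code; the class and method
names are made up):

    import org.apache.hadoop.mapred.JobConf;

    // Illustrative sketch: hand every task thread its own copy of the shared
    // JobConf before any Hadoop code (e.g. AvroRecordReader) can mutate it.
    public class PerTaskJobConf {
        // One JobConf cached per JVM; it is shared, so it must never be
        // mutated concurrently by task threads.
        private final JobConf sharedConf;

        public PerTaskJobConf(JobConf sharedConf) {
            this.sharedConf = sharedConf;
        }

        // Called once per task. The copy constructor duplicates the key/value
        // pairs, so any mutation done by a RecordReader stays local to that task.
        public JobConf newTaskLocalConf() {
            synchronized (sharedConf) {   // guard the read side of the copy as well
                return new JobConf(sharedConf);
            }
        }
    }

The clone costs some memory per task (the comments Andrew cites suggest a
Configuration is ~10KB), which seems acceptable compared to a hung task.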


---------- Forwarded message ----------
From: Andrew Ash <an...@andrewash.com>
Date: 2014-07-15 7:22 GMT+02:00
Subject: Hadoop's Configuration object isn't threadsafe
To: dev@spark.apache.org


Hi Spark devs,

We discovered a very interesting bug in Spark at work last week in Spark
0.9.1 — that the way Spark uses the Hadoop Configuration object is prone to
thread safety issues.  I believe it still applies in Spark 1.0.1 as well.
 Let me explain:


*Observations*

   - Was running a relatively simple job (read from Avro files, do a map,
   do another map, write back to Avro files)
   - 412 of 413 tasks completed, but the last task was hung in RUNNING state
   - The 412 successful tasks completed in median time 3.4s
   - The last hung task didn't finish even in 20 hours
   - The executor with the hung task was responsible for 100% of one core
   of CPU usage
   - Jstack of the executor attached (relevant thread pasted below)


*Diagnosis*

After doing some code spelunking, we determined the issue was concurrent
use of a Configuration object for each task on an executor.  In Hadoop each
task runs in its own JVM, but in Spark multiple tasks can run in the same
JVM, so the single-threaded access assumptions of the Configuration object
no longer hold in Spark.

The specific issue is that the AvroRecordReader actually _modifies_ the
JobConf it's given when it's instantiated!  It adds a key for the RPC
protocol engine in the process of connecting to the Hadoop FileSystem.
 When many tasks start at the same time (like at the start of a job), many
tasks are adding this configuration item to the one Configuration object at
once.  Internally Configuration uses a java.lang.HashMap, which isn't
threadsafe… The below post is an excellent explanation of what happens in
the situation where multiple threads insert into a HashMap at the same time.

http://mailinator.blogspot.com/2009/06/beautiful-race-condition.html

The gist is that you have a thread following a cycle of linked list nodes
indefinitely.  This exactly matches our observations of the 100% CPU core
and also the final location in the stack trace.

So it seems the way Spark shares a Configuration object between task
threads in an executor is incorrect.  We need some way to prevent
concurrent access to a single Configuration object.


*Proposed fix*

We can clone the JobConf object in HadoopRDD.getJobConf() so each task gets
its own JobConf object (and thus Configuration object).  The optimization
of broadcasting the Configuration object across the cluster can remain, but
on the other side I think it needs to be cloned for each task to allow for
concurrent access.  I'm not sure the performance implications, but the
comments suggest that the Configuration object is ~10KB so I would expect a
clone on the object to be relatively speedy.

Has this been observed before?  Does my suggested fix make sense?  I'd be
happy to file a Jira ticket and continue discussion there for the right way
to fix.


Thanks!
Andrew


P.S.  For others seeing this issue, our temporary workaround is to enable
spark.speculation, which retries failed (or hung) tasks on other machines.



"Executor task launch worker-6" daemon prio=10 tid=0x00007f91f01fe000
nid=0x54b1 runnable [0x00007f92d74f1000]
   java.lang.Thread.State: RUNNABLE
    at java.util.HashMap.transfer(HashMap.java:601)
    at java.util.HashMap.resize(HashMap.java:581)
    at java.util.HashMap.addEntry(HashMap.java:879)
    at java.util.HashMap.put(HashMap.java:505)
    at org.apache.hadoop.conf.Configuration.set(Configuration.java:803)
    at org.apache.hadoop.conf.Configuration.set(Configuration.java:783)
    at
org.apache.hadoop.conf.Configuration.setClass(Configuration.java:1662)
    at org.apache.hadoop.ipc.RPC.setProtocolEngine(RPC.java:193)
    at
org.apache.hadoop.hdfs.NameNodeProxies.createNNProxyWithClientProtocol(NameNodeProxies.java:343)
    at
org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:168)
    at
org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:129)
    at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:436)
    at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:403)
    at
org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:125)
    at
org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2262)
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:86)
    at
org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2296)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2278)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:316)
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:194)
    at org.apache.avro.mapred.FsInput.<init>(FsInput.java:37)
    at
org.apache.avro.mapred.AvroRecordReader.<init>(AvroRecordReader.java:43)
    at
org.apache.avro.mapred.AvroInputFormat.getRecordReader(AvroInputFormat.java:52)
    at org.apache.spark.rdd.HadoopRDD$$anon$1.<init>(HadoopRDD.scala:156)
    at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:149)
    at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:64)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
    at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
    at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:241)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:232)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
    at org.apache.spark.scheduler.Task.run(Task.scala:53)
    at
org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:211)
    at
org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
    at
org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:41)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1408)
    at
org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:41)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:176)
    at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)