Posted to user@spark.apache.org by Charles Menguy <me...@gmail.com> on 2015/07/18 03:16:22 UTC

Reading SequenceFiles from S3 with PySpark on EMR causes RACK_LOCAL locality

I am trying to use PySpark on EMR to analyze some data stored as
SequenceFiles on S3, but I'm running into performance issues due to data
locality. Here is a very simple sample that doesn't perform well:

seqRDD = sc.sequenceFile("s3n://<access>:<secret>@<bucket>/<table>/day=2015-07-04/hour=*/*")
seqRDD.count()
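
For reference, a quick sanity check from the same PySpark shell (a minimal
sketch) to confirm how many partitions Spark built from the S3 glob:

print(seqRDD.getNumPartitions())  # should match the ~1354 output partitions reported in the logs below
print(sc.defaultParallelism)      # rough idea of how much parallelism the scheduler has available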

The count action itself completes fine, but the distribution of the tasks
is very poor. For some reason the Spark logs show only 2 IPs of the
cluster doing any actual work while the rest sit idle. I tried with a
5-node cluster and a 50-node cluster, and it's always only 2 IPs appearing
in the logs.

Also very strange: these 2 IPs have a locality level of RACK_LOCAL. I
presume that's because the data is in S3 and therefore not node-local, but
how can I make Spark use the whole cluster instead of just 2 instances?
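
For what it's worth, spark.locality.wait seems related (it controls how
long the scheduler waits for a better locality level before falling back
to a worse one). A minimal sketch of setting it when building a context,
though I'm not sure it's the actual problem here:

from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .setAppName("locality-test")        # hypothetical app name
        .set("spark.locality.wait", "1s"))  # fall back to less-local tasks after 1s instead of the 3s default
sc = SparkContext(conf=conf)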

I didn't do anything specific for the Spark configuration on EMR; I simply
installed it as a native EMR application, and I believe that takes care of
optimizing the configs automatically. I ran PySpark with --master
yarn-client.
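
In case it matters, the active configuration can be dumped from the shell
to see what EMR actually set (a quick sketch, assuming sc.getConf() is
available in this PySpark version):

for key, value in sorted(sc.getConf().getAll()):
    print(key, value)  # look for spark.executor.instances, spark.executor.memory, etc.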

I saw this in the logs; the allowLocal=false could be an issue, but I
couldn't find any documentation on it:

15/07/17 23:55:27 INFO spark.SparkContext: Starting job: count at :1
15/07/17 23:55:27 INFO scheduler.DAGScheduler: Got job 1 (count at :1) with
1354 output partitions (allowLocal=false)
15/07/17 23:55:27 INFO scheduler.DAGScheduler: Final stage: Stage 1(count
at :1)

Here are some of the logs from running the count, showing only 2 IPs:

15/07/17 23:55:28 INFO scheduler.DAGScheduler: Submitting 1354 missing
tasks from Stage 1 (PythonRDD[3] at count at :1)
15/07/17 23:55:28 INFO cluster.YarnScheduler: Adding task set 1.0 with 1354
tasks
15/07/17 23:55:28 INFO scheduler.TaskSetManager: Starting task 0.0 in stage
1.0 (TID 1, ip-172-31-41-210.ec2.internal, RACK_LOCAL, 1418 bytes)
15/07/17 23:55:28 INFO scheduler.TaskSetManager: Starting task 1.0 in stage
1.0 (TID 2, ip-172-31-36-179.ec2.internal, RACK_LOCAL, 1420 bytes)
15/07/17 23:55:28 INFO storage.BlockManagerInfo: Added broadcast_3_piece0
in memory on ip-172-31-36-179.ec2.internal:39998 (size: 3.7 KB, free: 535.0
MB)
15/07/17 23:55:28 INFO storage.BlockManagerInfo: Added broadcast_3_piece0
in memory on ip-172-31-41-210.ec2.internal:36847 (size: 3.7 KB, free: 535.0
MB)
15/07/17 23:55:29 INFO storage.BlockManagerInfo: Added broadcast_0_piece0
in memory on ip-172-31-41-210.ec2.internal:36847 (size: 18.8 KB, free:
535.0 MB)
15/07/17 23:55:31 INFO scheduler.TaskSetManager: Starting task 2.0 in stage
1.0 (TID 3, ip-172-31-41-210.ec2.internal, RACK_LOCAL, 1421 bytes)
15/07/17 23:55:31 INFO scheduler.TaskSetManager: Finished task 0.0 in stage
1.0 (TID 1) in 3501 ms on ip-172-31-41-210.ec2.internal (1/1354)
15/07/17 23:55:31 INFO scheduler.TaskSetManager: Starting task 3.0 in stage
1.0 (TID 4, ip-172-31-41-210.ec2.internal, RACK_LOCAL, 1420 bytes)
15/07/17 23:55:31 INFO scheduler.TaskSetManager: Finished task 2.0 in stage
1.0 (TID 3) in 99 ms on ip-172-31-41-210.ec2.internal (2/1354)
15/07/17 23:55:33 INFO scheduler.TaskSetManager: Starting task 4.0 in stage
1.0 (TID 5, ip-172-31-36-179.ec2.internal, RACK_LOCAL, 1420 bytes)
15/07/17 23:55:33 INFO scheduler.TaskSetManager: Finished task 1.0 in stage
1.0 (TID 2) in 5190 ms on ip-172-31-36-179.ec2.internal (3/1354)
15/07/17 23:55:36 INFO scheduler.TaskSetManager: Starting task 5.0 in stage
1.0 (TID 6, ip-172-31-41-210.ec2.internal, RACK_LOCAL, 1420 bytes)
15/07/17 23:55:36 INFO scheduler.TaskSetManager: Finished task 3.0 in stage
1.0 (TID 4) in 4471 ms on ip-172-31-41-210.ec2.internal (4/1354)
15/07/17 23:55:37 INFO scheduler.TaskSetManager: Starting task 6.0 in stage
1.0 (TID 7, ip-172-31-36-179.ec2.internal, RACK_LOCAL, 1420 bytes)
15/07/17 23:55:37 INFO scheduler.TaskSetManager: Finished task 4.0 in stage
1.0 (TID 5) in 3676 ms on ip-172-31-36-179.ec2.internal (5/1354)
15/07/17 23:55:40 INFO scheduler.TaskSetManager: Starting task 7.0 in stage
1.0 (TID 8, ip-172-31-41-210.ec2.internal, RACK_LOCAL, 1420 bytes)
15/07/17 23:55:40 INFO scheduler.TaskSetManager: Finished task 5.0 in stage
1.0 (TID 6) in 3895 ms on ip-172-31-41-210.ec2.internal (6/1354)
15/07/17 23:55:40 INFO scheduler.TaskSetManager: Starting task 8.0 in stage
1.0 (TID 9, ip-1

I also tried taking S3 out of the equation by first distcp'ing the S3 data
into HDFS on the EMR cluster and then running a count() on that, but it
doesn't make much difference: there are still only 2 IPs processing, and
they initially start as NODE_LOCAL but eventually switch to RACK_LOCAL.

I'm at a loss as to what I have misconfigured; any help would be appreciated.

Thanks!

Charles

Re: Reading SequenceFiles from S3 with PySpark on EMR causes RACK_LOCAL locality

Posted by "michal.klos81@gmail.com" <mi...@gmail.com>.
Make sure you are setting the number of executors correctly.
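
If you didn't set it explicitly, the default on YARN is only 2 executors,
which would line up with the 2 IPs you're seeing. A minimal sketch of
setting it explicitly (the numbers are placeholders, tune them to your
instance types): on the command line this is the --num-executors flag
when launching pyspark; in a standalone script it can be set like this:

from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .set("spark.executor.instances", "10")  # placeholder: one or more executors per worker node
        .set("spark.executor.cores", "4")       # placeholder values
        .set("spark.executor.memory", "4g"))
sc = SparkContext(conf=conf)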

M


