Posted to user@spark.apache.org by Shashank Rao <sh...@gmail.com> on 2023/05/16 17:47:24 UTC

Understanding Spark S3 Read Performance

Hi,
I'm trying to set up a Spark pipeline which reads data from S3 and writes it into Google BigQuery.

Environment Details:
-----------------------
Java 8
AWS EMR-6.10.0
Spark v3.3.1
2 m5.xlarge executor nodes


S3 Directory structure:
-----------
bucket-name:
|---folder1:
  |---folder2:
     |---file1.jsonl
     |---file2.jsonl
       ...
     |---file12000.jsonl


Each file is of size 1.6 MB and there are a total of 12,000 files.

The code to read the source data looks like this:
----------------------------
spark.sparkContext().hadoopConfiguration().set("fs.s3a.access.key", <ACCESS_KEY>);
spark.sparkContext().hadoopConfiguration().set("fs.s3a.secret.key", <SECRET_KEY>);
spark.sparkContext().hadoopConfiguration().set("fs.s3a.endpoint", "s3.amazonaws.com");
Dataset<Row> data = spark.read()
        .option("recursiveFileLookup", "true")
        .json("s3a://bucket-name/folder1/folder2/");
----------------------------

Now here's my problem:
This triggers two jobs: a) Listing Leaf Nodes, b) Json Read

[image: Screenshot 2023-05-16 at 23.00.23.png]

The list job takes around 12 mins and has 10,000 partitions/tasks. The read
job takes 14 mins and has 375 partitions/tasks.

1. What's going on here? Why does the list job take so much time? I wrote a simple Java program using the AWS SDK that lists all the files in the exact same S3 directory in about 5 seconds (a rough sketch of it follows after these questions).

2. Why are there 10,000 partitions/tasks for listing? Can this be
configured / changed? Is there any optimisation that can be done to reduce
the time taken over here?

3. What is the second job doing? Why are there 375 partitions in that job?
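
For reference, the SDK listing mentioned in question 1 was roughly along these lines (a sketch, not the exact code; the region is a placeholder):
----------------------------
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.ListObjectsV2Request;
import com.amazonaws.services.s3.model.ListObjectsV2Result;
import com.amazonaws.services.s3.model.S3ObjectSummary;

public class ListS3Files {
    public static void main(String[] args) {
        // Placeholder region -- use whichever region the bucket actually lives in.
        AmazonS3 s3 = AmazonS3ClientBuilder.standard().withRegion("us-east-1").build();

        ListObjectsV2Request req = new ListObjectsV2Request()
                .withBucketName("bucket-name")
                .withPrefix("folder1/folder2/");

        long count = 0;
        ListObjectsV2Result result;
        do {
            // Each call returns up to 1000 keys plus a continuation token.
            result = s3.listObjectsV2(req);
            for (S3ObjectSummary summary : result.getObjectSummaries()) {
                count++;
            }
            req.setContinuationToken(result.getNextContinuationToken());
        } while (result.isTruncated());

        System.out.println("Listed " + count + " objects");
    }
}
----------------------------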

If this works out, I actually need to run the pipeline on a much larger dataset of around 200,000 files, and it doesn't look like this will scale very well.

Please note that modifying the source data is not an option for me, so I cannot merge the small files into larger ones.

Any help is appreciated.


-- 
Thanks,
Shashank Rao

Re: Understanding Spark S3 Read Performance

Posted by Shashank Rao <sh...@gmail.com>.
Hi Enrico,
Thanks for the suggestion.
You're right, the second job is due to schema inference. If I use spark.read.text instead of spark.read.json, the second job is not triggered.
As for why the first job takes so long, I'll enable debug logging, try to figure out what's happening, and update here.
Thank you.
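
For completeness, one way to parse the text lines afterwards would be an explicit schema with from_json, roughly like this (a sketch only; the field names are placeholders, not my real schema):
----------------------------
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.from_json;

// Placeholder schema -- the real JSONL fields would go here.
StructType schema = new StructType()
        .add("id", DataTypes.LongType)
        .add("value", DataTypes.DoubleType);

// Reading as text avoids the schema-inference job ...
Dataset<Row> lines = spark.read()
        .option("recursiveFileLookup", "true")
        .text("s3a://bucket-name/folder1/folder2/");

// ... and each line is then parsed against the explicit schema.
Dataset<Row> data = lines
        .select(from_json(col("value"), schema).alias("parsed"))
        .select("parsed.*");
----------------------------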

On Wed, 17 May 2023 at 21:02, Enrico Minack <in...@enrico.minack.dev> wrote:

> I think the reason why you are seeing two Spark jobs is that
> spark.read.json kicks off a Spark job to fetch the schema of *all files*
> to derive the general schema for the DataFrame / Dataset.
>
> If you define the schema via .schema(), this will not kick off a job:
>
> val ds = spark.read.option("recursiveFileLookup", "true").schema("id long, value double").json("s3a://bucket")
>
> If you set sc.setLogLevel("INFO"), you will see a log line like this,
> which shows you how long the file listing takes (this is done on the driver
> and not in a Spark job):
>
> 23/05/17 16:59:54 INFO InMemoryFileIndex: It took 1751 ms to list leaf
> files for 16 paths.
> Only calling an action on ds will kick off a job now:
> ds.count()
>
> Alternatively, you can set samplingRatio to a small fraction like 0.01,
> which will make Spark sample the schema from a small fraction of files /
> rows.
>
> https://spark.apache.org/docs/latest/sql-data-sources-json.html
>
>
> The tasks in the second job each read 12 million rows, so your 12k files
> seem to have 1k rows each. Is that correct?
>
> All files seem to be 1GB in size only, whereas you said there are 12k
> files with 1.6MB each, which should be 19,2GB. It looks like your files
> have 1k rows and 87,5kB size rather than 1,6MB.
>
> Reading 1GB in 14 minutes is 620kBps per executor and 154kBps per core,
> which is really slow. This might be because the files are really really
> small.
>
> Are you sure the 14 minutes is only reading from S3, or does this include
> writing to Google Big Query? What is the action (e.g. ds.write.json or
> ds.count?) that you are calling on the read JSON dataset that is creating
> the second job?
> I recommend setting the log level to DEBUG and watching the drivers stdout
> / stderr / log for insights on why each task takes 17s for 2.8MB.
>
> You could easily .coalesce(32) the read JSON dataset to get bigger tasks
> / partitions and see if this improves things.
>
> Cheers,
> Enrico
>
>
> Am 17.05.23 um 07:41 schrieb Shashank Rao:
>
> Hi,
> 12 and 14 minutes are the wall clock times. There are totally 2 executors
> that are executing the tasks.
> Have attached the executor metrics screenshots for both the jobs.
>
> [image: List Job.png]
> [image: Read Job.png]
>
>
>
> On Tue, 16 May 2023 at 23:47, info <in...@enrico.minack.dev> wrote:
>
>> Hi,
>>
>> For clarification, are those 12 / 14 minutes cumulative cpu time or wall
>> clock time? How many executors executed those 10000 / 375 tasks?
>>
>> Cheers,
>> Enrico
>>
>>
>> -------- Ursprüngliche Nachricht --------
>> Von: Shashank Rao <sh...@gmail.com>
>> Datum: 16.05.23 19:48 (GMT+01:00)
>> An: user@spark.apache.org
>> Betreff: Understanding Spark S3 Read Performance
>>
>> Hi,
>> I'm trying to set up a Spark pipeline which reads data from S3 and writes
>> it into Google Big Query.
>>
>> Environment Details:
>> -----------------------
>> Java 8
>> AWS EMR-6.10.0
>> Spark v3.3.1
>> 2 m5.xlarge executor nodes
>>
>>
>> S3 Directory structure:
>> -----------
>> bucket-name:
>> |---folder1:
>>   |---folder2:
>>      |---file1.jsonl
>>      |---file2.jsonl
>>        ...
>>      |---file12000.jsonl
>>
>>
>> Each file is of size 1.6 MB and there are a total of 12,000 files.
>>
>> The code to read the source data looks like this:
>> ----------------------------
>>
>> spark.sparkContext.hadoopConfiguration.set("fs.s3a.access.key",<ACCESS_KEY>)
>>
>> spark.sparkContext.hadoopConfiguration.set("fs.s3a.secret.key",<SECRET_KEY>)
>> spark.sparkContext.hadoopConfiguration.set("fs.s3a.endpoint","
>> s3.amazonaws.com")
>> Dataset<Row> data = spark.read().option("recursiveFileLookup",
>> "true").json("s3a://bucket-name/folder1/folder2/")
>> ----------------------------
>>
>> Now here's my problem:
>> This triggers two jobs: a) Listing Leaf Nodes, b) Json Read
>>
>> [image: Screenshot 2023-05-16 at 23.00.23.png]
>>
>> The list job takes around 12 mins and has 10,000 partitions/tasks. The
>> read job takes 14 mins and has 375 partitions/tasks.
>>
>> 1. What's going on here? Why does the list job take so much time? I could
>> write a simple JAVA code using AWS SDK which listed all the files in the
>> exact same S3 directory in 5 seconds.
>>
>> 2. Why are there 10,000 partitions/tasks for listing? Can this be
>> configured / changed? Is there any optimisation that can be done to reduce
>> the time taken over here?
>>
>> 3. What is the second job doing? Why are there 375 partitions in that
>> job?
>>
>> If this works out, I actually need to run the pipeline on a much larger
>> data set of around 200,000 files and it doesn't look like it will scale
>> very well.
>>
>> Please note, modifying the source data is not an option that I have.
>> Hence, I cannot merge multiple small files into a single large file.
>>
>> Any help is appreciated.
>>
>>
>> --
>> Thanks,
>> Shashank Rao
>>
>
>
> --
> Regards,
> Shashank Rao
>
>
>

-- 
Regards,
Shashank Rao

Re: Understanding Spark S3 Read Performance

Posted by Enrico Minack <in...@enrico.minack.dev>.
I think the reason why you are seeing two Spark jobs is that 
spark.read.json kicks off a Spark job to fetch the schema of *all files* 
to derive the general schema for the DataFrame / Dataset.

If you define the schema via .schema(), this will not kick off a job:

val ds = spark.read.option("recursiveFileLookup", "true").schema("id long, value double").json("s3a://bucket")
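
The same in Java (matching your setup) would look roughly like this (a sketch; the column names and types are just examples):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Example schema only -- use the actual fields of your JSONL records.
Dataset<Row> ds = spark.read()
        .option("recursiveFileLookup", "true")
        .schema("id long, value double")
        .json("s3a://bucket-name/folder1/folder2/");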

If you set sc.setLogLevel("INFO"), you will see a log line like this, 
which shows you how long the file listing takes (this is done on the 
driver and not in a Spark job):

23/05/17 16:59:54 INFO InMemoryFileIndex: It took 1751 ms to list leaf 
files for 16 paths.

Only calling an action on ds will kick off a job now:
ds.count()

Alternatively, you can set samplingRatio to a small fraction like 
0.01, which will make Spark sample the schema from a small fraction of 
files / rows.

https://spark.apache.org/docs/latest/sql-data-sources-json.html
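
For example (a sketch, continuing the Java snippet above):

Dataset<Row> ds = spark.read()
        .option("recursiveFileLookup", "true")
        .option("samplingRatio", "0.01")  // infer the schema from roughly 1% of the rows
        .json("s3a://bucket-name/folder1/folder2/");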


The tasks in the second job each read 12 million rows, so your 12k files 
seem to have 1k rows each. Is that correct?

All files together seem to total only 1 GB, whereas you said there are 12k 
files of 1.6 MB each, which should be 19.2 GB. It looks like your files 
have 1k rows and are around 87.5 kB each rather than 1.6 MB.

Reading 1GB in 14 minutes is 620kBps per executor and 154kBps per core, 
which is really slow. This might be because the files are really really 
small.

Are you sure the 14 minutes is only reading from S3, or does this 
include writing to Google Big Query? What is the action (e.g. 
ds.write.json or ds.count?) that you are calling on the read JSON 
dataset that is creating the second job?

I recommend setting the log level to DEBUG and watching the driver's 
stdout / stderr / logs for insights into why each task takes 17s for 2.8MB.

You could easily .coalesce(32) the read JSON dataset to get bigger tasks 
/ partitions and see if this improves things.
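
Something like this (a sketch; the output path is a placeholder, and 32 is just a starting point to experiment with):

// Merge the many small read partitions into fewer, larger ones before the action.
Dataset<Row> coalesced = ds.coalesce(32);
coalesced.write().json("s3a://some-output-bucket/out/");  // or whatever action you actually run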

Cheers,
Enrico


Am 17.05.23 um 07:41 schrieb Shashank Rao:
> Hi,
> 12 and 14 minutes are the wall clock times. There are totally 2 
> executors that are executing the tasks.
> Have attached the executor metrics screenshots for both the jobs.
>
> List Job.png
> Read Job.png
>
>
>
> On Tue, 16 May 2023 at 23:47, info <in...@enrico.minack.dev> wrote:
>
>     Hi,
>
>     For clarification, are those 12 / 14 minutes cumulative cpu time
>     or wall clock time? How many executors executed those 10000 / 375
>     tasks?
>
>     Cheers,
>     Enrico
>
>
>     -------- Ursprüngliche Nachricht --------
>     Von: Shashank Rao <sh...@gmail.com>
>     Datum: 16.05.23 19:48 (GMT+01:00)
>     An: user@spark.apache.org
>     Betreff: Understanding Spark S3 Read Performance
>
>     Hi,
>     I'm trying to set up a Spark pipeline which reads data from S3 and
>     writes it into Google Big Query.
>
>     Environment Details:
>     -----------------------
>     Java 8
>     AWS EMR-6.10.0
>     Spark v3.3.1
>     2 m5.xlarge executor nodes
>
>
>     S3 Directory structure:
>     -----------
>     bucket-name:
>     |---folder1:
>     |---folder2:
>      |---file1.jsonl
>      |---file2.jsonl
>      ...
>      |---file12000.jsonl
>
>
>     Each file is of size 1.6 MB and there are a total of 12,000 files.
>
>     The code to read the source data looks like this:
>     ----------------------------
>     spark.sparkContext.hadoopConfiguration.set("fs.s3a.access.key",<ACCESS_KEY>)
>     spark.sparkContext.hadoopConfiguration.set("fs.s3a.secret.key",<SECRET_KEY>)
>     spark.sparkContext.hadoopConfiguration.set("fs.s3a.endpoint","s3.amazonaws.com
>     <http://s3.amazonaws.com>")
>     Dataset<Row> data = spark.read().option("recursiveFileLookup",
>     "true").json("s3a://bucket-name/folder1/folder2/")
>     ----------------------------
>
>     Now here's my problem:
>     This triggers two jobs: a) Listing Leaf Nodes, b) Json Read
>
>     Screenshot 2023-05-16 at 23.00.23.png
>
>     The list job takes around 12 mins and has 10,000 partitions/tasks.
>     The read job takes 14 mins and has 375 partitions/tasks.
>
>     1. What's going on here? Why does the list job take so much time?
>     I could write a simple JAVA code using AWS SDK which listed all
>     the files in the exact same S3 directory in 5 seconds.
>
>     2. Why are there 10,000 partitions/tasks for listing? Can this be
>     configured / changed? Is there any optimisation that can be done
>     to reduce the time taken over here?
>
>     3. What is the second job doing? Why are there 375 partitions in
>     that job?
>
>     If this works out, I actually need to run the pipeline on a much
>     larger data set of around 200,000 files and it doesn't look like
>     it will scale very well.
>
>     Please note, modifying the source data is not an option that I
>     have. Hence, I cannot merge multiple small files into a single
>     large file.
>
>     Any help is appreciated.
>
>
>     -- 
>     Thanks,
>     Shashank Rao
>
>
>
> -- 
> Regards,
> Shashank Rao


Re: Understanding Spark S3 Read Performance

Posted by Shashank Rao <sh...@gmail.com>.
Hi,
12 and 14 minutes are the wall clock times. There are 2 executors in total
executing the tasks.
I have attached the executor metrics screenshots for both jobs.

[image: List Job.png]
[image: Read Job.png]



On Tue, 16 May 2023 at 23:47, info <in...@enrico.minack.dev> wrote:

> Hi,
>
> For clarification, are those 12 / 14 minutes cumulative cpu time or wall
> clock time? How many executors executed those 10000 / 375 tasks?
>
> Cheers,
> Enrico
>
>
> -------- Ursprüngliche Nachricht --------
> Von: Shashank Rao <sh...@gmail.com>
> Datum: 16.05.23 19:48 (GMT+01:00)
> An: user@spark.apache.org
> Betreff: Understanding Spark S3 Read Performance
>
> Hi,
> I'm trying to set up a Spark pipeline which reads data from S3 and writes
> it into Google Big Query.
>
> Environment Details:
> -----------------------
> Java 8
> AWS EMR-6.10.0
> Spark v3.3.1
> 2 m5.xlarge executor nodes
>
>
> S3 Directory structure:
> -----------
> bucket-name:
> |---folder1:
>   |---folder2:
>      |---file1.jsonl
>      |---file2.jsonl
>        ...
>      |---file12000.jsonl
>
>
> Each file is of size 1.6 MB and there are a total of 12,000 files.
>
> The code to read the source data looks like this:
> ----------------------------
>
> spark.sparkContext.hadoopConfiguration.set("fs.s3a.access.key",<ACCESS_KEY>)
>
> spark.sparkContext.hadoopConfiguration.set("fs.s3a.secret.key",<SECRET_KEY>)
> spark.sparkContext.hadoopConfiguration.set("fs.s3a.endpoint","
> s3.amazonaws.com")
> Dataset<Row> data = spark.read().option("recursiveFileLookup",
> "true").json("s3a://bucket-name/folder1/folder2/")
> ----------------------------
>
> Now here's my problem:
> This triggers two jobs: a) Listing Leaf Nodes, b) Json Read
>
> [image: Screenshot 2023-05-16 at 23.00.23.png]
>
> The list job takes around 12 mins and has 10,000 partitions/tasks. The
> read job takes 14 mins and has 375 partitions/tasks.
>
> 1. What's going on here? Why does the list job take so much time? I could
> write a simple JAVA code using AWS SDK which listed all the files in the
> exact same S3 directory in 5 seconds.
>
> 2. Why are there 10,000 partitions/tasks for listing? Can this be
> configured / changed? Is there any optimisation that can be done to reduce
> the time taken over here?
>
> 3. What is the second job doing? Why are there 375 partitions in that job?
>
> If this works out, I actually need to run the pipeline on a much larger
> data set of around 200,000 files and it doesn't look like it will scale
> very well.
>
> Please note, modifying the source data is not an option that I have.
> Hence, I cannot merge multiple small files into a single large file.
>
> Any help is appreciated.
>
>
> --
> Thanks,
> Shashank Rao
>


-- 
Regards,
Shashank Rao

RE: Understanding Spark S3 Read Performance

Posted by info <in...@enrico.minack.dev>.
Hi,
For clarification, are those 12 / 14 minutes cumulative cpu time or wall clock time? How many executors executed those 10000 / 375 tasks?
Cheers,
Enrico

-------- Ursprüngliche Nachricht --------
Von: Shashank Rao <sh...@gmail.com>
Datum: 16.05.23 19:48 (GMT+01:00)
An: user@spark.apache.org
Betreff: Understanding Spark S3 Read Performance

Hi,
I'm trying to set up a Spark pipeline which reads data from S3 and writes it into Google Big Query.

Environment Details:
-----------------------
Java 8
AWS EMR-6.10.0
Spark v3.3.1
2 m5.xlarge executor nodes


S3 Directory structure:
-----------
bucket-name:
|---folder1:
  |---folder2:
     |---file1.jsonl
     |---file2.jsonl
       ...
     |---file12000.jsonl


Each file is of size 1.6 MB and there are a total of 12,000 files.

The code to read the source data looks like this:
----------------------------
spark.sparkContext.hadoopConfiguration.set("fs.s3a.access.key",<ACCESS_KEY>)
spark.sparkContext.hadoopConfiguration.set("fs.s3a.secret.key",<SECRET_KEY>)
spark.sparkContext.hadoopConfiguration.set("fs.s3a.endpoint","s3.amazonaws.com")
Dataset<Row> data = spark.read().option("recursiveFileLookup", "true").json("s3a://bucket-name/folder1/folder2/")
----------------------------

Now here's my problem:
This triggers two jobs: a) Listing Leaf Nodes, b) Json Read

The list job takes around 12 mins and has 10,000 partitions/tasks. The read job takes 14 mins and has 375 partitions/tasks.

1. What's going on here? Why does the list job take so much time? I could write a simple JAVA code using AWS SDK which listed all the files in the exact same S3 directory in 5 seconds.

2. Why are there 10,000 partitions/tasks for listing? Can this be configured / changed? Is there any optimisation that can be done to reduce the time taken over here?

3. What is the second job doing? Why are there 375 partitions in that job?

If this works out, I actually need to run the pipeline on a much larger data set of around 200,000 files and it doesn't look like it will scale very well.

Please note, modifying the source data is not an option that I have. Hence, I cannot merge multiple small files into a single large file.

Any help is appreciated.

-- 
Thanks,
Shashank Rao