Posted to user@spark.apache.org by Andy Davidson <An...@SantaCruzIntegration.com> on 2016/07/28 02:19:43 UTC

performance problem when reading lots of small files created by spark streaming.

I have a relatively small data set, however it is split into many small JSON
files. Each file is between maybe 4K and 400K.
This is probably a very common issue for anyone using Spark Streaming. My
streaming app works fine, however my batch application takes several hours
to run.

All I am doing is calling count(). Currently I am trying to read the files
from S3. When I look at the app UI it looks like Spark is blocked, probably
on IO. Adding additional workers and memory does not improve performance.

I am able to copy the files from S3 to a worker relatively quickly, so I do
not think S3 read time is the problem.

In the past, when I had similar data sets stored on HDFS, I was able to use
coalesce() to reduce the number of partitions from 200K to 30. This made a
big improvement in processing time. However, when I read from S3, coalesce()
does not improve performance.
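
For what it's worth, a minimal sketch of that coalesce() pattern, written
against the Spark 1.x Java DataFrame API that appears later in this thread
(the bucket path and the partition count of 30 are placeholders, not my
actual job):

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

public class SmallFileCount {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("SmallFileCount");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sc);

        // Reading many small JSON files tends to create one partition per file.
        DataFrame df = sqlContext.read().json("s3://my-bucket/streaming-output/*");

        // Collapse the ~200K file-sized partitions into a handful before doing
        // any work; coalesce() avoids a full shuffle.
        long n = df.coalesce(30).count();
        System.out.println("count = " + n);
    }
}

Note that coalesce() only merges partitions Spark has already planned; it does
not change how many S3 objects have to be listed and opened, which may be why
it helps on HDFS but not on S3.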

I tried copying the files to a normal file system and then using 'hadoop fs
-put' to copy the files to HDFS, however this takes several hours and is
nowhere near completion. It appears HDFS does not deal with small files well.

I am considering copying the files from S3 to a normal file system on one of
my workers, concatenating the files into a few much larger files, and then
using 'hadoop fs -put' to move them to HDFS. Do you think this would improve
the Spark count() performance?

Does anyone know of heuristics for determining the number or size of the
concatenated files?
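
In case it is useful, a hedged sketch of letting Spark do the compaction
itself instead of concatenating on a local disk first (the paths and the
target file count are placeholder assumptions; a common rule of thumb is to
aim for output files near the HDFS block size, roughly 64-128MB, so
targetFiles would be about totalInputBytes / blockSize):

import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

public class CompactSmallFiles {

    // Rewrites many small JSON files as a few larger ones. Choose targetFiles
    // so each output file lands near one HDFS block (64-128MB).
    public static void compact(SQLContext sqlContext, String inputPath,
                               String outputPath, int targetFiles) {
        DataFrame df = sqlContext.read().json(inputPath);   // e.g. "s3://bucket/streaming-output/*"
        df.coalesce(targetFiles)
          .write()
          .json(outputPath);                                // e.g. "hdfs:///compacted/2016-07-28"
    }
}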

Thanks in advance

Andy



Re: performance problem when reading lots of small files created by spark streaming.

Posted by Gourav Sengupta <go...@gmail.com>.
There is an option to join small files up. If you are unable to find it
just let me know.


Regards,
Gourav

On Thu, Jul 28, 2016 at 4:58 PM, Andy Davidson <
Andy@santacruzintegration.com> wrote:

> Hi Pedro
>
> Thanks for the explanation. I started watching your repo. In the short
> term I think I am going to try concatenating my small files into 64MB files
> and using HDFS. My Spark Streaming app is implemented in Java and uses data
> frames. It writes to S3. My batch processing is written in Python; it reads
> data into data frames.
>
> It's probably a lot of work to make your solution work in these other
> contexts.
>
> Here is another use case you might be interested in.
> Writing multiple files to S3 is really slow. It causes a lot of problems
> for my streaming app. Bad things happen if your processing time exceeds
> your window length. Our streaming app must save all the input. For each
> mini batch we split the input into as many as 30 different data sets. Each
> one needs to be written to S3.
>
> As a temporary workaround I use an executor service to try and get more
> concurrent writes. Ideally the Spark framework would provide support for
> async IO, and hopefully the S3 performance issue would be improved. Here is
> my code if you are interested.
>
>
> public class StreamingKafkaGnipCollector {
>
>     static final int POOL_SIZE = 30;
>     static ExecutorService executor = Executors.newFixedThreadPool(POOL_SIZE);
>
>     // … (elided)
>
>     private static void saveRawInput(SQLContext sqlContext,
>             JavaPairInputDStream<String, String> messages, String outputURIBase) {
>
>         JavaDStream<String> lines = messages.map(
>                 new Function<Tuple2<String, String>, String>() {
>             private static final long serialVersionUID = 1L;
>
>             @Override
>             public String call(Tuple2<String, String> tuple2) {
>                 //logger.warn("TODO _2:{}", tuple2._2);
>                 return tuple2._2();
>             }
>         });
>
>         lines.foreachRDD(new VoidFunction2<JavaRDD<String>, Time>() {
>
>             @Override
>             public void call(JavaRDD<String> jsonRDD, Time time) throws Exception {
>                 // … (elided)
>
>                 // df.write().json("s3://"); is very slow
>                 // run saves concurrently
>                 List<SaveData> saveData = new ArrayList<SaveData>(100);
>                 for (String tag : tags) {
>                     DataFrame saveDF = activityDF.filter(activityDF.col(tagCol).equalTo(tag));
>                     String dirPath = createPath(outputURIBase, date, tag, milliSeconds);
>                     saveData.add(new SaveData(saveDF, dirPath));
>                 }
>
>                 saveImpl(saveData, executor); // concurrent writes to S3
>             }
>
>             private void saveImpl(List<SaveData> saveData, ExecutorService executor) {
>                 List<Future<?>> runningThreads = new ArrayList<Future<?>>(POOL_SIZE);
>                 for (SaveData data : saveData) {
>                     SaveWorker worker = new SaveWorker(data);
>                     Future<?> f = executor.submit(worker);
>                     runningThreads.add(f);
>                 }
>
>                 // wait for all the workers to complete
>                 for (Future<?> worker : runningThreads) {
>                     try {
>                         worker.get();
>                         logger.debug("worker completed");
>                     } catch (InterruptedException e) {
>                         logger.error("", e);
>                     } catch (ExecutionException e) {
>                         logger.error("", e);
>                     }
>                 }
>             }
>         }); // end foreachRDD
>     }
>
>     static class SaveData {
>         private DataFrame df;
>         private String path;
>
>         SaveData(DataFrame df, String path) {
>             this.df = df;
>             this.path = path;
>         }
>     }
>
>     static class SaveWorker implements Runnable {
>         SaveData data;
>
>         public SaveWorker(SaveData data) {
>             this.data = data;
>         }
>
>         @Override
>         public void run() {
>             if (data.df.count() >= 1) {
>                 data.df.write().json(data.path);
>             }
>         }
>     }
> }
>
>
> From: Pedro Rodriguez <sk...@gmail.com>
> Date: Wednesday, July 27, 2016 at 8:40 PM
> To: Andrew Davidson <An...@SantaCruzIntegration.com>
> Cc: "user @spark" <us...@spark.apache.org>
> Subject: Re: performance problem when reading lots of small files created
> by spark streaming.
>
> There are a few blog posts that detail one possible/likely issue for
> example:
> http://tech.kinja.com/how-not-to-pull-from-s3-using-apache-spark-1704509219
>
> TLDR: The Hadoop libraries Spark uses assume that their input comes from a
> file system (works with HDFS), however S3 is a key value store, not a file
> system. Somewhere along the line, this makes things very slow. Below I
> describe their approach and a library I am working on to solve this problem.
>
> (Much) Longer Version (with a shiny new library in development):
> So far in my reading of the source code, Hadoop attempts to actually read from
> S3, which can be expensive, particularly since it does so from a single
> driver core (listing files is different from actually reading them; I can
> find the source code and link it later if you would like). The concept
> explained above is to instead use the AWS SDK to list the files, then distribute
> the file names as a collection with sc.parallelize, then read them in
> parallel. I found this worked, but it was lacking in a few ways, so I started
> this project: https://github.com/EntilZha/spark-s3
>
> This takes that idea further by:
> 1. Rather than sc.parallelize, implement the RDD interface where each
> partition is defined by the files it needs to read (haven't gotten to
> DataFrames yet)
> 2. At the driver node, use the AWS SDK to list all the files with their
> size (listing is fast), then run the Least Processing Time Algorithm to
> sift the files into roughly balanced partitions by size
> 3. API: S3Context(sc).textFileByPrefix("bucket", "file1",
> "folder2").regularRDDOperationsHere or import implicits and do
> sc.s3.textFileByPrefix
>
> At present, I am battle testing and benchmarking it at my current job, and
> results are promising, with significant improvements to jobs dealing with
> many files (especially many small files) and to jobs whose input is
> unbalanced to start with. Jobs perform better because: 1) there isn't a
> long stall at the driver when Hadoop decides how to split S3 files; 2) the
> partitions end up nearly perfectly balanced because of the LPT algorithm.
>
> Since I hadn't intended to advertise this quite yet the documentation is
> not super polished but exists here:
> http://spark-s3.entilzha.io/latest/api/#io.entilzha.spark.s3.S3Context
>
> I am completing the sonatype process for publishing artifacts on maven
> central (this should be done by tomorrow so referencing
> "io.entilzha:spark-s3_2.10:0.0.0" should work very soon). I would love to
> hear if this library solution works, otherwise I hope the blog post above
> is illuminating.
>
> Pedro
>
> On Wed, Jul 27, 2016 at 8:19 PM, Andy Davidson <
> Andy@santacruzintegration.com> wrote:
>
>> I have a relatively small data set, however it is split into many small
>> JSON files. Each file is between maybe 4K and 400K.
>> This is probably a very common issue for anyone using Spark Streaming. My
>> streaming app works fine, however my batch application takes several hours
>> to run.
>>
>> All I am doing is calling count(). Currently I am trying to read the
>> files from S3. When I look at the app UI it looks like Spark is blocked,
>> probably on IO. Adding additional workers and memory does not improve
>> performance.
>>
>> I am able to copy the files from S3 to a worker relatively quickly, so I
>> do not think S3 read time is the problem.
>>
>> In the past, when I had similar data sets stored on HDFS, I was able to use
>> coalesce() to reduce the number of partitions from 200K to 30. This made a
>> big improvement in processing time. However, when I read from S3, coalesce()
>> does not improve performance.
>>
>> I tried copying the files to a normal file system and then using 'hadoop
>> fs -put' to copy the files to HDFS, however this takes several hours and is
>> nowhere near completion. It appears HDFS does not deal with small files
>> well.
>>
>> I am considering copying the files from S3 to a normal file system on one
>> of my workers, concatenating the files into a few much larger files, and
>> then using 'hadoop fs -put' to move them to HDFS. Do you think this would
>> improve the Spark count() performance?
>>
>> Does anyone know of heuristics for determining the number or size of the
>> concatenated files?
>>
>> Thanks in advance
>>
>> Andy
>>
>
>
>
> --
> Pedro Rodriguez
> PhD Student in Distributed Machine Learning | CU Boulder
> UC Berkeley AMPLab Alumni
>
> ski.rodriguez@gmail.com | pedrorodriguez.io | 909-353-4423
> Github: github.com/EntilZha | LinkedIn:
> https://www.linkedin.com/in/pedrorodriguezscience
>
>

Re: performance problem when reading lots of small files created by spark streaming.

Posted by Andy Davidson <An...@SantaCruzIntegration.com>.
Hi Pedro

Thanks for the explanation. I started watching your repo. In the short term
I think I am going to try concatenating my small files into 64MB files and
using HDFS. My Spark Streaming app is implemented in Java and uses data
frames. It writes to S3. My batch processing is written in Python; it reads
data into data frames.

It's probably a lot of work to make your solution work in these other
contexts.

Here is another use case you might be interested in.
Writing multiple files to S3 is really slow. It causes a lot of problems for
my streaming app. Bad things happen if your processing time exceeds your
window length. Our streaming app must save all the input. For each mini
batch we split the input into as many as 30 different data sets. Each one
needs to be written to S3.

As a temporary workaround I use an executor service to try and get more
concurrent writes. Ideally the Spark framework would provide support for
async IO, and hopefully the S3 performance issue would be improved. Here is
my code if you are interested.


public class StreamingKafkaGnipCollector {

    static final int POOL_SIZE = 30;
    static ExecutorService executor = Executors.newFixedThreadPool(POOL_SIZE);

    // … (elided)

    private static void saveRawInput(SQLContext sqlContext,
            JavaPairInputDStream<String, String> messages, String outputURIBase) {

        JavaDStream<String> lines = messages.map(
                new Function<Tuple2<String, String>, String>() {
            private static final long serialVersionUID = 1L;

            @Override
            public String call(Tuple2<String, String> tuple2) {
                //logger.warn("TODO _2:{}", tuple2._2);
                return tuple2._2();
            }
        });

        lines.foreachRDD(new VoidFunction2<JavaRDD<String>, Time>() {

            @Override
            public void call(JavaRDD<String> jsonRDD, Time time) throws Exception {
                // … (elided)

                // df.write().json("s3://"); is very slow
                // run saves concurrently
                List<SaveData> saveData = new ArrayList<SaveData>(100);
                for (String tag : tags) {
                    DataFrame saveDF = activityDF.filter(activityDF.col(tagCol).equalTo(tag));
                    String dirPath = createPath(outputURIBase, date, tag, milliSeconds);
                    saveData.add(new SaveData(saveDF, dirPath));
                }

                saveImpl(saveData, executor); // concurrent writes to S3
            }

            private void saveImpl(List<SaveData> saveData, ExecutorService executor) {
                List<Future<?>> runningThreads = new ArrayList<Future<?>>(POOL_SIZE);
                for (SaveData data : saveData) {
                    SaveWorker worker = new SaveWorker(data);
                    Future<?> f = executor.submit(worker);
                    runningThreads.add(f);
                }

                // wait for all the workers to complete
                for (Future<?> worker : runningThreads) {
                    try {
                        worker.get();
                        logger.debug("worker completed");
                    } catch (InterruptedException e) {
                        logger.error("", e);
                    } catch (ExecutionException e) {
                        logger.error("", e);
                    }
                }
            }
        }); // end foreachRDD
    }

    static class SaveData {
        private DataFrame df;
        private String path;

        SaveData(DataFrame df, String path) {
            this.df = df;
            this.path = path;
        }
    }

    static class SaveWorker implements Runnable {
        SaveData data;

        public SaveWorker(SaveData data) {
            this.data = data;
        }

        @Override
        public void run() {
            if (data.df.count() >= 1) {
                data.df.write().json(data.path);
            }
        }
    }
}



From:  Pedro Rodriguez <sk...@gmail.com>
Date:  Wednesday, July 27, 2016 at 8:40 PM
To:  Andrew Davidson <An...@SantaCruzIntegration.com>
Cc:  "user @spark" <us...@spark.apache.org>
Subject:  Re: performance problem when reading lots of small files created
by spark streaming.

> There are a few blog posts that detail one possible/likely issue for example:
> http://tech.kinja.com/how-not-to-pull-from-s3-using-apache-spark-1704509219
> 
> TLDR: The Hadoop libraries Spark uses assume that their input comes from a
> file system (works with HDFS), however S3 is a key value store, not a file
> system. Somewhere along the line, this makes things very slow. Below I
> describe their approach and a library I am working on to solve this problem.
> 
> (Much) Longer Version (with a shiny new library in development):
> So far in my reading of the source code, Hadoop attempts to actually read from S3,
> which can be expensive, particularly since it does so from a single driver core
> (listing files is different from actually reading them; I can find the source
> code and link it later if you would like). The concept explained above is to
> instead use the AWS SDK to list the files, then distribute the file names as a
> collection with sc.parallelize, then read them in parallel. I found this
> worked, but it was lacking in a few ways, so I started this project:
> https://github.com/EntilZha/spark-s3
> 
> This takes that idea further by:
> 1. Rather than sc.parallelize, implement the RDD interface where each
> partition is defined by the files it needs to read (haven't gotten to
> DataFrames yet)
> 2. At the driver node, use the AWS SDK to list all the files with their size
> (listing is fast), then run the Least Processing Time Algorithm to sift the
> files into roughly balanced partitions by size
> 3. API: S3Context(sc).textFileByPrefix("bucket", "file1",
> "folder2").regularRDDOperationsHere or import implicits and do
> sc.s3.textFileByPrefix
> 
> At present, I am battle testing and benchmarking it at my current job, and
> results are promising, with significant improvements to jobs dealing with many
> files (especially many small files) and to jobs whose input is unbalanced to
> start with. Jobs perform better because: 1) there isn't a long stall at the
> driver when Hadoop decides how to split S3 files; 2) the partitions end up
> nearly perfectly balanced because of the LPT algorithm.
> 
> Since I hadn't intended to advertise this quite yet the documentation is not
> super polished but exists here:
> http://spark-s3.entilzha.io/latest/api/#io.entilzha.spark.s3.S3Context
> 
> I am completing the sonatype process for publishing artifacts on maven central
> (this should be done by tomorrow so referencing
> "io.entilzha:spark-s3_2.10:0.0.0" should work very soon). I would love to hear
> if this library solution works, otherwise I hope the blog post above is
> illuminating.
> 
> Pedro
> 
> On Wed, Jul 27, 2016 at 8:19 PM, Andy Davidson <An...@santacruzintegration.com>
> wrote:
>> I have a relatively small data set, however it is split into many small JSON
>> files. Each file is between maybe 4K and 400K.
>> This is probably a very common issue for anyone using Spark Streaming. My
>> streaming app works fine, however my batch application takes several hours
>> to run.
>>
>> All I am doing is calling count(). Currently I am trying to read the files
>> from S3. When I look at the app UI it looks like Spark is blocked, probably on
>> IO. Adding additional workers and memory does not improve performance.
>>
>> I am able to copy the files from S3 to a worker relatively quickly, so I do
>> not think S3 read time is the problem.
>>
>> In the past, when I had similar data sets stored on HDFS, I was able to use
>> coalesce() to reduce the number of partitions from 200K to 30. This made a big
>> improvement in processing time. However, when I read from S3, coalesce() does
>> not improve performance.
>>
>> I tried copying the files to a normal file system and then using 'hadoop fs
>> -put' to copy the files to HDFS, however this takes several hours and is
>> nowhere near completion. It appears HDFS does not deal with small files well.
>>
>> I am considering copying the files from S3 to a normal file system on one of
>> my workers, concatenating the files into a few much larger files, and then
>> using 'hadoop fs -put' to move them to HDFS. Do you think this would improve
>> the Spark count() performance?
>>
>> Does anyone know of heuristics for determining the number or size of the
>> concatenated files?
>>
>> Thanks in advance
>>
>> Andy
> 
> 
> 
> -- 
> Pedro Rodriguez
> PhD Student in Distributed Machine Learning | CU Boulder
> UC Berkeley AMPLab Alumni
> 
> ski.rodriguez@gmail.com | pedrorodriguez.io <http://pedrorodriguez.io>  |
> 909-353-4423
> Github: github.com/EntilZha <http://github.com/EntilZha>  | LinkedIn:
> https://www.linkedin.com/in/pedrorodriguezscience
> 



Re: performance problem when reading lots of small files created by spark streaming.

Posted by Pedro Rodriguez <sk...@gmail.com>.
There are a few blog posts that detail one possible/likely issue, for
example:
http://tech.kinja.com/how-not-to-pull-from-s3-using-apache-spark-1704509219

TLDR: The Hadoop libraries Spark uses assume that their input comes from a
file system (works with HDFS), however S3 is a key value store, not a file
system. Somewhere along the line, this makes things very slow. Below I
describe their approach and a library I am working on to solve this problem.

(Much) Longer Version (with a shiny new library in development):
So far in my reading of the source code, Hadoop attempts to actually read from
S3, which can be expensive, particularly since it does so from a single
driver core (listing files is different from actually reading them; I can
find the source code and link it later if you would like). The concept
explained above is to instead use the AWS SDK to list the files, then distribute
the file names as a collection with sc.parallelize, then read them in
parallel. I found this worked, but it was lacking in a few ways, so I started
this project: https://github.com/EntilZha/spark-s3
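
For anyone who wants to try that list-then-parallelize idea directly, here is
a rough, hedged sketch (not the spark-s3 code) using the AWS Java SDK v1 and
the Spark 1.x Java API used elsewhere in this thread; the bucket, prefix, and
slice count are placeholders and error handling is omitted:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.S3ObjectSummary;

import org.apache.commons.io.IOUtils;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

public class S3ListThenRead {

    public static DataFrame readSmallJsonFiles(JavaSparkContext sc, SQLContext sqlContext,
            final String bucket, String prefix, int numSlices) {

        // 1. List the keys on the driver; listing is fast compared to reading.
        AmazonS3 s3 = new AmazonS3Client();
        List<String> keys = new ArrayList<String>();
        ObjectListing listing = s3.listObjects(bucket, prefix);
        while (true) {
            for (S3ObjectSummary summary : listing.getObjectSummaries()) {
                keys.add(summary.getKey());
            }
            if (!listing.isTruncated()) {
                break;
            }
            listing = s3.listNextBatchOfObjects(listing);
        }

        // 2. Distribute the key names and read each object on the executors.
        //    Files written by df.write().json() hold one JSON object per line,
        //    so split the contents back into lines before parsing.
        JavaRDD<String> jsonLines = sc.parallelize(keys, numSlices).flatMap(
                new FlatMapFunction<String, String>() {
                    @Override
                    public Iterable<String> call(String key) throws Exception {
                        // A real version would reuse one client per partition
                        // (mapPartitions) rather than creating one per file.
                        AmazonS3 client = new AmazonS3Client();
                        String contents = IOUtils.toString(
                                client.getObject(bucket, key).getObjectContent());
                        return Arrays.asList(contents.split("\n"));
                    }
                });

        return sqlContext.read().json(jsonLines);
    }
}

sc.parallelize splits the key list evenly by count, not by size, which is one
of the gaps the library below fills with its LPT balancing.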

This takes that idea further by:
1. Rather than sc.parallelize, implement the RDD interface where each
partition is defined by the files it needs to read (haven't gotten to
DataFrames yet)
2. At the driver node, use the AWS SDK to list all the files with their
size (listing is fast), then run the Least Processing Time Algorithm to
sift the files into roughly balanced partitions by size
3. API: S3Context(sc).textFileByPrefix("bucket", "file1",
"folder2").regularRDDOperationsHere or import implicits and do
sc.s3.textFileByPrefix

At present, I am battle testing and benchmarking it at my current job, and
results are promising, with significant improvements to jobs dealing with
many files (especially many small files) and to jobs whose input is
unbalanced to start with. Jobs perform better because: 1) there isn't a
long stall at the driver when Hadoop decides how to split S3 files; 2) the
partitions end up nearly perfectly balanced because of the LPT algorithm.
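
For the curious, the LPT (Longest Processing Time) step can be sketched
roughly like this in Java (an illustration only, not the library's actual
code): sort the files by size, largest first, and always hand the next file
to the partition that currently holds the fewest bytes.

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

public class LptPartitioner {

    static class FileInfo {
        final String key;
        final long size;
        FileInfo(String key, long size) { this.key = key; this.size = size; }
    }

    static class Partition {
        final List<String> keys = new ArrayList<String>();
        long totalBytes = 0L;
    }

    public static List<Partition> assign(List<FileInfo> files, int numPartitions) {
        // Largest files first.
        List<FileInfo> sorted = new ArrayList<FileInfo>(files);
        Collections.sort(sorted, new Comparator<FileInfo>() {
            @Override
            public int compare(FileInfo a, FileInfo b) {
                return Long.compare(b.size, a.size);
            }
        });

        // Min-heap of partitions ordered by the bytes they already hold.
        PriorityQueue<Partition> heap = new PriorityQueue<Partition>(
                numPartitions, new Comparator<Partition>() {
                    @Override
                    public int compare(Partition a, Partition b) {
                        return Long.compare(a.totalBytes, b.totalBytes);
                    }
                });
        List<Partition> partitions = new ArrayList<Partition>();
        for (int i = 0; i < numPartitions; i++) {
            Partition p = new Partition();
            partitions.add(p);
            heap.add(p);
        }

        // Greedily give each file to the currently lightest partition.
        for (FileInfo f : sorted) {
            Partition lightest = heap.poll();
            lightest.keys.add(f.key);
            lightest.totalBytes += f.size;
            heap.add(lightest);
        }
        return partitions;
    }
}

Each Partition's key list would then back one RDD partition, which is why the
partitions come out close to equal in total bytes.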

Since I hadn't intended to advertise this quite yet, the documentation is
not super polished, but it exists here:
http://spark-s3.entilzha.io/latest/api/#io.entilzha.spark.s3.S3Context

I am completing the sonatype process for publishing artifacts on maven
central (this should be done by tomorrow so referencing
"io.entilzha:spark-s3_2.10:0.0.0" should work very soon). I would love to
hear if this library solution works, otherwise I hope the blog post above
is illuminating.

Pedro

On Wed, Jul 27, 2016 at 8:19 PM, Andy Davidson <
Andy@santacruzintegration.com> wrote:

> I have a relatively small data set, however it is split into many small
> JSON files. Each file is between maybe 4K and 400K.
> This is probably a very common issue for anyone using Spark Streaming. My
> streaming app works fine, however my batch application takes several hours
> to run.
>
> All I am doing is calling count(). Currently I am trying to read the files
> from S3. When I look at the app UI it looks like Spark is blocked, probably
> on IO. Adding additional workers and memory does not improve performance.
>
> I am able to copy the files from S3 to a worker relatively quickly, so I
> do not think S3 read time is the problem.
>
> In the past, when I had similar data sets stored on HDFS, I was able to use
> coalesce() to reduce the number of partitions from 200K to 30. This made a
> big improvement in processing time. However, when I read from S3, coalesce()
> does not improve performance.
>
> I tried copying the files to a normal file system and then using 'hadoop
> fs -put' to copy the files to HDFS, however this takes several hours and is
> nowhere near completion. It appears HDFS does not deal with small files
> well.
>
> I am considering copying the files from S3 to a normal file system on one
> of my workers, concatenating the files into a few much larger files, and
> then using 'hadoop fs -put' to move them to HDFS. Do you think this would
> improve the Spark count() performance?
>
> Does anyone know of heuristics for determining the number or size of the
> concatenated files?
>
> Thanks in advance
>
> Andy
>



-- 
Pedro Rodriguez
PhD Student in Distributed Machine Learning | CU Boulder
UC Berkeley AMPLab Alumni

ski.rodriguez@gmail.com | pedrorodriguez.io | 909-353-4423
Github: github.com/EntilZha | LinkedIn:
https://www.linkedin.com/in/pedrorodriguezscience