Posted to user@spark.apache.org by Andy Davidson <An...@SantaCruzIntegration.com> on 2016/07/08 17:29:12 UTC
can I use ExecutorService in my driver? was: is dataframe.write() async? Streaming performance problem
Hi Ewan
Currently I split my dataframe into n smaller dataframes and call
write().json("s3://") on each. Each data frame becomes a single S3 object.
I assume that for your solution to work I would need to repartition(1) each of the
smaller sets so that each is written as a single S3 object.
I am also considering using a Java ExecutorService and thread pool. It's easy
to do: each thread would call df.write().json("s3://"). One advantage of this
is that I do not need to make any assumptions about how Spark is
implemented.
I assume the thread pool is running on the driver so the slaves do not incur
any extra overhead.
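A minimal sketch of that driver-side thread pool in plain Java. Here writeTag() is a hypothetical stand-in for the per-tag saveDF.write().json(dirPath) call, and the pool size of 4 is an assumption, not something from this thread:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ParallelTagWriter {

    // Hypothetical stand-in for saveDF.write().json(dirPath);
    // each call would block until that tag's Spark write job finishes.
    static void writeTag(String tag) {
        System.out.println("wrote tag " + tag);
    }

    // Submit one write per tag, then block until all of them finish.
    static int writeAll(List<String> tags, int poolSize) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        try {
            List<Future<?>> pending = new ArrayList<>();
            for (String tag : tags) {
                pending.add(pool.submit(() -> writeTag(tag)));
            }
            for (Future<?> f : pending) {
                f.get(); // rethrows any exception from a failed write
            }
            return pending.size();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        int done = writeAll(Arrays.asList("tagA", "tagB", "tagC"), 4);
        System.out.println("completed " + done + " writes");
    }
}
```

The pool lives entirely in the driver; each task just triggers a Spark job, so the executors see the same work as the serial loop, only overlapped. The pool size caps how many write jobs the driver's scheduler runs concurrently.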
Thanks
Andy
From: Ewan Leith <ew...@realitymine.com>
Date: Friday, July 8, 2016 at 8:52 AM
To: Cody Koeninger <co...@koeninger.org>, Andrew Davidson
<An...@SantaCruzIntegration.com>
Cc: "user @spark" <us...@spark.apache.org>
Subject: RE: is dataframe.write() async? Streaming performance problem
> Writing (or reading) small files from spark to s3 can be seriously slow.
>
> You'll get much higher throughput by doing a df.foreachPartition(partition =>
> ...) and inside each partition, creating an aws s3 client then doing a
> partition.foreach and uploading the files using that s3 client with its own
> threadpool.
>
> As long as you create the s3 client inside the foreachPartition, and close it
> after the partition.foreach(...) is done, you shouldn't have any issues.
>
> Something roughly like this from the DStream docs:
>
> df.foreachPartition { partitionOfRecords =>
>   val connection = createNewConnection()
>   partitionOfRecords.foreach(record => connection.send(record))
>   connection.close()
> }
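The same create-inside/close-after lifecycle, sketched in plain Java so the shape is runnable as-is. MockS3Client and its upload/close methods are placeholders for a real S3 client, not code from this thread, and the outer loop stands in for what foreachPartition runs on each executor:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class PerPartitionUpload {

    // Placeholder for a real S3 client; created and closed once per partition.
    static class MockS3Client {
        final List<String> uploaded = new ArrayList<>();
        boolean closed = false;

        void upload(String record) {
            if (closed) throw new IllegalStateException("client already closed");
            uploaded.add(record);
        }

        void close() { closed = true; }
    }

    // Mirrors df.foreachPartition: one client per partition, closed after the loop.
    static int uploadPartitions(List<List<String>> partitions) {
        int total = 0;
        for (List<String> partition : partitions) {
            MockS3Client client = new MockS3Client(); // created inside the partition
            try {
                for (String record : partition) {
                    client.upload(record);
                }
                total += client.uploaded.size();
            } finally {
                client.close();                       // closed once the partition is done
            }
        }
        return total;
    }

    public static void main(String[] args) {
        List<List<String>> partitions =
                Arrays.asList(Arrays.asList("a", "b"), Arrays.asList("c"));
        System.out.println("uploaded " + uploadPartitions(partitions) + " records");
    }
}
```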
>
> Hope this helps,
> Ewan
>
> -----Original Message-----
> From: Cody Koeninger [mailto:cody@koeninger.org]
> Sent: 08 July 2016 15:31
> To: Andy Davidson <An...@santacruzintegration.com>
> Cc: user @spark <us...@spark.apache.org>
> Subject: Re: is dataframe.write() async? Streaming performance problem
>
> Maybe obvious, but what happens when you change the s3 write to a println of
> all the data? That should identify whether the S3 write is the issue.
>
> count() and read.json() will involve additional tasks (run through the items
> in the rdd to count them, likewise to infer the schema) but for
> 300 records that shouldn't be much of an issue.
>
> On Thu, Jul 7, 2016 at 3:59 PM, Andy Davidson <An...@santacruzintegration.com>
> wrote:
>> I am running Spark 1.6.1 built for Hadoop 2.0.0-mr1-cdh4.2.0 and using
>> the Kafka direct stream approach. I am running into performance problems.
>> My processing time is greater than my window size. Changing window sizes,
>> adding cores and executor memory does not change performance. I am
>> having a lot of trouble identifying the problem using the metrics
>> provided for streaming apps in the Spark application web UI.
>>
>> I think my performance problem has to do with writing the data to S3.
>>
>> My app receives very complicated JSON. My program is simple: it sorts
>> the data into a small number of sets and writes each set as a separate S3
>> object. The mini batch data has at most 300 events, so I do not think
>> shuffle is an issue.
>>
>> DataFrame rawDF = sqlContext.read().json(jsonRDD).cache();
>>
>> // explode tagCol
>> DataFrame rulesDF = activityDF.select(tagCol).distinct();
>> Row[] rows = rulesDF.select(tagCol).collect();
>>
>> List<String> tags = new ArrayList<String>(100);
>> for (Row row : rows) {
>>     Object tag = row.get(0);
>>     tags.add(tag.toString());
>> }
>>
>>
>> I think the for loop below is where the bottleneck is. Is write() async?
>>
>>
>> If not, is there an easy way to vectorize/parallelize this for loop or
>> do I have to create the threads myself?
>>
>>
>> Is creating threads in Spark a bad idea?
>>
>>
>>
>> for (String tag : tags) {
>>     DataFrame saveDF = activityDF.filter(activityDF.col(tagCol).equalTo(tag));
>>
>>     // I do not think count() is an issue; it takes about 34 ms
>>     if (saveDF.count() >= 1) {
>>         String dirPath = "s3n://myBucket" + File.separator + date
>>                 + File.separator + tag + File.separator + milliSeconds;
>>         saveDF.write().json(dirPath);
>>     }
>> }
>>
>>
>> Any suggestions would be greatly appreciated
>>
>>
>> Andy
>>
>>
>
> ---------------------------------------------------------------------
> To unsubscribe e-mail: user-unsubscribe@spark.apache.org
>
>