Posted to user@spark.apache.org by Andy Davidson <An...@SantaCruzIntegration.com> on 2016/07/08 17:29:12 UTC

Can I use ExecutorService in my driver? was: is dataframe.write() async? Streaming performance problem

Hi Ewan

Currently I split my dataframe into n smaller dataframes and call
write().json("s3://")

Each data frame becomes a single S3 object.

I assume for your solution to work I would need to repartition(1) each of the
smaller sets so that they are written as a single S3 object.


I am also considering using a Java ExecutorService and thread pool. It's easy
to do. Each thread would call df.write().json("s3://"). One advantage of this
is that I do not need to make any assumptions about how Spark is
implemented.

I assume the thread pool is running on the driver so the slaves do not incur
any extra overhead.
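
The thread pool idea above can be sketched in plain Java as follows. This is
only a minimal sketch under stated assumptions, not a tested Spark job: the
class name ParallelTagWriter, the method writeAll, and the writeOneTag
callback are hypothetical names introduced for illustration. The callback
stands in for the blocking saveDF.write().json(dirPath) call. Spark's
documentation on scheduling within an application says jobs may be submitted
from separate driver threads, which is what this relies on.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Consumer;

/** Hypothetical helper: runs one blocking write per tag on a driver-side thread pool. */
final class ParallelTagWriter {

    private ParallelTagWriter() {}

    /**
     * Calls writeOneTag once per tag, using at most poolSize threads, and
     * returns only after every submitted call has finished or one has failed.
     */
    static void writeAll(List<String> tags, int poolSize, Consumer<String> writeOneTag) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        try {
            List<Future<?>> pending = new ArrayList<>();
            for (String tag : tags) {
                // Assumption: in the real job the callback would filter the
                // DataFrame for this tag and call write().json(...) on it.
                pending.add(pool.submit(() -> writeOneTag.accept(tag)));
            }
            for (Future<?> f : pending) {
                f.get(); // blocks until that write is done and surfaces its exception
            }
        } catch (ExecutionException e) {
            throw new IllegalStateException("a write failed", e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for writes", e);
        } finally {
            pool.shutdown(); // lets already-submitted tasks finish, rejects new ones
        }
    }
}
```

Waiting on every Future before returning keeps the batch from being reported
as finished while writes are still in flight, and it is also the only place
where a failed write becomes visible to the driver.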

Thanks

Andy

From:  Ewan Leith <ew...@realitymine.com>
Date:  Friday, July 8, 2016 at 8:52 AM
To:  Cody Koeninger <co...@koeninger.org>, Andrew Davidson
<An...@SantaCruzIntegration.com>
Cc:  "user @spark" <us...@spark.apache.org>
Subject:  RE: is dataframe.write() async? Streaming performance problem

> Writing (or reading) small files from spark to s3 can be seriously slow.
> 
> You'll get much higher throughput by doing a df.foreachPartition(partition =>
> ...) and inside each partition, creating an aws s3 client then doing a
> partition.foreach and uploading the files using that s3 client with its own
> threadpool.
> 
> As long as you create the s3 client inside the foreachPartition, and close it
> after the partition.foreach(...) is done, you shouldn't have any issues.
> 
> Something roughly like this from the DStream docs:
> 
>   df.foreachPartition { partitionOfRecords =>
>     val connection = createNewConnection()
>     partitionOfRecords.foreach(record => connection.send(record))
>     connection.close()
>   }
> 
> Hope this helps,
> Ewan
> 
> -----Original Message-----
> From: Cody Koeninger [mailto:cody@koeninger.org]
> Sent: 08 July 2016 15:31
> To: Andy Davidson <An...@santacruzintegration.com>
> Cc: user @spark <us...@spark.apache.org>
> Subject: Re: is dataframe.write() async? Streaming performance problem
> 
> Maybe obvious, but what happens when you change the s3 write to a println of
> all the data?  That should identify whether it's the issue.
> 
> count() and read.json() will involve additional tasks (run through the items
> in the rdd to count them, likewise to infer the schema) but for
> 300 records that shouldn't be much of an issue.
> 
> On Thu, Jul 7, 2016 at 3:59 PM, Andy Davidson <An...@santacruzintegration.com>
> wrote:
>>  I am running Spark 1.6.1 built for Hadoop 2.0.0-mr1-cdh4.2.0 and using
>>  the Kafka direct stream approach. I am running into performance problems.
>>  My processing time is greater than my window size. Changing window sizes,
>>  adding cores and executor memory does not change performance. I am
>>  having a lot of trouble identifying the problem by looking at the metrics
>>  provided for streaming apps in the Spark application web UI.
>> 
>>  I think my performance problem has to do with writing the data to S3.
>> 
>>  My app receives very complicated JSON. My program is simple: it sorts
>>  the data into a small set of sets and writes each set as a separate S3
>>  object.
>>  The mini batch data has at most 300 events so I do not think shuffle
>>  is an issue.
>> 
>>  DataFrame rawDF = sqlContext.read().json(jsonRDD).cache();
>> 
>>  ... Explode tagCol ...
>> 
>> 
>>  DataFrame rulesDF = activityDF.select(tagCol).distinct();
>> 
>>  Row[] rows = rulesDF.select(tagCol).collect();
>> 
>>  List<String> tags = new ArrayList<String>(100);
>> 
>>  for (Row row : rows) {
>> 
>>      Object tag = row.get(0);
>> 
>>      tags.add(tag.toString());
>> 
>>  }
>> 
>> 
>>  I think the for loop below is where the bottleneck is. Is write() async?
>> 
>> 
>>  If not, is there an easy way to vectorize/parallelize this for loop, or
>>  do I have to create the threads myself?
>> 
>> 
>>  Is creating threads in spark a bad idea?
>> 
>> 
>> 
>>  for (String tag : tags) {
>> 
>>      DataFrame saveDF =
>>          activityDF.filter(activityDF.col(tagCol).equalTo(tag));
>> 
>>      // I do not think count() is an issue; performance is about 34 ms
>>      if (saveDF.count() >= 1) {
>> 
>>          String dirPath = "s3n://myBucket" + File.separator + date +
>>              File.separator + tag + File.separator + milliSeconds;
>> 
>>          saveDF.write().json(dirPath);
>> 
>>      }
>> 
>>  }
>> 
>> 
>>  Any suggestions would be greatly appreciated
>> 
>> 
>>  Andy
>> 
>> 
> 
> ---------------------------------------------------------------------
> To unsubscribe e-mail: user-unsubscribe@spark.apache.org