You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Andy Davidson <An...@SantaCruzIntegration.com> on 2016/07/07 20:59:36 UTC

is dataframe.write() async? Streaming performance problem

I am running Spark 1.6.1 built for Hadoop 2.0.0-mr1-cdh4.2.0 and using kafka
direct stream approach. I am running into performance problems. My
processing time is > than my window size. Changing window sizes, adding
cores and executor memory does not change performance. I am having a lot of
trouble identifying the problem by at the metrics provided for streaming
apps in the spark application web UI.

I think my performance problem has to with writing the data to S3.

My app receives very complicated JSON. My program is simple, It sorts the
data into a small set of sets and writes each set as a separate S3 object.
The mini batch data has at most 300 events so I do not think shuffle is an
issue.

DataFrame rawDF = sqlContext.read().json(jsonRDD).cache();

Š Explode tagCol Š



DataFrame rulesDF = activityDF.select(tagCol).distinct();

Row[] rows = rulesDF.select(tagCol).collect();

List<String> tags = new ArrayList<String>(100);

for (Row row : rows) {

Object tag = row.get(0);

tags.add(tag.toString());

}



I think the for loop bellow is where the bottle neck is. Is write async() ?



If not is there an easy to to vectorize/parallelize this for loop or do I
have to create the threads my self?



Is creating threads in spark a bad idea?





for(String tag : tags) {

DataFrame saveDF = activityDF.filter(activityDF.col(tagCol).equalTo(tag));

if (saveDF.count() >= 1) { // I do not think count() is an issue performance
is about 34 ms

String dirPath = ³s3n://myBucket" + File.separator + date + File.separator +
tag + File.separator +  milliSeconds;

saveDF.write().json(dirPath);

}

}



Any suggestions would be greatly appreciated



Andy
> 
> 



RE: is dataframe.write() async? Streaming performance problem

Posted by Ewan Leith <ew...@realitymine.com>.
Writing (or reading) small files from spark to s3 can be seriously slow.

You'll get much higher throughput by doing a df.foreachPartition(partition => ...) and inside each partition, creating an aws s3 client then doing a partition.foreach and uploading the files using that s3 client with its own threadpool.

As long as you create the s3 client inside the foreachPartition, and close it after the partition.foreach(...) is done, you shouldn't have any issues.

Something roughly like this from the DStream docs:

  df.foreachPartition { partitionOfRecords =>
    val connection = createNewConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    connection.close()
  }

Hope this helps,
Ewan

-----Original Message-----
From: Cody Koeninger [mailto:cody@koeninger.org] 
Sent: 08 July 2016 15:31
To: Andy Davidson <An...@santacruzintegration.com>
Cc: user @spark <us...@spark.apache.org>
Subject: Re: is dataframe.write() async? Streaming performance problem

Maybe obvious, but what happens when you change the s3 write to a println of all the data?  That should identify whether it's the issue.

count() and read.json() will involve additional tasks (run through the items in the rdd to count them, likewise to infer the schema) but for
300 records that shouldn't be much of an issue.

On Thu, Jul 7, 2016 at 3:59 PM, Andy Davidson <An...@santacruzintegration.com> wrote:
> I am running Spark 1.6.1 built for Hadoop 2.0.0-mr1-cdh4.2.0 and using 
> kafka direct stream approach. I am running into performance problems. 
> My processing time is > than my window size. Changing window sizes, 
> adding cores and executor memory does not change performance. I am 
> having a lot of trouble identifying the problem by at the metrics 
> provided for streaming apps in the spark application web UI.
>
> I think my performance problem has to with writing the data to S3.
>
> My app receives very complicated JSON. My program is simple, It sorts 
> the data into a small set of sets and writes each set as a separate S3 object.
> The mini batch data has at most 300 events so I do not think shuffle 
> is an issue.
>
> DataFrame rawDF = sqlContext.read().json(jsonRDD).cache();
>
> … Explode tagCol …
>
>
> DataFrame rulesDF = activityDF.select(tagCol).distinct();
>
> Row[] rows = rulesDF.select(tagCol).collect();
>
> List<String> tags = new ArrayList<String>(100);
>
> for (Row row : rows) {
>
> Object tag = row.get(0);
>
> tags.add(tag.toString());
>
> }
>
>
> I think the for loop bellow is where the bottle neck is. Is write async() ?
>
>
> If not is there an easy to to vectorize/parallelize this for loop or 
> do I have to create the threads my self?
>
>
> Is creating threads in spark a bad idea?
>
>
>
> for(String tag : tags) {
>
> DataFrame saveDF = 
> activityDF.filter(activityDF.col(tagCol).equalTo(tag));
>
> if (saveDF.count() >= 1) { // I do not think count() is an issue 
> performance is about 34 ms
>
> String dirPath = “s3n://myBucket" + File.separator + date + 
> File.separator + tag + File.separator +  milliSeconds;
>
> saveDF.write().json(dirPath);
>
> }
>
> }
>
>
> Any suggestions would be greatly appreciated
>
>
> Andy
>
>

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscribe@spark.apache.org


Re: is dataframe.write() async? Streaming performance problem

Posted by Cody Koeninger <co...@koeninger.org>.
Maybe obvious, but what happens when you change the s3 write to a
println of all the data?  That should identify whether it's the issue.

count() and read.json() will involve additional tasks (run through the
items in the rdd to count them, likewise to infer the schema) but for
300 records that shouldn't be much of an issue.

On Thu, Jul 7, 2016 at 3:59 PM, Andy Davidson
<An...@santacruzintegration.com> wrote:
> I am running Spark 1.6.1 built for Hadoop 2.0.0-mr1-cdh4.2.0 and using kafka
> direct stream approach. I am running into performance problems. My
> processing time is > than my window size. Changing window sizes, adding
> cores and executor memory does not change performance. I am having a lot of
> trouble identifying the problem by at the metrics provided for streaming
> apps in the spark application web UI.
>
> I think my performance problem has to with writing the data to S3.
>
> My app receives very complicated JSON. My program is simple, It sorts the
> data into a small set of sets and writes each set as a separate S3 object.
> The mini batch data has at most 300 events so I do not think shuffle is an
> issue.
>
> DataFrame rawDF = sqlContext.read().json(jsonRDD).cache();
>
> … Explode tagCol …
>
>
> DataFrame rulesDF = activityDF.select(tagCol).distinct();
>
> Row[] rows = rulesDF.select(tagCol).collect();
>
> List<String> tags = new ArrayList<String>(100);
>
> for (Row row : rows) {
>
> Object tag = row.get(0);
>
> tags.add(tag.toString());
>
> }
>
>
> I think the for loop bellow is where the bottle neck is. Is write async() ?
>
>
> If not is there an easy to to vectorize/parallelize this for loop or do I
> have to create the threads my self?
>
>
> Is creating threads in spark a bad idea?
>
>
>
> for(String tag : tags) {
>
> DataFrame saveDF = activityDF.filter(activityDF.col(tagCol).equalTo(tag));
>
> if (saveDF.count() >= 1) { // I do not think count() is an issue performance
> is about 34 ms
>
> String dirPath = “s3n://myBucket" + File.separator + date + File.separator +
> tag + File.separator +  milliSeconds;
>
> saveDF.write().json(dirPath);
>
> }
>
> }
>
>
> Any suggestions would be greatly appreciated
>
>
> Andy
>
>

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscribe@spark.apache.org