Posted to user@spark.apache.org by adam kramer <ad...@gmail.com> on 2016/10/19 05:59:53 UTC

Equivalent Parquet File Repartitioning Benefits for Join/Shuffle?

Hello All,

I’m trying to improve join efficiency both within a data set (self-join)
and across data sets loaded from different parquet files. The separate
files are primarily the result of a multi-stage data ingestion environment.

Are there specific benefits to shuffling efficiency (e.g. no network
transmission) if the parquet files are written from equivalently
partitioned datasets (i.e. same partition columns and number of
partitions)?

A self-join and multi-join Scala shell example that uses the method in question:
% import org.apache.spark.sql.SaveMode
% val df1 = sqlContext.read.parquet("hdfs://someserver:9010/default-partitioned-a-z-file-1")
% val df2 = sqlContext.read.parquet("hdfs://someserver:9010/default-partitioned-a-z-file-2")
% val df1_part = df1.repartition(500,$"a",$"b",$"c")
% val df2_part = df2.repartition(500,$"a",$"b",$"c")
% df1_part.write.format("parquet").mode(SaveMode.Overwrite).save("hdfs://someserver:9010/a-b-c-partitioned-file-1")
% df2_part.write.format("parquet").mode(SaveMode.Overwrite).save("hdfs://someserver:9010/a-b-c-partitioned-file-2")
% val reloaded_df1_part = sqlContext.read.parquet("hdfs://someserver:9010/a-b-c-partitioned-file-1")
% val reloaded_df2_part = sqlContext.read.parquet("hdfs://someserver:9010/a-b-c-partitioned-file-2")
% val superfast_self_join = reloaded_df1_part.join(reloaded_df1_part.select($"a",$"b",$"c",$"d".as("right_d")), Seq("a","b","c"))
% val superfast_multi_join = reloaded_df1_part.join(reloaded_df2_part.select($"a",$"b",$"c",$"not_in_df1"), Seq("a","b","c"))
% superfast_self_join.count
% superfast_multi_join.count

Ignoring the time necessary to repartition and assuming good
partitioning cardinality (while joining groups of rows), are there
performance benefits to this approach for joins 'superfast_self_join'
and 'superfast_multi_join'? Or is there no benefit as the partitioner
information is lost upon persistence/write to parquet?
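
One way to check this empirically is to look at the physical plan of a
join over the reloaded data: an Exchange node there indicates a shuffle.
The following is only a minimal sketch, assuming Spark 2.x with a local
SparkSession (on 1.6 the sqlContext equivalents would be needed). The
temporary path and the two-row data set are made up for illustration.

```scala
import org.apache.spark.sql.{SaveMode, SparkSession}

// Assumption: Spark 2.x on the classpath, run as a script or pasted into a shell.
val spark = SparkSession.builder().master("local[2]").appName("plan-check").getOrCreate()
import spark.implicits._

// Disable broadcast joins so that tiny test data does not hide a shuffle.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

// Hypothetical stand-in for the a-b-c-partitioned file.
val path = java.nio.file.Files.createTempDirectory("a-b-c-partitioned").toString
val df = Seq((1, 1, 1, "x"), (2, 2, 2, "y")).toDF("a", "b", "c", "d")
df.repartition(4, $"a", $"b", $"c").write.mode(SaveMode.Overwrite).parquet(path)

// Reload and build the same kind of self-join as in the example above.
val reloaded = spark.read.parquet(path)
val selfJoin = reloaded.join(
  reloaded.select($"a", $"b", $"c", $"d".as("right_d")),
  Seq("a", "b", "c"))

// Print the physical plan and report whether it mentions an Exchange.
selfJoin.explain()
val planText = selfJoin.queryExecution.executedPlan.toString
println(s"plan mentions Exchange: ${planText.contains("Exchange")}")
```

Running the same check against the real files should show whether the
repartitioning done before the write is still visible to the planner.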

Note I am currently using Spark 1.6.3 and moving to 2.0.1 in the near future.

Thank you for any insights.

Adam

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscribe@spark.apache.org


Re: Equivalent Parquet File Repartitioning Benefits for Join/Shuffle?

Posted by adam kramer <ad...@gmail.com>.
I believe what I am looking for is DataFrameWriter.bucketBy, which
would allow for bucketing into physical parquet files by the desired
columns. My question then becomes: can DataFrames/Datasets take
advantage of this physical bucketing when the parquet files are read
back, for something like a self-join on the bucketed columns?
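
A rough sketch of what that might look like, assuming Spark 2.0 or later
(bucketBy is not available in 1.6). Note that bucketBy is only supported
together with saveAsTable, not with save to a plain path. The table names,
the bucket count of 8, the warehouse directory and the sample rows are
all hypothetical.

```scala
import org.apache.spark.sql.{SaveMode, SparkSession}

// Assumption: local Spark 2.x session with a throwaway warehouse directory.
val spark = SparkSession.builder()
  .master("local[2]")
  .appName("bucket-sketch")
  .config("spark.sql.warehouse.dir",
    java.nio.file.Files.createTempDirectory("warehouse").toString)
  .getOrCreate()
import spark.implicits._

// Disable broadcast joins so that tiny test data does not hide a shuffle.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

// Made-up stand-ins for the two data sets.
val df1 = Seq((1, 1, 1, "x"), (2, 2, 2, "y")).toDF("a", "b", "c", "d")
val df2 = Seq((1, 1, 1, "p"), (3, 3, 3, "q")).toDF("a", "b", "c", "not_in_df1")

// Write both sides with the same bucket count and the same bucket columns.
df1.write.format("parquet").bucketBy(8, "a", "b", "c")
  .mode(SaveMode.Overwrite).saveAsTable("bucketed_1")
df2.write.format("parquet").bucketBy(8, "a", "b", "c")
  .mode(SaveMode.Overwrite).saveAsTable("bucketed_2")

// Read back through the catalog so the bucketing metadata is available.
val joined = spark.table("bucketed_1").join(spark.table("bucketed_2"), Seq("a", "b", "c"))

// Inspect the plan for Exchange nodes on the bucketed sides.
joined.explain()
```

Reading the tables back with spark.table rather than by path matters
here, since the bucketing information is kept in the table metadata.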

