You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Jason <Ja...@jasonknight.us> on 2015/08/26 02:11:37 UTC

Persisting sorted parquet tables for future sort merge joins

I want to persist a large _sorted_ table to Parquet on S3 and then read
this in and join it using the Sorted Merge Join strategy against another
large sorted table.

The problem is: even though I sort these tables on the join key beforehand,
once I persist them to Parquet, they lose the information about their
sortedness. Is there anyway to hint to Spark that they do not need to be
resorted the next time I read them in?

I've been trying this on 1.5 and I keep getting plans looking like:
[== Physical Plan ==]
[TungstenProject [pos#28400,workf...#28399]]
[ SortMergeJoin [CHROM#28403,pos#28400], [CHROM#28399,pos#28332]]
[  TungstenSort [CHROM#28403 ASC,pos#28400 ASC], false, 0]
[   TungstenExchange hashpartitioning(CHROM#28403,pos#28400)]
[    ConvertToUnsafe]
[     Scan ParquetRelation[file:/....sorted.parquet][pos#284....8424]]
[  TungstenSort [CHROM#28399 ASC,pos#28332 ASC], false, 0]
[   TungstenExchange hashpartitioning(CHROM#28399,pos#28332)]
[    ConvertToUnsafe]
[     Scan ParquetRelation[file:....exploded_sorted.parquet][pos#2.....399]]

Thanks,
Jason

Persisting sorted parquet tables for future sort merge joins

Posted by Jason <bi...@gmail.com>.
I want to persist a large _sorted_ table to Parquet on S3 and then read
this in and join it using the Sorted Merge Join strategy against another
large sorted table.

The problem is: even though I sort these tables on the join key beforehand,
once I persist them to Parquet, they lose the information about their
sortedness. Is there anyway to hint to Spark that they do not need to be
resorted the next time I read them in?

I've been trying this on 1.5 and I keep getting plans looking like:
[== Physical Plan ==]
[TungstenProject [pos#28400,workf...#28399]]
[ SortMergeJoin [CHROM#28403,pos#28400], [CHROM#28399,pos#28332]]
[ TungstenSort [CHROM#28403 ASC,pos#28400 ASC], false, 0]
[ TungstenExchange hashpartitioning(CHROM#28403,pos#28400)]
[ ConvertToUnsafe]
[ Scan ParquetRelation[file:/....sorted.parquet][pos#284....8424]]
[ TungstenSort [CHROM#28399 ASC,pos#28332 ASC], false, 0]
[ TungstenExchange hashpartitioning(CHROM#28399,pos#28332)]
[ ConvertToUnsafe]
[ Scan ParquetRelation[file:....exploded_sorted.parquet][pos#2.....399]]

As you can see, this plan exchanges and sorts the data before performing
the SortMergeJoin even though these parquet tables are already sorted.

Thanks,
Jason