Posted to user@spark.apache.org by Rohit Verma <ro...@rokittech.com> on 2016/11/12 11:11:14 UTC
Spark joins using row id
For datasets structured as
ds1
rowN col1
1 A
2 B
3 C
4 C
…
and
ds2
rowN col2
1 X
2 Y
3 Z
…
I want to do a left join
Dataset<Row> joined = ds1.join(ds2, ds1.col("rowN").equalTo(ds2.col("rowN")), "left_outer");
I read somewhere, on Stack Overflow or this mailing list, that if Spark knows the datasets are sorted it can apply some optimizations to joins.
Is it possible to make this join more efficient?
Rohit
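For reference, the kind of optimization the question alludes to can be sketched outside Spark. When both inputs are already sorted by rowN, a left outer join needs only one forward pass over each side. The plain-Java sketch below is an illustration only, not Spark code or Spark's implementation; the class name SortedMergeJoinSketch and its Row type are hypothetical, and it assumes rowN is unique on the right side.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical illustration (not Spark code): left outer join of two inputs sorted by rowN. */
final class SortedMergeJoinSketch {

    /** One input row: a row id plus a single column value. */
    static final class Row {
        final long rowN;
        final String value;

        Row(long rowN, String value) {
            this.rowN = rowN;
            this.value = value;
        }
    }

    /**
     * Joins two lists that are both sorted ascending by rowN.
     * Assumes rowN is unique on the right side. Each output line is
     * "rowN leftValue rightValue", with "null" where the right side has no match.
     */
    static List<String> leftOuterJoin(List<Row> left, List<Row> right) {
        List<String> out = new ArrayList<>();
        int j = 0; // cursor into the right side; it only ever moves forward
        for (Row l : left) {
            // Skip right rows whose id is smaller than the current left id.
            while (j < right.size() && right.get(j).rowN < l.rowN) {
                j++;
            }
            boolean matched = j < right.size() && right.get(j).rowN == l.rowN;
            out.add(l.rowN + " " + l.value + " " + (matched ? right.get(j).value : "null"));
        }
        return out;
    }

    public static void main(String[] args) {
        // Only the rows shown in the question; the elided rows are not reproduced.
        List<Row> ds1 = Arrays.asList(
                new Row(1, "A"), new Row(2, "B"), new Row(3, "C"), new Row(4, "C"));
        List<Row> ds2 = Arrays.asList(
                new Row(1, "X"), new Row(2, "Y"), new Row(3, "Z"));
        for (String line : leftOuterJoin(ds1, ds2)) {
            System.out.println(line);
        }
    }
}
```

Because neither cursor ever moves backward, the cost is linear in the two input sizes and no hash table is built. That is the benefit a join can get from knowing its inputs are sorted.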
Re: Spark joins using row id
Posted by "颜发才 (Yan Facai)" <ya...@gmail.com>.
A pair RDD can use its (hash) partitioner information to optimize joins,
but I am not sure whether a Dataset can.
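To make the partitioning point concrete: when both sides are split by the same hash function into the same number of partitions, rows with equal keys land in the same partition index, so each partition pair can be joined locally without moving data. The plain-Java sketch below illustrates that idea only; it is not Spark code, and the class CoPartitionedJoinSketch, its methods, and the partition count of 2 are assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical illustration (not Spark code): joining two inputs that share a hash partitioning. */
final class CoPartitionedJoinSketch {

    /** Maps a row id to a partition index; both sides must use this same function. */
    static int partitionOf(long rowN, int numPartitions) {
        return Math.floorMod(Long.hashCode(rowN), numPartitions);
    }

    /** Splits rowN -> value pairs into numPartitions buckets by hash of rowN. */
    static List<Map<Long, String>> partition(Map<Long, String> rows, int numPartitions) {
        List<Map<Long, String>> parts = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            parts.add(new HashMap<>());
        }
        for (Map.Entry<Long, String> e : rows.entrySet()) {
            parts.get(partitionOf(e.getKey(), numPartitions)).put(e.getKey(), e.getValue());
        }
        return parts;
    }

    /**
     * Left outer join of two identically partitioned inputs.
     * Partition p of the left side is only ever compared with partition p of the right side.
     */
    static Map<Long, String> leftOuterJoin(List<Map<Long, String>> left,
                                           List<Map<Long, String>> right) {
        Map<Long, String> out = new TreeMap<>(); // sorted by rowN for readable output
        for (int p = 0; p < left.size(); p++) {
            Map<Long, String> rightPart = right.get(p);
            for (Map.Entry<Long, String> e : left.get(p).entrySet()) {
                // A missing right-side match is rendered as "null".
                out.put(e.getKey(), e.getValue() + " " + rightPart.get(e.getKey()));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<Long, String> ds1 = new HashMap<>();
        ds1.put(1L, "A"); ds1.put(2L, "B"); ds1.put(3L, "C"); ds1.put(4L, "C");
        Map<Long, String> ds2 = new HashMap<>();
        ds2.put(1L, "X"); ds2.put(2L, "Y"); ds2.put(3L, "Z");

        int numPartitions = 2; // arbitrary choice for the example
        System.out.println(leftOuterJoin(partition(ds1, numPartitions),
                                         partition(ds2, numPartitions)));
    }
}
```

If the two sides were partitioned differently, one of them would first have to be redistributed so that matching keys meet. That redistribution is the shuffle that a shared partitioner avoids.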
Re: Spark joins using row id
Posted by Rohit Verma <ro...@rokittech.com>.
The result of explain is as follows:
*BroadcastHashJoin [rowN#0], [rowN#39], LeftOuter, BuildRight
:- *Project [rowN#0, informer_code#22]
:  +- Window [rownumber() windowspecdefinition(informer_code#22 ASC, ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS rowN#0], [informer_code#22 ASC]
:     +- *Sort [informer_code#22 ASC], false, 0
:        +- Exchange SinglePartition
:           +- *HashAggregate(keys=[informer_code#22], functions=[])
:              +- Exchange hashpartitioning(informer_code#22, 200)
:                 +- *HashAggregate(keys=[informer_code#22], functions=[])
:                    +- *BatchedScan parquet [INFORMER_CODE#22] Format: ParquetFormat, InputPaths: hdfs://192.168.0.102:8020/user/rohit/data/5/78/ORCL.CRA.CUSTOMERS.parquet, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<INFORMER_CODE:string>
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)))
   +- *Project [rowN#39, customer_type#64]
      +- Window [rownumber() windowspecdefinition(customer_type#64 ASC, ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS rowN#39], [customer_type#64 ASC]
         +- *Sort [customer_type#64 ASC], false, 0
            +- Exchange SinglePartition
               +- *HashAggregate(keys=[customer_type#64], functions=[])
                  +- Exchange hashpartitioning(customer_type#64, 200)
                     +- *HashAggregate(keys=[customer_type#64], functions=[])
                        +- *BatchedScan parquet [CUSTOMER_TYPE#64] Format: ParquetFormat, InputPaths: hdfs://192.168.0.102:8020/user/rohit/data/5/78/ORCL.CRA.CUSTOMERS.parquet, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<CUSTOMER_TYPE:string>
I believe this isn’t the intended behavior.
Rohit
On Nov 12, 2016, at 6:15 PM, Stuart White <st...@gmail.com> wrote:
The Spark Catalyst Optimizer is responsible for determining what steps Spark needs to execute to satisfy your query. Given what it knows about your datasets, it attempts to choose the most efficient set of steps. On any dataset, you can call the .explain() method to print the steps that Spark will execute to satisfy your query.
This site explains how all this works:
http://blog.hydronitrogen.com/2016/05/13/shuffle-free-joins-in-spark-sql/