Posted to user@spark.apache.org by Manjunath Shetty H <ma...@live.com> on 2020/03/15 05:03:55 UTC

Optimising multiple hive table join and query in spark

Hi All,

We have 10 tables in our data warehouse (HDFS/Hive) written in ORC format. We currently serve a use case on top of them by joining 4-5 of the tables in Hive, but it is not as fast as we want it to be, so we are thinking of using Spark for this use case.

Any suggestions on this? Is it a good idea to use Spark for this use case? Can we get better performance by using Spark?

Any pointers would be helpful.

Notes:

  *   Data is partitioned by date (yyyyMMdd) as integer.
  *   Query will fetch data for last 7 days from some tables while joining with other tables.

Approach we have thought of so far:

  *   Create a dataframe for each table and partition all of them by the same column (let's say Country as the partition column)
  *   Register all tables as temporary tables
  *   Run the SQL query with joins

But the problem we are seeing with this approach is that, even though we have already partitioned by Country, Spark still does hashPartitioning + shuffle during the join. Every table join involves the `Country` column plus some extra columns depending on the table.

Is there any way to avoid these shuffles and improve performance?
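For illustration, a minimal sketch of the approach described above (Spark 1.6-style API; the table names, columns, literals and partition count are made up):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.functions.col

val sc = new SparkContext(new SparkConf().setAppName("warehouse-join-poc"))
val sqlContext = new HiveContext(sc)

// Hypothetical tables; the real warehouse has ~10 ORC tables partitioned by date (yyyyMMdd).
val orders    = sqlContext.table("warehouse.orders").repartition(200, col("country"))
val customers = sqlContext.table("warehouse.customers").repartition(200, col("country"))

orders.registerTempTable("orders")
customers.registerTempTable("customers")

val joined = sqlContext.sql(
  """SELECT o.*, c.segment
    |FROM orders o
    |JOIN customers c
    |  ON o.country = c.country AND o.customer_id = c.customer_id
    |WHERE o.datekey >= 20200308""".stripMargin)

// The Exchange (hashpartitioning) nodes described above show up in the physical plan:
joined.explain()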


Thanks and regards
Manjunath

Re: Optimising multiple hive table join and query in spark

Posted by Dennis Suhari <d....@icloud.com.INVALID>.
Hi,

I am also using Spark on the Hive metastore. The performance is much better, especially for larger datasets. I have the feeling that performance is better if I load the data into dataframes and join them there, instead of doing the join directly in Spark SQL, but I can't explain why yet.
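
For concreteness, a rough sketch of the two variants (Spark 2.x-style session; table and column names are made up):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("join-comparison").enableHiveSupport().getOrCreate()

// (a) join expressed directly in Spark SQL against the metastore tables
val viaSql = spark.sql(
  "SELECT o.*, c.segment FROM warehouse.orders o JOIN warehouse.customers c ON o.customer_id = c.customer_id")

// (b) load each table into a dataframe first, then join with the DataFrame API
val orders    = spark.table("warehouse.orders")
val customers = spark.table("warehouse.customers")
val viaDf     = orders.join(customers, Seq("customer_id"))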

Does anybody have experience with that?

Br,

Dennis

Sent from my iPhone

> On 15.03.2020 at 06:04, Manjunath Shetty H <ma...@live.com> wrote:
> 
> 
> Hi All,
> 
> We have 10 tables in our data warehouse (HDFS/Hive) written in ORC format. We currently serve a use case on top of them by joining 4-5 of the tables in Hive, but it is not as fast as we want it to be, so we are thinking of using Spark for this use case.
>
> Any suggestions on this? Is it a good idea to use Spark for this use case? Can we get better performance by using Spark?
>
> Any pointers would be helpful.
>
> Notes:
> Data is partitioned by date (yyyyMMdd) as integer.
> Query will fetch data for last 7 days from some tables while joining with other tables.
>
> Approach we have thought of so far:
> Create a dataframe for each table and partition all of them by the same column (let's say Country as the partition column)
> Register all tables as temporary tables
> Run the SQL query with joins
> But the problem we are seeing with this approach is that, even though we have already partitioned by Country, Spark still does hashPartitioning + shuffle during the join. Every table join involves the `Country` column plus some extra columns depending on the table.
>
> Is there any way to avoid these shuffles and improve performance?
> 
> 
> Thanks and regards 
> Manjunath

Re: Optimising multiple hive table join and query in spark

Posted by Manjunath Shetty H <ma...@live.com>.
Thanks Georg,

The batch import job runs on a different schedule than the read job. The import job will run every 15 minutes to 1 hour, and the read/transform job will run once a day.

In this case I think writing with sortWithinPartitions doesn't make any difference, as the combined data stored in HDFS will not be sorted at the end of the day.

Does partitioning/sorting while reading help? I tried this out, but it still results in a shuffle when joining multiple tables and generates a very complex DAG.
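
For clarity, the kind of read-side partition/sort meant here looks roughly like this (a sketch only, with made-up table and column names, not the actual job):

// sqlContext: an existing HiveContext (Spark 1.6)
import org.apache.spark.sql.functions.col

val orders = sqlContext.table("warehouse.orders")
  .repartition(200, col("country"))
  .sortWithinPartitions("country", "customer_id")

val customers = sqlContext.table("warehouse.customers")
  .repartition(200, col("country"))
  .sortWithinPartitions("country", "customer_id")

// Joining afterwards can still trigger an Exchange on the join keys,
// which is the shuffle described above.
val joined = orders.join(customers, Seq("country", "customer_id"))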

-
Manjunath
________________________________
From: Georg Heiler <ge...@gmail.com>
Sent: Monday, March 16, 2020 12:06 PM
To: Manjunath Shetty H <ma...@live.com>
Subject: Re: Optimising multiple hive table join and query in spark

Hi,

If you only have 1.6, forget bucketing: https://databricks.com/session/bucketing-in-spark-sql-2-3. That only works well with Hive from Spark 2.3 onwards.
The other thing to look at is your (daily?) batch job:

val myData = spark.read.<<file>>("/path/to/file").transform(<<apply_my_transform>>)

Now when writing the data:
myData.repartition(xxx).write...
where xxx resembles the number of files you want to have for each period (day?). When writing ORC / Parquet, make sure the files are HDFS block size or more, i.e. usually 128 MB up to a maximum of 1 GB.
myData.repartition(xxx).sortWithinPartitions(join_col, other_join_col).write...

Apply a secondary sort to get ORC indexes.

If the cardinality of the join_cols is high enough:
myData.repartition(xxx, col(join_col), col(other_join_col)).sortWithinPartitions(join_col, other_join_col).write...
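
Put together, a rough end-to-end sketch of that write path (Spark 2.x-style SparkSession as in the snippet above; the same chain also works from a HiveContext DataFrame on 1.6; paths, columns and the file count are assumptions):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder().appName("daily-batch-write").enableHiveSupport().getOrCreate()

// Source path and transform stand in for the existing import job.
val myData = spark.read.orc("/warehouse/staging/20200316")

val filesPerDay = 32   // tune so each ORC file lands around 128 MB - 1 GB

myData
  .repartition(filesPerDay, col("country"), col("customer_id"))  // only if the join keys have high cardinality
  .sortWithinPartitions("country", "customer_id")                // secondary sort, so ORC indexes on the join keys are useful
  .write
  .mode("append")
  .orc("/warehouse/facts/orders/datekey=20200316")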

Best,
Georg

On Mon, 16 Mar 2020 at 04:27, Manjunath Shetty H <ma...@live.com>> wrote:
Hi Georg,

Thanks for the suggestion. Can you please explain a bit more about what you meant exactly?

Btw, I am on Spark 1.6.


-
Manjunath
________________________________
From: Georg Heiler <ge...@gmail.com>>
Sent: Monday, March 16, 2020 12:35 AM
To: Manjunath Shetty H <ma...@live.com>>
Subject: Re: Optimising multiple hive table join and query in spark

To speed things up:
- sortWithinPartitions (i.e. for each day) & potentially repartition
- pre-shuffle the data with bucketing
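
As a sketch of what the bucketing option would look like (Spark 2.3+ only, as noted elsewhere in the thread, so not applicable on 1.6; table, column and bucket-count choices are assumptions):

// spark: an existing SparkSession (2.3+) with Hive support
val myData = spark.table("warehouse.orders_staging")

// Write each table bucketed and sorted on the join keys; if every side of the join is
// bucketed the same way, the sort-merge join can skip the extra Exchange.
myData.write
  .mode("overwrite")
  .bucketBy(64, "country", "customer_id")
  .sortBy("country", "customer_id")
  .saveAsTable("warehouse.orders_bucketed")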

On Sun, 15 Mar 2020 at 17:07, Manjunath Shetty H <ma...@live.com>> wrote:
Only partitioned, and the join keys are not sorted because those are written incrementally by batch jobs.
________________________________
From: Georg Heiler <ge...@gmail.com>>
Sent: Sunday, March 15, 2020 8:30:53 PM
To: Manjunath Shetty H <ma...@live.com>>
Cc: ayan guha <gu...@gmail.com>>; Magnus Nilsson <ma...@kth.se>>; user <us...@spark.apache.org>>
Subject: Re: Optimising multiple hive table join and query in spark

Did you only partition, or also bucket by the join column? Are ORC indexes active, i.e. were the JOIN keys sorted when writing the files?

Best,
Georg

On Sun, 15 Mar 2020 at 15:52, Manjunath Shetty H <ma...@live.com>> wrote:
Mostly the concern is the reshuffle. Even though all the DFs are partitioned by the same column, Spark still reshuffles during the join, and that is the bottleneck in our POC implementation as of now.

Is there any way to tell Spark to keep all partitions with the same partition key in the same place, so that it won't shuffle again during the join?


-
Manjunath
________________________________
From: ayan guha <gu...@gmail.com>>
Sent: Sunday, March 15, 2020 5:46 PM
To: Magnus Nilsson <ma...@kth.se>>
Cc: user <us...@spark.apache.org>>
Subject: Re: Optimising multiple hive table join and query in spark

Hi

I would first and foremost try to identify where most of the time is spent during the query. One possibility is that it just takes ramp-up time for executors to become available; if that's the case then maybe a dedicated YARN queue may help, or using the Spark Thrift Server may help.

On Sun, Mar 15, 2020 at 11:02 PM Magnus Nilsson <ma...@kth.se>> wrote:
It's been a while, but I remember reading on Stack Overflow that you can use a UDF as a join condition to trick Catalyst into not reshuffling the partitions, i.e. use regular equality on the column you partitioned or bucketed by and your custom comparer for the other columns. I never got around to trying it out though. I really would like a native way to tell Catalyst not to reshuffle just because you use more columns in the join condition.

On Sun, Mar 15, 2020 at 6:04 AM Manjunath Shetty H <ma...@live.com>> wrote:
Hi All,

We have 10 tables in our data warehouse (HDFS/Hive) written in ORC format. We currently serve a use case on top of them by joining 4-5 of the tables in Hive, but it is not as fast as we want it to be, so we are thinking of using Spark for this use case.

Any suggestions on this? Is it a good idea to use Spark for this use case? Can we get better performance by using Spark?

Any pointers would be helpful.

Notes:

  *   Data is partitioned by date (yyyyMMdd) as integer.
  *   Query will fetch data for last 7 days from some tables while joining with other tables.

Approach we have thought of so far:

  *   Create a dataframe for each table and partition all of them by the same column (let's say Country as the partition column)
  *   Register all tables as temporary tables
  *   Run the SQL query with joins

But the problem we are seeing with this approach is that, even though we have already partitioned by Country, Spark still does hashPartitioning + shuffle during the join. Every table join involves the `Country` column plus some extra columns depending on the table.

Is there any way to avoid these shuffles and improve performance?


Thanks and regards
Manjunath


--
Best Regards,
Ayan Guha

Re: Optimising multiple hive table join and query in spark

Posted by Manjunath Shetty H <ma...@live.com>.
Only partitioned, and the join keys are not sorted because those are written incrementally by batch jobs.
________________________________
From: Georg Heiler <ge...@gmail.com>
Sent: Sunday, March 15, 2020 8:30:53 PM
To: Manjunath Shetty H <ma...@live.com>
Cc: ayan guha <gu...@gmail.com>; Magnus Nilsson <ma...@kth.se>; user <us...@spark.apache.org>
Subject: Re: Optimising multiple hive table join and query in spark

Did you only partition, or also bucket by the join column? Are ORC indexes active, i.e. were the JOIN keys sorted when writing the files?

Best,
Georg

On Sun, 15 Mar 2020 at 15:52, Manjunath Shetty H <ma...@live.com>> wrote:
Mostly the concern is the reshuffle. Even though all the DFs are partitioned by the same column, Spark still reshuffles during the join, and that is the bottleneck in our POC implementation as of now.

Is there any way to tell Spark to keep all partitions with the same partition key in the same place, so that it won't shuffle again during the join?


-
Manjunath
________________________________
From: ayan guha <gu...@gmail.com>>
Sent: Sunday, March 15, 2020 5:46 PM
To: Magnus Nilsson <ma...@kth.se>>
Cc: user <us...@spark.apache.org>>
Subject: Re: Optimising multiple hive table join and query in spark

Hi

I would first and foremost try to identify where most of the time is spent during the query. One possibility is that it just takes ramp-up time for executors to become available; if that's the case then maybe a dedicated YARN queue may help, or using the Spark Thrift Server may help.

On Sun, Mar 15, 2020 at 11:02 PM Magnus Nilsson <ma...@kth.se>> wrote:
It's been a while, but I remember reading on Stack Overflow that you can use a UDF as a join condition to trick Catalyst into not reshuffling the partitions, i.e. use regular equality on the column you partitioned or bucketed by and your custom comparer for the other columns. I never got around to trying it out though. I really would like a native way to tell Catalyst not to reshuffle just because you use more columns in the join condition.

On Sun, Mar 15, 2020 at 6:04 AM Manjunath Shetty H <ma...@live.com>> wrote:
Hi All,

We have 10 tables in our data warehouse (HDFS/Hive) written in ORC format. We currently serve a use case on top of them by joining 4-5 of the tables in Hive, but it is not as fast as we want it to be, so we are thinking of using Spark for this use case.

Any suggestions on this? Is it a good idea to use Spark for this use case? Can we get better performance by using Spark?

Any pointers would be helpful.

Notes:

  *   Data is partitioned by date (yyyyMMdd) as integer.
  *   Query will fetch data for last 7 days from some tables while joining with other tables.

Approach we have thought of so far:

  *   Create a dataframe for each table and partition all of them by the same column (let's say Country as the partition column)
  *   Register all tables as temporary tables
  *   Run the SQL query with joins

But the problem we are seeing with this approach is that, even though we have already partitioned by Country, Spark still does hashPartitioning + shuffle during the join. Every table join involves the `Country` column plus some extra columns depending on the table.

Is there any way to avoid these shuffles and improve performance?


Thanks and regards
Manjunath


--
Best Regards,
Ayan Guha

Re: Optimising multiple hive table join and query in spark

Posted by Georg Heiler <ge...@gmail.com>.
Did you only partition, or also bucket by the join column? Are ORC indexes
active, i.e. were the JOIN keys sorted when writing the files?

Best,
Georg

On Sun, 15 Mar 2020 at 15:52, Manjunath Shetty H <
manjunathshetty@live.com> wrote:

> Mostly the concern is the reshuffle. Even though all the DFs are partitioned
> by the same column, Spark still reshuffles during the join, and that is the
> bottleneck in our POC implementation as of now.
>
> Is there any way to tell Spark to keep all partitions with the same partition
> key in the same place, so that it won't shuffle again during the join?
>
>
> -
> Manjunath
> ------------------------------
> *From:* ayan guha <gu...@gmail.com>
> *Sent:* Sunday, March 15, 2020 5:46 PM
> *To:* Magnus Nilsson <ma...@kth.se>
> *Cc:* user <us...@spark.apache.org>
> *Subject:* Re: Optimising multiple hive table join and query in spark
>
> Hi
>
> I would first and foremost try to identify where most of the time is spent
> during the query. One possibility is that it just takes ramp-up time for
> executors to become available; if that's the case then maybe a dedicated YARN
> queue may help, or using the Spark Thrift Server may help.
>
> On Sun, Mar 15, 2020 at 11:02 PM Magnus Nilsson <ma...@kth.se> wrote:
>
> It's been a while, but I remember reading on Stack Overflow that you can use a
> UDF as a join condition to trick Catalyst into not reshuffling the partitions,
> i.e. use regular equality on the column you partitioned or bucketed by and your
> custom comparer for the other columns. I never got around to trying it out
> though. I really would like a native way to tell Catalyst not to reshuffle
> just because you use more columns in the join condition.
>
> On Sun, Mar 15, 2020 at 6:04 AM Manjunath Shetty H <
> manjunathshetty@live.com> wrote:
>
> Hi All,
>
> We have 10 tables in our data warehouse (HDFS/Hive) written in ORC format.
> We currently serve a use case on top of them by joining 4-5 of the tables in
> Hive, but it is not as fast as we want it to be, so we are thinking of using
> Spark for this use case.
>
> Any suggestions on this? Is it a good idea to use Spark for this use case?
> Can we get better performance by using Spark?
>
> Any pointers would be helpful.
>
> *Notes*:
>
>    - Data is partitioned by date (yyyyMMdd) as integer.
>    - Query will fetch data for last 7 days from some tables while joining
>    with other tables.
>
>
> *Approach we have thought of so far:*
>
>    - Create a dataframe for each table and partition all of them by the same
>    column (let's say Country as the partition column)
>    - Register all tables as temporary tables
>    - Run the SQL query with joins
>
> But the problem we are seeing with this approach is that, even though we have
> already partitioned by Country, Spark still does hashPartitioning + shuffle
> during the join. Every table join involves the `Country` column plus some
> extra columns depending on the table.
>
> Is there any way to avoid these shuffles and improve performance?
>
>
> Thanks and regards
> Manjunath
>
>
>
> --
> Best Regards,
> Ayan Guha
>

Re: Optimising multiple hive table join and query in spark

Posted by Manjunath Shetty H <ma...@live.com>.
Mostly the concern is the reshuffle. Even though all the DFs are partitioned by the same column, Spark still reshuffles during the join, and that is the bottleneck in our POC implementation as of now.

Is there any way to tell Spark to keep all partitions with the same partition key in the same place, so that it won't shuffle again during the join?


-
Manjunath
________________________________
From: ayan guha <gu...@gmail.com>
Sent: Sunday, March 15, 2020 5:46 PM
To: Magnus Nilsson <ma...@kth.se>
Cc: user <us...@spark.apache.org>
Subject: Re: Optimising multiple hive table join and query in spark

Hi

I would first and foremost try to identify where most of the time is spent during the query. One possibility is that it just takes ramp-up time for executors to become available; if that's the case then maybe a dedicated YARN queue may help, or using the Spark Thrift Server may help.

On Sun, Mar 15, 2020 at 11:02 PM Magnus Nilsson <ma...@kth.se>> wrote:
It's been a while, but I remember reading on Stack Overflow that you can use a UDF as a join condition to trick Catalyst into not reshuffling the partitions, i.e. use regular equality on the column you partitioned or bucketed by and your custom comparer for the other columns. I never got around to trying it out though. I really would like a native way to tell Catalyst not to reshuffle just because you use more columns in the join condition.

On Sun, Mar 15, 2020 at 6:04 AM Manjunath Shetty H <ma...@live.com>> wrote:
Hi All,

We have 10 tables in our data warehouse (HDFS/Hive) written in ORC format. We currently serve a use case on top of them by joining 4-5 of the tables in Hive, but it is not as fast as we want it to be, so we are thinking of using Spark for this use case.

Any suggestions on this? Is it a good idea to use Spark for this use case? Can we get better performance by using Spark?

Any pointers would be helpful.

Notes:

  *   Data is partitioned by date (yyyyMMdd) as integer.
  *   Query will fetch data for last 7 days from some tables while joining with other tables.

Approach we have thought of so far:

  *   Create a dataframe for each table and partition all of them by the same column (let's say Country as the partition column)
  *   Register all tables as temporary tables
  *   Run the SQL query with joins

But the problem we are seeing with this approach is that, even though we have already partitioned by Country, Spark still does hashPartitioning + shuffle during the join. Every table join involves the `Country` column plus some extra columns depending on the table.

Is there any way to avoid these shuffles and improve performance?


Thanks and regards
Manjunath


--
Best Regards,
Ayan Guha

Re: Optimising multiple hive table join and query in spark

Posted by ayan guha <gu...@gmail.com>.
Hi

I would first and foremost try to identify where most of the time is spent
during the query. One possibility is that it just takes ramp-up time for
executors to become available; if that's the case then maybe a dedicated YARN
queue may help, or using the Spark Thrift Server may help.
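
For illustration, one way to act on the dedicated-queue idea from the Spark side (the queue name and executor count are assumptions, and the queue itself must exist in the YARN scheduler configuration):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("warehouse-joins")
  .set("spark.yarn.queue", "reporting")            // submit to a dedicated YARN queue
  .set("spark.dynamicAllocation.enabled", "false") // keep a fixed executor pool instead of ramping up
  .set("spark.executor.instances", "20")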

On Sun, Mar 15, 2020 at 11:02 PM Magnus Nilsson <ma...@kth.se> wrote:

> It's been a while, but I remember reading on Stack Overflow that you can use a
> UDF as a join condition to trick Catalyst into not reshuffling the partitions,
> i.e. use regular equality on the column you partitioned or bucketed by and your
> custom comparer for the other columns. I never got around to trying it out
> though. I really would like a native way to tell Catalyst not to reshuffle
> just because you use more columns in the join condition.
>
> On Sun, Mar 15, 2020 at 6:04 AM Manjunath Shetty H <
> manjunathshetty@live.com> wrote:
>
>> Hi All,
>>
>> We have 10 tables in our data warehouse (HDFS/Hive) written in ORC format.
>> We currently serve a use case on top of them by joining 4-5 of the tables in
>> Hive, but it is not as fast as we want it to be, so we are thinking of using
>> Spark for this use case.
>>
>> Any suggestions on this? Is it a good idea to use Spark for this use case?
>> Can we get better performance by using Spark?
>>
>> Any pointers would be helpful.
>>
>> *Notes*:
>>
>>    - Data is partitioned by date (yyyyMMdd) as integer.
>>    - Query will fetch data for last 7 days from some tables while
>>    joining with other tables.
>>
>>
>> *Approach we have thought of so far:*
>>
>>    - Create a dataframe for each table and partition all of them by the same
>>    column (let's say Country as the partition column)
>>    - Register all tables as temporary tables
>>    - Run the SQL query with joins
>>
>> But the problem we are seeing with this approach is that, even though we have
>> already partitioned by Country, Spark still does hashPartitioning + shuffle
>> during the join. Every table join involves the `Country` column plus some
>> extra columns depending on the table.
>>
>> Is there any way to avoid these shuffles and improve performance?
>>
>>
>> Thanks and regards
>> Manjunath
>>
>

-- 
Best Regards,
Ayan Guha

Re: Optimising multiple hive table join and query in spark

Posted by Magnus Nilsson <ma...@kth.se>.
It's been a while, but I remember reading on Stack Overflow that you can use a
UDF as a join condition to trick Catalyst into not reshuffling the partitions,
i.e. use regular equality on the column you partitioned or bucketed by and your
custom comparer for the other columns. I never got around to trying it out
though. I really would like a native way to tell Catalyst not to reshuffle
just because you use more columns in the join condition.
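
A rough, untested sketch of that trick (table and column names are made up; whether the Exchange is actually skipped still depends on Catalyst recognising the existing partitioning):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}

val spark = SparkSession.builder().appName("udf-join-trick").enableHiveSupport().getOrCreate()

// Both sides already partitioned by the column used for the plain equality below.
val orders    = spark.table("warehouse.orders").repartition(col("country"))
val customers = spark.table("warehouse.customers").repartition(col("country"))

// Push the comparison of the extra key column into a UDF, so only `country`
// ends up in the equi-join keys Catalyst plans the shuffle around.
val sameCustomer = udf((a: String, b: String) => a == b)

val joined = orders.as("o").join(
  customers.as("c"),
  col("o.country") === col("c.country") &&
    sameCustomer(col("o.customer_id"), col("c.customer_id")))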

On Sun, Mar 15, 2020 at 6:04 AM Manjunath Shetty H <ma...@live.com>
wrote:

> Hi All,
>
> We have 10 tables in our data warehouse (HDFS/Hive) written in ORC format.
> We currently serve a use case on top of them by joining 4-5 of the tables in
> Hive, but it is not as fast as we want it to be, so we are thinking of using
> Spark for this use case.
>
> Any suggestions on this? Is it a good idea to use Spark for this use case?
> Can we get better performance by using Spark?
>
> Any pointers would be helpful.
>
> *Notes*:
>
>    - Data is partitioned by date (yyyyMMdd) as integer.
>    - Query will fetch data for last 7 days from some tables while joining
>    with other tables.
>
>
> *Approach we have thought of so far:*
>
>    - Create a dataframe for each table and partition all of them by the same
>    column (let's say Country as the partition column)
>    - Register all tables as temporary tables
>    - Run the SQL query with joins
>
> But the problem we are seeing with this approach is that, even though we have
> already partitioned by Country, Spark still does hashPartitioning + shuffle
> during the join. Every table join involves the `Country` column plus some
> extra columns depending on the table.
>
> Is there any way to avoid these shuffles and improve performance?
>
>
> Thanks and regards
> Manjunath
>