Posted to user@hive.apache.org by Mich Talebzadeh <mi...@gmail.com> on 2016/06/27 09:19:48 UTC
Querying Hive tables from Spark
Hi,
I have done some extensive tests with Spark querying Hive tables.
It appears to me that Spark does not rely on the statistics that Hive collects on, say, ORC tables. It seems that Spark uses its own optimizer to query Hive tables, irrespective of whatever statistics Hive has collected?
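For context, the statistics in question are the ones Hive records in its metastore, normally gathered with ANALYZE TABLE. A minimal HiveQL sketch against the table defined below, assuming Hive 1.x syntax; the column list is simply the five bucketing keys:

```sql
-- Table-level ("basic") statistics: row count, file count, data size
ANALYZE TABLE oraclehadoop.sales2 COMPUTE STATISTICS;

-- Column-level statistics for the five key columns
-- (the EXPLAIN output later in this thread reports "Column stats: NONE",
--  which suggests these had not been gathered at that point)
ANALYZE TABLE oraclehadoop.sales2 COMPUTE STATISTICS
  FOR COLUMNS prod_id, cust_id, time_id, channel_id, promo_id;

-- Inspect what the metastore now holds (numRows, rawDataSize, totalSize, ...)
DESCRIBE FORMATTED oraclehadoop.sales2;
```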
Case in point: I have a FACT table bucketed on five dimension foreign keys, as below:
CREATE TABLE IF NOT EXISTS oraclehadoop.sales2
(
PROD_ID bigint ,
CUST_ID bigint ,
TIME_ID timestamp ,
CHANNEL_ID bigint ,
PROMO_ID bigint ,
QUANTITY_SOLD decimal(10) ,
AMOUNT_SOLD decimal(10)
)
CLUSTERED BY (PROD_ID,CUST_ID,TIME_ID,CHANNEL_ID,PROMO_ID) INTO 256 BUCKETS
STORED AS ORC
TBLPROPERTIES ( "orc.compress"="SNAPPY",
"orc.create.index"="true",
"orc.bloom.filter.columns"="PROD_ID,CUST_ID,TIME_ID,CHANNEL_ID,PROMO_ID",
"orc.bloom.filter.fpp"="0.05",
"orc.stripe.size"="268435456",
"orc.row.index.stride"="10000")
The table is sorted in the order of prod_id, cust_id, time_id, channel_id and promo_id. It has 22 million rows.
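A simple query like the one below (Spark 1.x spark-shell, where sc is the SparkContext the shell provides):
import org.apache.spark.sql.hive.HiveContext
// HiveContext is a class, so create an instance from the shell's SparkContext
val hiveContext = new HiveContext(sc)
import hiveContext.implicits._ // enables the $"column" syntax
hiveContext.sql("USE oraclehadoop")
val s = hiveContext.table("sales2")
// The same five equality predicates, used for both explain and collect
val cond = $"prod_id" === 13 && $"cust_id" === 50833 &&
  $"time_id" === "2000-12-26 00:00:00" && $"channel_id" === 2 && $"promo_id" === 999
s.filter(cond).explain
s.filter(cond).collect.foreach(println)
This shows the plan as: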
== Physical Plan ==
Filter (((((prod_id#10L = 13) && (cust_id#11L = 50833)) && (time_id#12 = 977788800000000)) && (channel_id#13L = 2)) && (promo_id#14L = 999))
+- HiveTableScan [prod_id#10L,cust_id#11L,time_id#12,channel_id#13L,promo_id#14L,quantity_sold#15,amount_sold#16], MetastoreRelation oraclehadoop, sales2, None
*Spark returns 24 rows pretty fast in 22 seconds.*
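The literal 977788800000000 in the Spark plan above is the string '2000-12-26 00:00:00' after Spark has converted it to a timestamp constant: Spark SQL holds TimestampType values internally as microseconds since the Unix epoch. Assuming the session time zone was UTC (the post does not say), the value checks out:

```latex
\begin{aligned}
\text{days from 1970-01-01 to 2000-01-01} &= 30 \times 365 + 7 = 10957 \\
\text{days from 2000-01-01 to 2000-12-26} &= 335 + 25 = 360 \\
(10957 + 360) \times 86400\ \text{s} &= 977\,788\,800\ \text{s} = 977\,788\,800\,000\,000\ \mu\text{s}
\end{aligned}
```

So Spark compares time_id against a timestamp constant, whereas the Hive plan that follows compares UDFToString(time_id) against the string.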
Running the same query in Hive, with Spark as the execution engine, shows:
STAGE DEPENDENCIES:
  Stage-0 is a root stage
STAGE PLANS:
  Stage: Stage-0
    Fetch Operator
      limit: -1
      Processor Tree:
        TableScan
          alias: sales2
          Filter Operator
            predicate: (((((prod_id = 13) and (cust_id = 50833)) and (UDFToString(time_id) = '2000-12-26 00:00:00')) and (channel_id = 2)) and (promo_id = 999)) (type: boolean)
            Select Operator
              expressions: 13 (type: bigint), 50833 (type: bigint), 2000-12-26 00:00:00.0 (type: timestamp), 2 (type: bigint), 999 (type: bigint), quantity_sold (type: decimal(10,0)), amount_sold (type: decimal(10,0))
              outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6
              ListSink
*And Hive on Spark returns the same 24 rows in 30 seconds.*
OK, so the Hive query is just slower with the Spark engine.
Assuming that the elapsed time is optimization time + query execution time, it appears that in most cases the optimization time does not have much impact on overall performance?
Let me know your thoughts.
HTH
Dr Mich Talebzadeh
LinkedIn https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
http://talebzadehmich.wordpress.com
*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.
RE: Querying Hive tables from Spark
Posted by "Markovitz, Dudu" <dm...@paypal.com>.
Hi Mich
I could not figure out the point you are trying to make.
Could you please clarify?
Thanks
Dudu
From: Mich Talebzadeh [mailto:mich.talebzadeh@gmail.com]
Sent: Monday, June 27, 2016 12:20 PM
To: user @spark <us...@spark.apache.org>; user <us...@hive.apache.org>
Subject: Querying Hive tables from Spark
Re: Querying Hive tables from Spark
Posted by Mich Talebzadeh <mi...@gmail.com>.
Thanks Gopal.
I added a compact index on the five key columns of this table, as below:
hive> show formatted indexes on sales2;
OK
idx_name     tab_name   col_names                                         idx_tab_name                        idx_type   comment
sales2_idx   sales2     prod_id, cust_id, time_id, channel_id, promo_id   oraclehadoop__sales2_sales2_idx__   compact
But, as I expected, the CBO ignores it:
STAGE PLANS:
  Stage: Stage-0
    Fetch Operator
      limit: -1
      Processor Tree:
        TableScan
          alias: sales2
          Statistics: Num rows: 22052232 Data size: 6527460672 Basic stats: COMPLETE Column stats: NONE
          Filter Operator
            predicate: (((((prod_id = 13) and (cust_id = 50833)) and (UDFToString(time_id) = '2000-12-26 00:00:00')) and (channel_id = 2)) and (promo_id = 999)) (type: boolean)
            Statistics: Num rows: 689132 Data size: 203983072 Basic stats: COMPLETE Column stats: NONE
            Select Operator
              expressions: 13 (type: bigint), 50833 (type: bigint), 2000-12-26 00:00:00.0 (type: timestamp), 2 (type: bigint), 999 (type: bigint), quantity_sold (type: decimal(10,0)), amount_sold (type: decimal(10,0))
              outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6
              Statistics: Num rows: 689132 Data size: 203983072 Basic stats: COMPLETE Column stats: NONE
              ListSink
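The post shows the index listing but not the statement that created it. For readers reproducing the test, a compact index like the one listed could be created with HiveQL along these lines; this is a sketch assuming Hive 1.x (indexes were removed in Hive 3.0), and a WITH DEFERRED REBUILD index stays empty until it is explicitly rebuilt:

```sql
USE oraclehadoop;

-- Compact index over the five key columns; with no IN TABLE clause,
-- Hive generates the index table name itself
-- (e.g. oraclehadoop__sales2_sales2_idx__ as in the listing above)
CREATE INDEX sales2_idx ON TABLE sales2
  (prod_id, cust_id, time_id, channel_id, promo_id)
  AS 'COMPACT'
  WITH DEFERRED REBUILD;

-- Populate the index table from the current contents of sales2
ALTER INDEX sales2_idx ON sales2 REBUILD;

-- In Hive 1.x, automatic use of indexes by the planner is governed by this setting
SET hive.optimize.index.filter=true;
```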
thanks
Dr Mich Talebzadeh
On 27 June 2016 at 17:38, Gopal Vijayaraghavan <go...@apache.org> wrote:
Re: Querying Hive tables from Spark
Posted by Gopal Vijayaraghavan <go...@apache.org>.
> It appears to me that Spark does not rely on statistics that are
>collected by Hive on say ORC tables.
> It seems that Spark uses its own optimization to query the Hive tables
>irrespective of Hive has collected by way of statistics etc?
Spark does not have a cost-based optimizer yet; please follow this JIRA, which suggests that one is planned for the future.
<https://issues.apache.org/jira/browse/SPARK-16026>
> CLUSTERED BY (PROD_ID,CUST_ID,TIME_ID,CHANNEL_ID,PROMO_ID) INTO 256
>BUCKETS
...
> Table is sorted in the order of prod_id, cust_id,time_id, channel_id and
>promo_id. It has 22 million rows.
No, it is not.
Due to whatever backwards-compatibility brain-damage of Hive 1, CLUSTERED BY does *NOT* cluster at all: it only hashes rows into buckets and does not sort them within each bucket.
Add at least
SORTED BY (PROD_ID)
if what you care about is scanning performance with the ORC indexes.
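Acting on this means declaring the sort order in the DDL and reloading the data, because changing the table definition does not rewrite existing files. A minimal HiveQL sketch, assuming Hive 1.x and a hypothetical new table name sales2_sorted; the sort key extends SORTED BY (PROD_ID) to the full order the table was assumed to have:

```sql
-- Hive 1.x: make INSERT honour the bucketing and sort declarations
-- (these settings were removed in Hive 2.x, where this is always enforced)
SET hive.enforce.bucketing=true;
SET hive.enforce.sorting=true;

-- Hypothetical copy of sales2 with an explicit sort order inside each bucket
CREATE TABLE IF NOT EXISTS oraclehadoop.sales2_sorted
(
PROD_ID bigint ,
CUST_ID bigint ,
TIME_ID timestamp ,
CHANNEL_ID bigint ,
PROMO_ID bigint ,
QUANTITY_SOLD decimal(10) ,
AMOUNT_SOLD decimal(10)
)
CLUSTERED BY (PROD_ID,CUST_ID,TIME_ID,CHANNEL_ID,PROMO_ID)
SORTED BY (PROD_ID,CUST_ID,TIME_ID,CHANNEL_ID,PROMO_ID) INTO 256 BUCKETS
STORED AS ORC
TBLPROPERTIES ( "orc.compress"="SNAPPY",
"orc.create.index"="true",
"orc.bloom.filter.columns"="PROD_ID,CUST_ID,TIME_ID,CHANNEL_ID,PROMO_ID",
"orc.bloom.filter.fpp"="0.05",
"orc.stripe.size"="268435456",
"orc.row.index.stride"="10000");

-- Rewrite the data so each bucket file is written in sorted order
INSERT OVERWRITE TABLE oraclehadoop.sales2_sorted
SELECT * FROM oraclehadoop.sales2;

-- "Sort Columns" should now list the sort keys (for sales2 it should show [])
DESCRIBE FORMATTED oraclehadoop.sales2_sorted;
```

Sorting within each bucket is what lets the ORC min/max row-group index and bloom filters skip most of each file for a point lookup such as prod_id = 13.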
> And Hive on Spark returns the same 24 rows in 30 seconds
That sounds slow for 22 million rows. That should be a 5-6 second query in
Hive on a single 16-core box.
Is this a build from source? Does the build have log4j 1.x with INFO/DEBUG logging enabled?
> Assuming that the time taken will be optimization time + query time then
>it appears that in most cases the optimization time does not really make
>that impact on the overall performance?
The optimizer's impact is felt most when you have 3+ joins: computing join order, filter transitivity etc.
In this case, all the optimizer does is simplify predicates.
Cheers,
Gopal