Posted to user@spark.apache.org by S Malligarjunan <sm...@yahoo.com> on 2014/07/17 19:17:02 UTC
Need help on Spark UDF (Join) Performance tuning .
Hello Experts,
I am facing a performance problem when I call a UDF in a join query. Please help me tune the query.
Please find the details below.
shark> select count(*) from table1;
OK
151096
Time taken: 7.242 seconds
shark> select count(*) from table2;
OK
938
Time taken: 1.273 seconds
Without UDF:
shark> SELECT
> count(pvc1.time)
> FROM table2 pvc2 JOIN table1 pvc1
> WHERE pvc1.col1 = pvc2.col2
> AND unix_timestamp(pvc2.time, 'yyyy-MM-dd HH:mm:ss,SSS') > unix_timestamp(pvc1.time, 'yyyy-MM-dd HH:mm:ss,SSS');
OK
328
Time taken: 200.487 seconds
shark>
> SELECT
> count(pvc1.time)
> FROM table2 pvc2 JOIN table1 pvc1
> WHERE (pvc1.col1 = pvc2.col1 OR pvc1.col1 = pvc2.col2)
> AND unix_timestamp(pvc2.time, 'yyyy-MM-dd HH:mm:ss,SSS') > unix_timestamp(pvc1.time, 'yyyy-MM-dd HH:mm:ss,SSS');
OK
331
Time taken: 292.86 seconds
With UDF:
shark>
> SELECT
> count(pvc1.time)
> FROM table2 pvc2 JOIN table1 pvc1
> WHERE testCompare(pvc1.col1,pvc1.col2, pvc2.col1,pvc2.col2)
> AND unix_timestamp(pvc2.time, 'yyyy-MM-dd HH:mm:ss,SSS') > unix_timestamp(pvc1.time, 'yyyy-MM-dd HH:mm:ss,SSS');
OK
331
Time taken: 3718.23 seconds
The UDF query above takes far longer to run: 3718.23 seconds, versus 292.86 seconds for the equivalent query without the UDF.
testCompare is a UDF. It just evaluates pvc1.col1 = pvc2.col1 OR pvc1.col1 = pvc2.col2.
Please let me know what the issue is here.
Thanks and Regards,
Sankar S.
Re: Need help on Spark UDF (Join) Performance tuning .
Posted by S Malligarjunan <sm...@yahoo.com>.
Hello Michael,
Thank you very much for helping.
I have now removed the UDF and run the plain SQL query. This time table1 has 15140637 rows and table2 has 129138 rows.
I executed the following query:
SELECT count(table1.time)
FROM table2 JOIN table1
WHERE (table1.column1 = table2.column1 OR table1.column2 = table2.column2)
AND unix_timestamp(table2.time, 'yyyy-MM-dd HH:mm:ss,SSS') > unix_timestamp(table1.time, 'yyyy-MM-dd HH:mm:ss,SSS');
The query ran for more than 25 hours without producing any output and finally failed with a broken pipe error :(.
What could the problem be, and how can I improve the performance? Please see the explain output below and let me know what can be improved.
ABSTRACT SYNTAX TREE:
(TOK_QUERY (TOK_FROM (TOK_JOIN (TOK_TABREF (TOK_TABNAME cnv_px_jul18) table2) (TOK_TABREF (TOK_TABNAME cnv_imp_jul18) table1))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_FUNCTION count (. (TOK_TABLE_OR_COL table1) time)))) (TOK_WHERE (AND (OR (= (. (TOK_TABLE_OR_COL table1) column1) (. (TOK_TABLE_OR_COL table2) column1)) (= (. (TOK_TABLE_OR_COL table1) column2) (. (TOK_TABLE_OR_COL table2) column2))) (> (TOK_FUNCTION unix_timestamp (. (TOK_TABLE_OR_COL table2) time) 'yyyy-MM-dd HH:mm:ss,SSS') (TOK_FUNCTION unix_timestamp (. (TOK_TABLE_OR_COL table1) time) 'yyyy-MM-dd HH:mm:ss,SSS'))))))
STAGE DEPENDENCIES:
Stage-0 is a root stage
Stage-1 is a root stage
STAGE PLANS:
Stage: Stage-0
Stage: Stage-1
Fetch Operator
limit: -1
SHARK QUERY PLAN #0:
**shark.execution.FileSinkOperator
File Output Operator
compressed: false
GlobalTableId: 0
table:
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
**shark.execution.SelectOperator
Select Operator
expressions:
expr: _col0
type: bigint
outputColumnNames: _col0
**org.apache.hadoop.hive.ql.exec.GroupByPostShuffleOperator
Group By Operator
aggregations:
expr: count(VALUE._col0)
bucketGroup: false
mode: mergepartial
outputColumnNames: _col0
**shark.execution.ReduceSinkOperator
Reduce Output Operator
sort order:
tag: -1
value expressions:
expr: _col0
type: bigint
**org.apache.hadoop.hive.ql.exec.GroupByPreShuffleOperator
Group By Operator
aggregations:
expr: count(_col10)
bucketGroup: false
mode: hash
outputColumnNames: _col0
**shark.execution.SelectOperator
Select Operator
expressions:
expr: _col10
type: string
outputColumnNames: _col10
**shark.execution.FilterOperator
Filter Operator
predicate:
expr: (((_col13 = _col6) or (_col12 = _col5)) and (unix_timestamp(_col0, 'yyyy-MM-dd HH:mm:ss,SSS') > unix_timestamp(_col10, 'yyyy-MM-dd HH:mm:ss,SSS')))
type: boolean
**shark.execution.JoinOperator
Join Operator
condition map:
Inner Join 0 to 1
condition expressions:
0 {VALUE._col0} {VALUE._col5} {VALUE._col6}
1 {VALUE._col0} {VALUE._col2} {VALUE._col3}
handleSkewJoin: false
outputColumnNames: _col0, _col5, _col6, _col10, _col12, _col13
**shark.execution.ReduceSinkOperator
Reduce Output Operator
sort order:
tag: 0
value expressions:
expr: time
type: string
expr: column2
type: string
expr: column1
type: string
**shark.execution.TableScanOperator
TableScan
alias: table2
**shark.execution.ReduceSinkOperator
Reduce Output Operator
sort order:
tag: 1
value expressions:
expr: time
type: string
expr: column2
type: string
expr: column1
type: string
**shark.execution.TableScanOperator
TableScan
alias: table1
Thanks and Regards,
Sankar S.
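In the plan above, the Reduce Output Operators list only value expressions and no join keys, and the whole predicate sits in the Filter Operator after the Join Operator. That suggests every pair of rows is produced before any filtering happens. The Python sketch below is not Shark or Hive code; the function names and toy rows are hypothetical. It computes how many pairs that would be for the row counts quoted above, and shows one way an OR of two equalities can be answered with two hash lookups instead of testing every pair.

```python
from collections import defaultdict

# Row counts quoted in the message above.
TABLE1_ROWS = 15140637
TABLE2_ROWS = 129138

# Pairs a join without keys has to produce before the filter runs.
candidate_pairs = TABLE1_ROWS * TABLE2_ROWS
print(candidate_pairs)


def or_join_count(table1, table2):
    """Count pairs where column1 matches OR column2 matches, using two hash
    indexes on table2 instead of testing every pair.

    Rows are (column1, column2, time) tuples. Times are compared as plain
    values here, which is a simplification of unix_timestamp().
    """
    by_col1 = defaultdict(set)
    by_col2 = defaultdict(set)
    for j, (c1, c2, _) in enumerate(table2):
        by_col1[c1].add(j)
        by_col2[c2].add(j)

    count = 0
    for c1, c2, t1 in table1:
        # Set union keeps a pair only once even if both equalities hold.
        candidates = by_col1.get(c1, set()) | by_col2.get(c2, set())
        count += sum(1 for j in candidates if table2[j][2] > t1)
    return count


def brute_force_count(table1, table2):
    """Reference version: test every pair, as a keyless join would."""
    return sum(
        1
        for a1, a2, t1 in table1
        for b1, b2, t2 in table2
        if (a1 == b1 or a2 == b2) and t2 > t1
    )


# Hypothetical toy data, only to check that both versions agree.
toy1 = [("a", "x", 1), ("b", "y", 5), ("c", "z", 2)]
toy2 = [("a", "x", 3), ("b", "q", 4), ("d", "z", 9)]
print(or_join_count(toy1, toy2), brute_force_count(toy1, toy2))
```

The first number printed is roughly 1.96 trillion candidate pairs. The same idea could be expressed in SQL as two equality joins whose results are combined without double counting; whether Shark then picks a keyed join would need to be confirmed by running explain.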
On Saturday, 19 July 2014, 1:17, Michael Armbrust <mi...@databricks.com> wrote:
It's likely that, because your UDF is a black box to Hive's query optimizer, the optimizer must choose a less efficient join algorithm that passes all possible matches to your function for comparison. This will happen any time your UDF touches attributes from both sides of the join.
In general you can learn more about the chosen execution strategy by running explain.
Re: Need help on Spark UDF (Join) Performance tuning .
Posted by Michael Armbrust <mi...@databricks.com>.
It's likely that, because your UDF is a black box to Hive's query optimizer,
the optimizer must choose a less efficient join algorithm that passes all
possible matches to your function for comparison. This will happen any
time your UDF touches attributes from both sides of the join.
In general you can learn more about the chosen execution strategy by
running explain.
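To make the point above concrete, here is a minimal Python sketch. It is not Shark or Hive code, and all names and row sizes in it are hypothetical. When the join condition is an opaque function, the only safe strategy is to call it on every pair of rows. When the condition is a visible equality, one side can be indexed by key and the other side only probes that index.

```python
from collections import defaultdict


def nested_loop_join(left, right, predicate):
    """Join with an opaque predicate: every (l, r) pair must be tested."""
    calls = 0
    matches = []
    for l in left:
        for r in right:
            calls += 1
            if predicate(l, r):
                matches.append((l, r))
    return matches, calls


def hash_join(left, right, left_key, right_key):
    """Join on a visible equality: index one side, probe with the other."""
    index = defaultdict(list)
    for r in right:
        index[right_key(r)].append(r)

    probes = 0
    matches = []
    for l in left:
        probes += 1
        for r in index.get(left_key(l), []):
            matches.append((l, r))
    return matches, probes


# Toy rows of the form (col1, col2). Sizes are arbitrary.
big = [(i % 50, i) for i in range(1000)]
small = [(k, k) for k in range(20)]


def udf(l, r):
    """Stands in for a UDF whose body the optimizer cannot inspect."""
    return l[0] == r[0]


slow, udf_calls = nested_loop_join(big, small, udf)
fast, probes = hash_join(big, small, lambda l: l[0], lambda r: r[0])

print(sorted(slow) == sorted(fast))  # same result either way
print(udf_calls, probes)             # work done by each strategy
```

Both strategies return the same 400 matching pairs, but the opaque predicate is called 20000 times (1000 x 20) while the hash join performs 1000 probes.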
On Jul 18, 2014 12:04 PM, "S Malligarjunan" <sm...@yahoo.com>
wrote:
> Hello Experts,
>
> I would highly appreciate your input. Please suggest or give me a hint:
> what could be the issue here?
>
>
> Thanks and Regards,
> Malligarjunan S.
Re: Need help on Spark UDF (Join) Performance tuning .
Posted by S Malligarjunan <sm...@yahoo.com>.
Hello Experts,
I would highly appreciate your input. Please suggest or give me a hint: what could be the issue here?
Thanks and Regards,
Malligarjunan S.