Posted to user@spark.apache.org by S Malligarjunan <sm...@yahoo.com> on 2014/07/17 19:17:02 UTC

Need help on Spark UDF (Join) Performance tuning .

Hello Experts,

I am facing a performance problem when my query calls a UDF. Please help me tune the query.
The details are below.

shark> select count(*) from table1;
OK
151096
Time taken: 7.242 seconds
shark> select count(*) from table2; 
OK
938
Time taken: 1.273 seconds

Without UDF:
shark> SELECT
     >         count(pvc1.time)
     >   FROM table2 pvc2 JOIN table1 pvc1
     >   WHERE pvc1.col1 = pvc2.col2
     >   AND  unix_timestamp(pvc2.time, 'yyyy-MM-dd HH:mm:ss,SSS') > unix_timestamp(pvc1.time, 'yyyy-MM-dd HH:mm:ss,SSS');
OK
328
Time taken: 200.487 seconds


shark> 
     > SELECT
     >         count(pvc1.time)
     >   FROM table2 pvc2 JOIN table1 pvc1
     >   WHERE (pvc1.col1 = pvc2.col1 OR pvc1.col1 = pvc2.col2)
     >   AND  unix_timestamp(pvc2.time, 'yyyy-MM-dd HH:mm:ss,SSS') > unix_timestamp(pvc1.time, 'yyyy-MM-dd HH:mm:ss,SSS');
OK
331
Time taken: 292.86 seconds

With UDF:
shark>  
     >  SELECT
     >         count(pvc1.time)
     >   FROM table2 pvc2 JOIN table1 pvc1
     >   WHERE testCompare(pvc1.col1,pvc1.col2, pvc2.col1,pvc2.col2)
     >   AND  unix_timestamp(pvc2.time, 'yyyy-MM-dd HH:mm:ss,SSS') > unix_timestamp(pvc1.time, 'yyyy-MM-dd HH:mm:ss,SSS');

OK
331
Time taken: 3718.23 seconds

The UDF query above takes far longer to run: 3718.23 seconds, versus 292.86 seconds for the equivalent query without the UDF.

testCompare is a UDF that simply evaluates pvc1.col1 = pvc2.col1 OR pvc1.col1 = pvc2.col2.

Please let me know what the issue is here.

 
Thanks and Regards,
Sankar S.  

Re: Need help on Spark UDF (Join) Performance tuning .

Posted by S Malligarjunan <sm...@yahoo.com>.
Hello Michael,

Thank you very much for helping. 

I have now removed the UDF and executed just the SQL query. This time, however, table1 has 15140637 rows and table2 has 129138 rows.

I executed the following query:

SELECT count(table1.time)
  FROM table2 JOIN table1
  WHERE (table1.column1 = table2.column1 OR table1.column2 = table2.column2)
  AND  unix_timestamp(table2.time, 'yyyy-MM-dd HH:mm:ss,SSS') > unix_timestamp(table1.time, 'yyyy-MM-dd HH:mm:ss,SSS');

The query ran for more than 25 hours without producing any output and finally failed with a broken pipe error :(.
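
For a sense of scale, here is a rough back-of-the-envelope sketch in Python. It assumes that the join considers every (table1, table2) row pair and that runtime grows linearly with the number of pairs. Both are assumptions made for illustration, not measurements, and the variable names are made up for this sketch.

```python
# Row counts quoted in this thread.
small_table1, small_table2 = 151_096, 938
large_table1, large_table2 = 15_140_637, 129_138

# Candidate row pairs if every row of one table is compared with every row of the other.
small_pairs = small_table1 * small_table2
large_pairs = large_table1 * large_table2
growth = large_pairs / small_pairs

# "Time taken" reported for the OR query (no UDF) on the small tables.
small_or_query_seconds = 292.86

# Assumption: runtime scales linearly with the number of candidate pairs.
estimated_days = small_or_query_seconds * growth / 86_400

print(f"{small_pairs:,} -> {large_pairs:,} candidate pairs ({growth:,.0f}x)")
print(f"naive runtime estimate: {estimated_days:.0f} days")
```

If that assumption is anywhere near right, a run of more than 25 hours without output is not surprising, and the number of candidate pairs is the thing to reduce.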

What could be the problem, and how can I improve the performance? Please see the explain output below and let me know what can be improved.

ABSTRACT SYNTAX TREE:
  (TOK_QUERY (TOK_FROM (TOK_JOIN (TOK_TABREF (TOK_TABNAME cnv_px_jul18) table2) (TOK_TABREF (TOK_TABNAME cnv_imp_jul18) table1))) (TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) (TOK_SELECT (TOK_SELEXPR (TOK_FUNCTION count (. (TOK_TABLE_OR_COL table1) time)))) (TOK_WHERE (AND (OR (= (. (TOK_TABLE_OR_COL table1) column1) (. (TOK_TABLE_OR_COL table2) column1)) (= (. (TOK_TABLE_OR_COL table1) column2) (. (TOK_TABLE_OR_COL table2) column2))) (> (TOK_FUNCTION unix_timestamp (. (TOK_TABLE_OR_COL table2) time) 'yyyy-MM-dd HH:mm:ss,SSS') (TOK_FUNCTION unix_timestamp (. (TOK_TABLE_OR_COL table1) time) 'yyyy-MM-dd HH:mm:ss,SSS'))))))



STAGE DEPENDENCIES:
  Stage-0 is a root stage
  Stage-1 is a root stage

STAGE PLANS:
  Stage: Stage-0

  Stage: Stage-1
    Fetch Operator
      limit: -1

SHARK QUERY PLAN #0:
  **shark.execution.FileSinkOperator
  File Output Operator
    compressed: false
    GlobalTableId: 0
    table:
        input format: org.apache.hadoop.mapred.TextInputFormat
        output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
    **shark.execution.SelectOperator
    Select Operator
      expressions:
            expr: _col0
            type: bigint
      outputColumnNames: _col0
      **org.apache.hadoop.hive.ql.exec.GroupByPostShuffleOperator
      Group By Operator
        aggregations:
              expr: count(VALUE._col0)
        bucketGroup: false
        mode: mergepartial
        outputColumnNames: _col0
        **shark.execution.ReduceSinkOperator
        Reduce Output Operator
          sort order: 
          tag: -1
          value expressions:
                expr: _col0
                type: bigint
          **org.apache.hadoop.hive.ql.exec.GroupByPreShuffleOperator
          Group By Operator
            aggregations:
                  expr: count(_col10)
            bucketGroup: false
            mode: hash
            outputColumnNames: _col0
            **shark.execution.SelectOperator
            Select Operator
              expressions:
                    expr: _col10
                    type: string
              outputColumnNames: _col10
              **shark.execution.FilterOperator
              Filter Operator
                predicate:
                    expr: (((_col13 = _col6) or (_col12 = _col5)) and (unix_timestamp(_col0, 'yyyy-MM-dd HH:mm:ss,SSS') > unix_timestamp(_col10, 'yyyy-MM-dd HH:mm:ss,SSS')))
                    type: boolean
                **shark.execution.JoinOperator
                Join Operator
                  condition map:
                       Inner Join 0 to 1
                  condition expressions:
                    0 {VALUE._col0} {VALUE._col5} {VALUE._col6}
                    1 {VALUE._col0} {VALUE._col2} {VALUE._col3}
                  handleSkewJoin: false
                  outputColumnNames: _col0, _col5, _col6, _col10, _col12, _col13
                  **shark.execution.ReduceSinkOperator
                  Reduce Output Operator
                    sort order: 
                    tag: 0
                    value expressions:
                          expr: time
                          type: string
                          expr: column2
                          type: string
                          expr: column1
                          type: string
                    **shark.execution.TableScanOperator
                    TableScan
                      alias: table2
                  **shark.execution.ReduceSinkOperator
                  Reduce Output Operator
                    sort order: 
                    tag: 1
                    value expressions:
                          expr: time
                          type: string
                          expr: column2
                          type: string
                          expr: column1
                          type: string
                    **shark.execution.TableScanOperator
                    TableScan
                      alias: table1
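
Neither Reduce Output Operator in the plan above lists any key expressions, which suggests (but does not prove) that the join is executed as a Cartesian product and that the OR condition is only applied afterwards in the Filter Operator. One rewrite that may be worth trying is to split the OR into two equi-joins with explicit ON clauses and add up the counts. The sketch below checks that the two forms return the same count. It uses Python's sqlite3 module as a stand-in for Shark, made-up sample rows, and a plain string comparison in place of unix_timestamp (which SQLite does not have), so it demonstrates only the equivalence of the two queries, not any speedup.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE table1 (time TEXT, column1 TEXT, column2 TEXT);
    CREATE TABLE table2 (time TEXT, column1 TEXT, column2 TEXT);
""")

# Made-up rows, including a NULL key and a pair that matches on both columns.
conn.executemany("INSERT INTO table1 VALUES (?, ?, ?)", [
    ("2014-07-18 10:00:00,000", "a", "x"),
    ("2014-07-18 10:00:01,000", "b", "y"),
    ("2014-07-18 10:00:02,000", None, "y"),
    ("2014-07-18 12:00:00,000", "a", "x"),
])
conn.executemany("INSERT INTO table2 VALUES (?, ?, ?)", [
    ("2014-07-18 11:00:00,000", "a", "x"),
    ("2014-07-18 11:00:00,000", "c", "y"),
    ("2014-07-18 11:00:00,000", "b", "z"),
    ("2014-07-18 09:00:00,000", "a", "x"),
])

# Same shape as the query in this thread: join without ON, OR condition in WHERE.
or_query = """
    SELECT count(t1.time)
    FROM table2 t2 JOIN table1 t1
    WHERE (t1.column1 = t2.column1 OR t1.column2 = t2.column2)
      AND t2.time > t1.time
"""

# Rewrite: one equi-join per branch of the OR. The second branch skips pairs
# already counted by the first, taking care that NULL keys are not dropped.
split_query = """
    SELECT sum(n) FROM (
        SELECT count(t1.time) AS n
        FROM table2 t2 JOIN table1 t1 ON t1.column1 = t2.column1
        WHERE t2.time > t1.time
        UNION ALL
        SELECT count(t1.time) AS n
        FROM table2 t2 JOIN table1 t1 ON t1.column2 = t2.column2
        WHERE t2.time > t1.time
          AND (t1.column1 IS NULL OR t2.column1 IS NULL
               OR t1.column1 <> t2.column1)
    ) parts
"""

or_count = conn.execute(or_query).fetchone()[0]
split_count = conn.execute(split_query).fetchone()[0]
print(or_count, split_count)
```

Whether Shark actually picks a keyed join for each branch would need to be confirmed by running explain on the rewritten query.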


 
Thanks and Regards,
Sankar S.  

On Saturday, 19 July 2014, 1:17, Michael Armbrust <mi...@databricks.com> wrote:

It's likely that, because your UDF is a black box to Hive's query optimizer, it must choose a less efficient join algorithm that passes all possible matches to your function for comparison. This will happen any time your UDF touches attributes from both sides of the join.
In general, you can learn more about the chosen execution strategy by running explain.

Re: Need help on Spark UDF (Join) Performance tuning .

Posted by Michael Armbrust <mi...@databricks.com>.
It's likely that, because your UDF is a black box to Hive's query optimizer,
it must choose a less efficient join algorithm that passes all possible
matches to your function for comparison. This will happen any time your UDF
touches attributes from both sides of the join.

In general, you can learn more about the chosen execution strategy by
running explain.
On Jul 18, 2014 12:04 PM, "S Malligarjunan" <sm...@yahoo.com>
wrote:

> Hello Experts,
>
> Appreciate your input highly, please suggest/ give me hint, what would be
> the issue here?
>
>
> Thanks and Regards,
> Malligarjunan S.

Re: Need help on Spark UDF (Join) Performance tuning .

Posted by S Malligarjunan <sm...@yahoo.com>.
Hello Experts,

I would highly appreciate your input. Please suggest, or give me a hint about, what the issue could be here.

 
Thanks and Regards,
Malligarjunan S.  


