Posted to issues-all@impala.apache.org by "Xianqing He (Jira)" <ji...@apache.org> on 2021/07/12 11:41:00 UTC

[jira] [Updated] (IMPALA-10789) Early materialize expressions in ScanNode

     [ https://issues.apache.org/jira/browse/IMPALA-10789?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Xianqing He updated IMPALA-10789:
---------------------------------
    Description: 
Impala uses late materialization to evaluate expressions. For example:
{code:java}
SELECT SUM(col), COUNT(col), MIN(col), MAX(col)
FROM (
	SELECT CAST(regexp_extract(string_col, '(\\d+)', 0) AS bigint) col
	FROM functional_parquet.alltypesagg
) t{code}
The plan looks like this:
{code:java}
PLAN-ROOT SINK
|
03:AGGREGATE [FINALIZE]
|  output: sum:merge(col), count:merge(col), min:merge(col), max:merge(col)
|  row-size=32B cardinality=1
|
02:EXCHANGE [UNPARTITIONED]
|
01:AGGREGATE
|  output: sum(CAST(regexp_extract(string_col, '(\\d+)', 0) AS BIGINT)), count(CAST(regexp_extract(string_col, '(\\d+)', 0) AS BIGINT)), min(CAST(regexp_extract(string_col, '(\\d+)', 0) AS BIGINT)), max(CAST(regexp_extract(string_col, '(\\d+)', 0) AS BIGINT))
|  row-size=32B cardinality=1
|
00:SCAN HDFS [functional_parquet.alltypesagg]
   partitions=11/11 files=11 size=464.70KB
   row-size=15B cardinality=11.00K
{code}
The expressions passed as arguments to the aggregate functions are evaluated in the aggregation phase. As a result, an expression that appears in several aggregate functions is evaluated once per function, which is time-consuming, especially for complex expressions such as regular expressions.
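
To make the cost concrete, here is a minimal Python sketch that counts how often the expression runs in each case. It is a toy model, not Impala code: the names extract_number, late, early and the sample rows are invented for illustration, and extract_number only stands in for CAST(regexp_extract(string_col, '(\\d+)', 0) AS BIGINT).
{code:python}
import re

calls = {"n": 0}  # number of times the expensive expression was evaluated


def extract_number(s):
    """Stand-in for CAST(regexp_extract(s, '(\\d+)', 0) AS BIGINT)."""
    calls["n"] += 1
    m = re.search(r"(\d+)", s)
    return int(m.group(0)) if m else None  # no digits -> NULL


def non_null(values):
    """Aggregate functions skip NULL inputs."""
    return [v for v in values if v is not None]


def late(rows):
    """Late materialization: each aggregate evaluates its own copy."""
    # Assumes at least one row yields a non-NULL value (min/max need it).
    return (
        sum(non_null(extract_number(r) for r in rows)),
        len(non_null(extract_number(r) for r in rows)),
        min(non_null(extract_number(r) for r in rows)),
        max(non_null(extract_number(r) for r in rows)),
    )


def early(rows):
    """Early materialization: evaluate once per row, then reuse the value."""
    col = non_null(extract_number(r) for r in rows)
    return (sum(col), len(col), min(col), max(col))


rows = ["a1", "b22", "c", "d333"]
for plan in (late, early):
    calls["n"] = 0
    print(plan.__name__, plan(rows), "evaluations:", calls["n"])
{code}
With these four sample rows both variants return the same aggregates, but the first evaluates the expression 16 times (4 rows x 4 aggregates) and the second only 4 times.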

Consider a query that combines an analytic function with UNION ALL:
{code:java}
SELECT SUM(int_col) OVER (PARTITION BY id )
FROM (
    SELECT id
        , CASE
            WHEN id = 10 THEN tinyint_col
            WHEN string_col LIKE '%6%' THEN smallint_col
        END AS int_col
    FROM functional_parquet.alltypesagg
    UNION ALL
    SELECT id
        , CASE
            WHEN id = 10 THEN tinyint_col
            WHEN string_col LIKE '%6%' THEN smallint_col
        END AS int_col
    FROM functional_parquet.alltypes
) t
{code}
The plan looks like this:
{code:java}
PLAN-ROOT SINK
|
06:EXCHANGE [UNPARTITIONED]
|
04:ANALYTIC
|  functions: sum(int_col)
|  partition by: id
|  row-size=14B cardinality=18.30K
|
03:SORT
|  order by: id ASC NULLS FIRST
|  row-size=6B cardinality=18.30K
|
05:EXCHANGE [HASH(id)]
|
00:UNION
|  row-size=6B cardinality=18.30K
|
|--02:SCAN HDFS [functional_parquet.alltypes]
|     partitions=24/24 files=24 size=189.91KB
|     row-size=22B cardinality=7.30K
|
01:SCAN HDFS [functional_parquet.alltypesagg]
   partitions=11/11 files=11 size=464.70KB
   row-size=24B cardinality=11.00K{code}
Here the UnionNode materializes the expressions and prunes columns.

Currently UnionNode is single-threaded, whereas ScanNode supports multi-threading, so materializing the expressions in ScanNode should improve query performance.
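
As a rough illustration of that idea, the following Python sketch moves the CASE expression into the scan step, so that each scan emits only (id, int_col) and the union merely concatenates rows. It is a toy model under stated assumptions: the function names and the dictionary row format are hypothetical, and the thread pool only mirrors the structure of concurrent scans rather than Impala's actual threading.
{code:python}
from concurrent.futures import ThreadPoolExecutor


def int_col(row):
    """The CASE expression from the query above; NULL is modelled as None."""
    if row["id"] == 10:
        return row["tinyint_col"]
    s = row["string_col"]
    if s is not None and "6" in s:  # string_col LIKE '%6%'
        return row["smallint_col"]
    return None  # no ELSE branch -> NULL


def scan(rows):
    """Scan step: evaluate the expression here and prune unused columns."""
    return [(r["id"], int_col(r)) for r in rows]


def union_all(*tables):
    """Union step: only concatenates rows that are already materialized."""
    with ThreadPoolExecutor() as pool:
        parts = list(pool.map(scan, tables))  # map() keeps input order
    return [row for part in parts for row in part]
{code}
In this arrangement the union step no longer evaluates any expression; all of that work happens in the scans, which can run concurrently.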

We can use hints to specify which expressions require early materialization, and Impala internally determines whether each expression can be evaluated in ScanNode.
{code:java}
SELECT SUM(col), COUNT(col), MIN(col), MAX(col)
FROM (
	SELECT CAST(regexp_extract(string_col, '(\\d+)', 0) AS bigint) col/*+early_materialize*/
	FROM functional_parquet.alltypesagg
) t
{code}
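This expression can be materialized in ScanNode, but a query like the following cannot use early materialization, because its expression references columns from both joined tables: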
{code:java}
SELECT SUM(col)
FROM (
	SELECT CASE 
			WHEN t1.id = 10 THEN t2.tinyint_col
			ELSE t2.smallint_col
		END AS col/*+materialize_expr*/
	FROM functional_parquet.alltypesagg t1
		JOIN functional_parquet.alltypes t2 ON t1.id = t2.id
) t
{code}
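
One possible form of that internal check is sketched below in Python. The rule it encodes, that an expression can be evaluated in a scan only if every column it references comes from that one scan, is an assumption drawn from the two examples above, not a description of Impala's implementation; the function name and the (alias, column) pair format are hypothetical.
{code:python}
def scan_that_can_evaluate(column_refs):
    """Return the alias of the single scan that could evaluate an expression.

    column_refs: iterable of (table_alias, column_name) pairs used by the
    expression. Returns None when the columns span several tables (or when
    the expression references no columns at all).
    """
    aliases = {alias for alias, _ in column_refs}
    return next(iter(aliases)) if len(aliases) == 1 else None


# Column references of the two hinted expressions above.
regexp_expr = [("alltypesagg", "string_col")]
case_expr = [("t1", "id"), ("t2", "tinyint_col"), ("t2", "smallint_col")]

print(scan_that_can_evaluate(regexp_expr))  # alltypesagg
print(scan_that_can_evaluate(case_expr))    # None
{code}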
 


> Early materialize expressions in ScanNode
> -----------------------------------------
>
>                 Key: IMPALA-10789
>                 URL: https://issues.apache.org/jira/browse/IMPALA-10789
>             Project: IMPALA
>          Issue Type: Improvement
>          Components: Backend, Frontend
>    Affects Versions: Impala 4.1
>            Reporter: Xianqing He
>            Assignee: Xianqing He
>            Priority: Major
>             Fix For: Impala 4.1
>
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
