You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@doris.apache.org by GuoLei Yi <yi...@gmail.com> on 2022/04/26 03:03:33 UTC

[Discuess][DSIP] Using global dict to accelerate query that have low cardinality string

*Issue Description:*

In Doris, many dimension columns are string type with low cardinality
(cardinality < 1024), such as some status codes, provinces, regions,
genders, etc. During group by or join operation, it has to make a hash
table in the form of strings. The efficiency is relatively low. Through
performance testing, it is found that the performance is about 3 times
worse than using int type. If we can convert string to int through a global
dictionary during a query, then the query efficiency will be greatly
improved. The solution is described below.

*Design:*

Maintain global dictionary for every column in FE as needed, for example it
could be organized as map<(table_id,col_id), dict> in DictManager.
When creating a table, set the attribute of the field to low_cardinality to
identify whether the field is a low-cardinality field. For low-cardinality
fields, use a global dictionary to speed up the corresponding query. The
specific syntax is as follows:

// create table
create table table_name (
    col1 int,
    col2 string|varchar(10) low_cardinality
);

// alter table
alter table table_name modify column col2 set low_cardinality=true

At the beginning, the dict is empty, so it will not be used during query.
FE will start a background task to obtain global dictionary information
from BE for every low cardinality column, and the task is sent to BE in the
form of SQL, as follows:

select dict(col) from table[meta]

So that there will be a new aggregation method dict and new scannode in BE.
In the new scannode, BE reads the local dictionary of each field and get
all dictinct strings. dict is an aggregation function that deduplicates and
merges the dictionaries of multiple BEs and tablets. The final result is
saved in FE's DictManager.

During query, FE puts the global dictionary information into the Fragment
structure and sends it to each BE. The structure is as follows:

struct TStringColumnDict {
list<int16> id,    // the offset or id of the string
list<string> value    // the string value
}
struct TColumnDict {
TStringColumnDict xxx,
TDateColumnDict xxx,
....
}
struct TGlobalDict {
map<id, TColumnDict> columnDicts // map from column id to column dict
}
struct TExecPlanFragmentParams {
...
TGlobalDict globalDict
}

The OlapScanNode will encodes the string data according to the value and ID
of each string in the dict, and the subsequent calculations (such as group
by) are operated according to int, and at the final result sink node, the
result is decode into the corresponding string.


*Maintain GlobalDict Changes*

During process of loading, user will import new strings that are not in the
GlobalDict, so this requires an update mechanism for the global dictionary.
Stream load is actually a query, so it will also generate a fragment, and
according to the task description of our query, there is a global dict in
the fragment. In TabletSink, each value of the column of low cardinality
should be evaluated with ColumnDict, if find a new string, the coordinator
need to inform FE that the dict of this column is invalid. Specifically, we
need to make some modifications in LoadTxnCommitRequest and add a field to
indicate which columns are invalid.

struct TLoadTxnCommitRequest {
...
list<int> invalid_col_ids
}

Invalid col ids should be persisted to the transaction state. Once the
transaction is published, we will delete the dict from the dict manager. In
this way, the query will not use this dictionary optimization in the
future. At the same time, FE generates a task to get the latest Dict. When
the latest Dict is obtained, the query can be optimized using Dict again.





--
Best Regards

Tel:134-3991-0228
Email:yiguolei@gmail.com

Re: [Discuess][DSIP] Using global dict to accelerate query that have low cardinality string

Posted by Gabriel Lee <ga...@gmail.com>.
Got it, thanks! Look forward to this new feature!

Best,
Gabriel

On Tue, 26 Apr 2022 at 17:13, GuoLei Yi <yi...@gmail.com> wrote:

> @ Gabriel Lee
> >
> > 1. I think we cannot trust users completely, so we should add a
> protection
> > mechanism for global dictionary. For example, fallback to raw string
> > processing when entries in dictionary are too many.
>
>
> Yes, you are right, we cannot trust users completely. If there are too many
> strings (for example 10000 > 1024), global dict will be invalid and not
> updated any more. All queries on that table will fallback to raw string
> processing.
>
> 2. Should we consider local dictionary instead of global dictionary? In my
> > mind, global dictionary is hard to maintain in an efficient way. For
> > example,
> >
>
> We need global dict because the encoded string will be shuffled across
> among BEs. If use a local dict then raw strings on different BE will have
> different and we will also depend on global dict to optimize join operation
> and the join operation is on different columns and should depend on global
> dict.
>
> 3. Seems like that aggregate function dict is same as distinct, is it
> right?
> >
>
> Yeah.... Currently, it is a little uglily. We use a special function name
> to indicate that the scan node should read the meta column of the
> rowset.....
>
> Gabriel Lee <ga...@gmail.com> 于2022年4月26日周二 14:54写道:
>
> > Hi Guolei,
> > I have some questions for this.
> > 1. I think we cannot trust users completely, so we should add a
> protection
> > mechanism for global dictionary. For example, fallback to raw string
> > processing when entries in dictionary are too many.
> > 2. Should we consider local dictionary instead of global dictionary? In
> my
> > mind, global dictionary is hard to maintain in an efficient way. For
> > example,
> >     2.1 we should add more synchronization mechanism for this global
> > dictionary to ensure consistency which drastically increase complexity
> >     2.2 an background worker incurs additional workloads for both FE and
> BE
> > 3. Seems like that aggregate function dict is same as distinct, is it
> > right?
> >
> > On Tue, 26 Apr 2022 at 11:04, GuoLei Yi <yi...@gmail.com> wrote:
> >
> > > *Issue Description:*
> > >
> > > In Doris, many dimension columns are string type with low cardinality
> > > (cardinality < 1024), such as some status codes, provinces, regions,
> > > genders, etc. During group by or join operation, it has to make a hash
> > > table in the form of strings. The efficiency is relatively low. Through
> > > performance testing, it is found that the performance is about 3 times
> > > worse than using int type. If we can convert string to int through a
> > global
> > > dictionary during a query, then the query efficiency will be greatly
> > > improved. The solution is described below.
> > >
> > > *Design:*
> > >
> > > Maintain global dictionary for every column in FE as needed, for
> example
> > it
> > > could be organized as map<(table_id,col_id), dict> in DictManager.
> > > When creating a table, set the attribute of the field to
> low_cardinality
> > to
> > > identify whether the field is a low-cardinality field. For
> > low-cardinality
> > > fields, use a global dictionary to speed up the corresponding query.
> The
> > > specific syntax is as follows:
> > >
> > > // create table
> > > create table table_name (
> > >     col1 int,
> > >     col2 string|varchar(10) low_cardinality
> > > );
> > >
> > > // alter table
> > > alter table table_name modify column col2 set low_cardinality=true
> > >
> > > At the beginning, the dict is empty, so it will not be used during
> query.
> > > FE will start a background task to obtain global dictionary information
> > > from BE for every low cardinality column, and the task is sent to BE in
> > the
> > > form of SQL, as follows:
> > >
> > > select dict(col) from table[meta]
> > >
> > > So that there will be a new aggregation method dict and new scannode in
> > BE.
> > > In the new scannode, BE reads the local dictionary of each field and
> get
> > > all dictinct strings. dict is an aggregation function that deduplicates
> > and
> > > merges the dictionaries of multiple BEs and tablets. The final result
> is
> > > saved in FE's DictManager.
> > >
> > > During query, FE puts the global dictionary information into the
> Fragment
> > > structure and sends it to each BE. The structure is as follows:
> > >
> > > struct TStringColumnDict {
> > > list<int16> id,    // the offset or id of the string
> > > list<string> value    // the string value
> > > }
> > > struct TColumnDict {
> > > TStringColumnDict xxx,
> > > TDateColumnDict xxx,
> > > ....
> > > }
> > > struct TGlobalDict {
> > > map<id, TColumnDict> columnDicts // map from column id to column dict
> > > }
> > > struct TExecPlanFragmentParams {
> > > ...
> > > TGlobalDict globalDict
> > > }
> > >
> > > The OlapScanNode will encodes the string data according to the value
> and
> > ID
> > > of each string in the dict, and the subsequent calculations (such as
> > group
> > > by) are operated according to int, and at the final result sink node,
> the
> > > result is decode into the corresponding string.
> > >
> > >
> > > *Maintain GlobalDict Changes*
> > >
> > > During process of loading, user will import new strings that are not in
> > the
> > > GlobalDict, so this requires an update mechanism for the global
> > dictionary.
> > > Stream load is actually a query, so it will also generate a fragment,
> and
> > > according to the task description of our query, there is a global dict
> in
> > > the fragment. In TabletSink, each value of the column of low
> cardinality
> > > should be evaluated with ColumnDict, if find a new string, the
> > coordinator
> > > need to inform FE that the dict of this column is invalid.
> Specifically,
> > we
> > > need to make some modifications in LoadTxnCommitRequest and add a field
> > to
> > > indicate which columns are invalid.
> > >
> > > struct TLoadTxnCommitRequest {
> > > ...
> > > list<int> invalid_col_ids
> > > }
> > >
> > > Invalid col ids should be persisted to the transaction state. Once the
> > > transaction is published, we will delete the dict from the dict
> manager.
> > In
> > > this way, the query will not use this dictionary optimization in the
> > > future. At the same time, FE generates a task to get the latest Dict.
> > When
> > > the latest Dict is obtained, the query can be optimized using Dict
> again.
> > >
> > >
> > >
> > >
> > >
> > > --
> > > Best Regards
> > >
> > > Tel:134-3991-0228
> > > Email:yiguolei@gmail.com
> > >
> >
>
>
> --
> 祝您心情愉快
>
> 衣国垒
> Tsing Hua University
> Tel:134-3991-0228
> Email:yiguolei@gmail.com
>

Re: [Discuess][DSIP] Using global dict to accelerate query that have low cardinality string

Posted by GuoLei Yi <yi...@gmail.com>.
@ Gabriel Lee
>
> 1. I think we cannot trust users completely, so we should add a protection
> mechanism for global dictionary. For example, fallback to raw string
> processing when entries in dictionary are too many.


Yes, you are right, we cannot trust users completely. If there are too many
strings (for example 10000 > 1024), global dict will be invalid and not
updated any more. All queries on that table will fallback to raw string
processing.

2. Should we consider local dictionary instead of global dictionary? In my
> mind, global dictionary is hard to maintain in an efficient way. For
> example,
>

We need global dict because the encoded string will be shuffled across
among BEs. If use a local dict then raw strings on different BE will have
different and we will also depend on global dict to optimize join operation
and the join operation is on different columns and should depend on global
dict.

3. Seems like that aggregate function dict is same as distinct, is it right?
>

Yeah.... Currently, it is a little uglily. We use a special function name
to indicate that the scan node should read the meta column of the
rowset.....

Gabriel Lee <ga...@gmail.com> 于2022年4月26日周二 14:54写道:

> Hi Guolei,
> I have some questions for this.
> 1. I think we cannot trust users completely, so we should add a protection
> mechanism for global dictionary. For example, fallback to raw string
> processing when entries in dictionary are too many.
> 2. Should we consider local dictionary instead of global dictionary? In my
> mind, global dictionary is hard to maintain in an efficient way. For
> example,
>     2.1 we should add more synchronization mechanism for this global
> dictionary to ensure consistency which drastically increase complexity
>     2.2 an background worker incurs additional workloads for both FE and BE
> 3. Seems like that aggregate function dict is same as distinct, is it
> right?
>
> On Tue, 26 Apr 2022 at 11:04, GuoLei Yi <yi...@gmail.com> wrote:
>
> > *Issue Description:*
> >
> > In Doris, many dimension columns are string type with low cardinality
> > (cardinality < 1024), such as some status codes, provinces, regions,
> > genders, etc. During group by or join operation, it has to make a hash
> > table in the form of strings. The efficiency is relatively low. Through
> > performance testing, it is found that the performance is about 3 times
> > worse than using int type. If we can convert string to int through a
> global
> > dictionary during a query, then the query efficiency will be greatly
> > improved. The solution is described below.
> >
> > *Design:*
> >
> > Maintain global dictionary for every column in FE as needed, for example
> it
> > could be organized as map<(table_id,col_id), dict> in DictManager.
> > When creating a table, set the attribute of the field to low_cardinality
> to
> > identify whether the field is a low-cardinality field. For
> low-cardinality
> > fields, use a global dictionary to speed up the corresponding query. The
> > specific syntax is as follows:
> >
> > // create table
> > create table table_name (
> >     col1 int,
> >     col2 string|varchar(10) low_cardinality
> > );
> >
> > // alter table
> > alter table table_name modify column col2 set low_cardinality=true
> >
> > At the beginning, the dict is empty, so it will not be used during query.
> > FE will start a background task to obtain global dictionary information
> > from BE for every low cardinality column, and the task is sent to BE in
> the
> > form of SQL, as follows:
> >
> > select dict(col) from table[meta]
> >
> > So that there will be a new aggregation method dict and new scannode in
> BE.
> > In the new scannode, BE reads the local dictionary of each field and get
> > all dictinct strings. dict is an aggregation function that deduplicates
> and
> > merges the dictionaries of multiple BEs and tablets. The final result is
> > saved in FE's DictManager.
> >
> > During query, FE puts the global dictionary information into the Fragment
> > structure and sends it to each BE. The structure is as follows:
> >
> > struct TStringColumnDict {
> > list<int16> id,    // the offset or id of the string
> > list<string> value    // the string value
> > }
> > struct TColumnDict {
> > TStringColumnDict xxx,
> > TDateColumnDict xxx,
> > ....
> > }
> > struct TGlobalDict {
> > map<id, TColumnDict> columnDicts // map from column id to column dict
> > }
> > struct TExecPlanFragmentParams {
> > ...
> > TGlobalDict globalDict
> > }
> >
> > The OlapScanNode will encodes the string data according to the value and
> ID
> > of each string in the dict, and the subsequent calculations (such as
> group
> > by) are operated according to int, and at the final result sink node, the
> > result is decode into the corresponding string.
> >
> >
> > *Maintain GlobalDict Changes*
> >
> > During process of loading, user will import new strings that are not in
> the
> > GlobalDict, so this requires an update mechanism for the global
> dictionary.
> > Stream load is actually a query, so it will also generate a fragment, and
> > according to the task description of our query, there is a global dict in
> > the fragment. In TabletSink, each value of the column of low cardinality
> > should be evaluated with ColumnDict, if find a new string, the
> coordinator
> > need to inform FE that the dict of this column is invalid. Specifically,
> we
> > need to make some modifications in LoadTxnCommitRequest and add a field
> to
> > indicate which columns are invalid.
> >
> > struct TLoadTxnCommitRequest {
> > ...
> > list<int> invalid_col_ids
> > }
> >
> > Invalid col ids should be persisted to the transaction state. Once the
> > transaction is published, we will delete the dict from the dict manager.
> In
> > this way, the query will not use this dictionary optimization in the
> > future. At the same time, FE generates a task to get the latest Dict.
> When
> > the latest Dict is obtained, the query can be optimized using Dict again.
> >
> >
> >
> >
> >
> > --
> > Best Regards
> >
> > Tel:134-3991-0228
> > Email:yiguolei@gmail.com
> >
>


-- 
祝您心情愉快

衣国垒
Tsing Hua University
Tel:134-3991-0228
Email:yiguolei@gmail.com

Re: [Discuess][DSIP] Using global dict to accelerate query that have low cardinality string

Posted by Gabriel Lee <ga...@gmail.com>.
Hi Guolei,
I have some questions for this.
1. I think we cannot trust users completely, so we should add a protection
mechanism for global dictionary. For example, fallback to raw string
processing when entries in dictionary are too many.
2. Should we consider local dictionary instead of global dictionary? In my
mind, global dictionary is hard to maintain in an efficient way. For
example,
    2.1 we should add more synchronization mechanism for this global
dictionary to ensure consistency which drastically increase complexity
    2.2 an background worker incurs additional workloads for both FE and BE
3. Seems like that aggregate function dict is same as distinct, is it right?

On Tue, 26 Apr 2022 at 11:04, GuoLei Yi <yi...@gmail.com> wrote:

> *Issue Description:*
>
> In Doris, many dimension columns are string type with low cardinality
> (cardinality < 1024), such as some status codes, provinces, regions,
> genders, etc. During group by or join operation, it has to make a hash
> table in the form of strings. The efficiency is relatively low. Through
> performance testing, it is found that the performance is about 3 times
> worse than using int type. If we can convert string to int through a global
> dictionary during a query, then the query efficiency will be greatly
> improved. The solution is described below.
>
> *Design:*
>
> Maintain global dictionary for every column in FE as needed, for example it
> could be organized as map<(table_id,col_id), dict> in DictManager.
> When creating a table, set the attribute of the field to low_cardinality to
> identify whether the field is a low-cardinality field. For low-cardinality
> fields, use a global dictionary to speed up the corresponding query. The
> specific syntax is as follows:
>
> // create table
> create table table_name (
>     col1 int,
>     col2 string|varchar(10) low_cardinality
> );
>
> // alter table
> alter table table_name modify column col2 set low_cardinality=true
>
> At the beginning, the dict is empty, so it will not be used during query.
> FE will start a background task to obtain global dictionary information
> from BE for every low cardinality column, and the task is sent to BE in the
> form of SQL, as follows:
>
> select dict(col) from table[meta]
>
> So that there will be a new aggregation method dict and new scannode in BE.
> In the new scannode, BE reads the local dictionary of each field and get
> all dictinct strings. dict is an aggregation function that deduplicates and
> merges the dictionaries of multiple BEs and tablets. The final result is
> saved in FE's DictManager.
>
> During query, FE puts the global dictionary information into the Fragment
> structure and sends it to each BE. The structure is as follows:
>
> struct TStringColumnDict {
> list<int16> id,    // the offset or id of the string
> list<string> value    // the string value
> }
> struct TColumnDict {
> TStringColumnDict xxx,
> TDateColumnDict xxx,
> ....
> }
> struct TGlobalDict {
> map<id, TColumnDict> columnDicts // map from column id to column dict
> }
> struct TExecPlanFragmentParams {
> ...
> TGlobalDict globalDict
> }
>
> The OlapScanNode will encodes the string data according to the value and ID
> of each string in the dict, and the subsequent calculations (such as group
> by) are operated according to int, and at the final result sink node, the
> result is decode into the corresponding string.
>
>
> *Maintain GlobalDict Changes*
>
> During process of loading, user will import new strings that are not in the
> GlobalDict, so this requires an update mechanism for the global dictionary.
> Stream load is actually a query, so it will also generate a fragment, and
> according to the task description of our query, there is a global dict in
> the fragment. In TabletSink, each value of the column of low cardinality
> should be evaluated with ColumnDict, if find a new string, the coordinator
> need to inform FE that the dict of this column is invalid. Specifically, we
> need to make some modifications in LoadTxnCommitRequest and add a field to
> indicate which columns are invalid.
>
> struct TLoadTxnCommitRequest {
> ...
> list<int> invalid_col_ids
> }
>
> Invalid col ids should be persisted to the transaction state. Once the
> transaction is published, we will delete the dict from the dict manager. In
> this way, the query will not use this dictionary optimization in the
> future. At the same time, FE generates a task to get the latest Dict. When
> the latest Dict is obtained, the query can be optimized using Dict again.
>
>
>
>
>
> --
> Best Regards
>
> Tel:134-3991-0228
> Email:yiguolei@gmail.com
>