Posted to dev@flink.apache.org by "lpengdream@163.com" <lp...@163.com> on 2022/05/09 03:16:00 UTC

【Could we support distribute by For FlinkSql】

Hello:
    Currently we can't add a shuffle operation in a SQL job.
For example, I have a Kafka source (three partitions) with parallelism three, followed by a lookup join. I want to distribute the data by id so that it is split evenly across the three parallel instances (the source may be seriously skewed).
In the DataStream API I can do this with keyBy(), but unfortunately there is nothing I can do when using SQL.
Maybe we could support something like 'select id, f1, f2 from sourceTable distribute by id', as Spark SQL does.

That way we could make the change shown in the picture in SQL mode.

    



lpengdream@163.com
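The keyBy()-style redistribution requested above can be illustrated without Flink at all. The following self-contained Java sketch assumes a simplified routing rule, floorMod(hash(id), parallelism); Flink's real keyBy() first assigns keys to key groups, so actual subtask indices will differ. The class DistributeBySketch, the Row type and the 900/50/50 skewed input are hypothetical and only illustrate the idea.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: shows why hash-distributing by id ("distribute by id",
// similar in spirit to keyBy(id)) evens out load when the source partitions are
// skewed but the ids themselves are diverse. Assumed routing rule:
// floorMod(hash(id), parallelism); Flink's real keyBy() maps keys to key
// groups first, so its subtask indices will differ.
public class DistributeBySketch {

    // A record read from Kafka: the source partition it came from plus its id.
    static final class Row {
        final int sourcePartition;
        final long id;

        Row(int sourcePartition, long id) {
            this.sourcePartition = sourcePartition;
            this.id = id;
        }
    }

    // Assumed routing rule: the same id always lands on the same subtask.
    static int targetSubtask(long id, int parallelism) {
        return Math.floorMod(Long.hashCode(id), parallelism);
    }

    // Counts how many rows each parallel subtask receives.
    static int[] countPerSubtask(List<Row> rows, int parallelism, boolean redistribute) {
        int[] counts = new int[parallelism];
        for (Row row : rows) {
            // Without a shuffle, subtask i just processes what source partition i produced.
            int subtask = redistribute ? targetSubtask(row.id, parallelism) : row.sourcePartition;
            counts[subtask]++;
        }
        return counts;
    }

    // Skewed input: ids 0..999, with 900 rows in partition 0 and 50 each in partitions 1 and 2.
    static List<Row> skewedInput() {
        List<Row> rows = new ArrayList<>();
        for (long id = 0; id < 1000; id++) {
            int partition = id < 900 ? 0 : (id < 950 ? 1 : 2);
            rows.add(new Row(partition, id));
        }
        return rows;
    }

    public static void main(String[] args) {
        List<Row> rows = skewedInput();
        System.out.println("no shuffle:       " + Arrays.toString(countPerSubtask(rows, 3, false)));
        System.out.println("distribute by id: " + Arrays.toString(countPerSubtask(rows, 3, true)));
    }
}
```

Running main prints [900, 50, 50] for the unshuffled case and [334, 333, 333] after distributing by id. Note the limitation: hashing by id only helps when the ids are diverse. If a single hot id dominates, all of its rows still land on one subtask, so this addresses partition skew rather than key skew.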

Re: Re: 【Could we support distribute by For FlinkSql】

Posted by Yun Tang <my...@live.com>.
Hi Godfrey,

 @Qianfei (kimijqf) already implemented this "distribute by" feature nearly two years ago in an internal fork of Flink.
 Would you like to share the FLIP doc? We could give some feedback and work together to get this feature done.

Best
Yun Tang
________________________________
From: godfrey he <go...@gmail.com>
Sent: Tuesday, May 10, 2022 17:33
To: dev <de...@flink.apache.org>
Subject: Re: Re: 【Could we support distribute by For FlinkSql】

Hi, lpengdream. I will drive this work.
We will support this functionality via hints,
because "distribute by" is not in the SQL standard.
However, it will be supported in the Hive dialect.
I will post the FLIP doc soon.

Best,
Godfrey



Re: Re: 【Could we support distribute by For FlinkSql】

Posted by godfrey he <go...@gmail.com>.
Hi, lpengdream. I will drive this work.
We will support this functionality via hints,
because "distribute by" is not in the SQL standard.
However, it will be supported in the Hive dialect.
I will post the FLIP doc soon.

Best,
Godfrey


On Mon, 9 May 2022 at 16:03, Jark Wu <im...@gmail.com> wrote:

>
> We will start a FLIP discussion on the dev mailing list, so please keep an
> eye on the ML.
> I also see that you opened FLINK-27541; we will update FLINK-27541
> once we have an initial FLIP.
>
> Best,
> Jark

Re: Re: 【Could we support distribute by For FlinkSql】

Posted by Jark Wu <im...@gmail.com>.
We will start a FLIP discussion on the dev mailing list, so please keep an
eye on the ML.
I also see that you opened FLINK-27541; we will update FLINK-27541
once we have an initial FLIP.

Best,
Jark

On Mon, 9 May 2022 at 15:18, lpengdream@163.com <lp...@163.com> wrote:

> Yeah! That's great. Thank you! Where can I get more information about
> that?
>
>
>
> lpengdream@163.com

Re: Re: 【Could we support distribute by For FlinkSql】

Posted by "lpengdream@163.com" <lp...@163.com>.
Yeah! That's great. Thank you! Where can I get more information about that?
 


lpengdream@163.com
 
From: Jark Wu
Sent: 2022-05-09 14:12
To: dev
Cc: 贺小令
Subject: Re: Re: 【Could we support distribute by For FlinkSql】
I see what you want: something like DISTRIBUTE BY in Hive SQL.
The community is planning to support this feature but has not started yet.
@Godfrey will drive this work.
 
Best,
Jark
 

Re: Re: 【Could we support distribute by For FlinkSql】

Posted by Jark Wu <im...@gmail.com>.
I see what you want: something like DISTRIBUTE BY in Hive SQL.
The community is planning to support this feature but has not started yet.
@Godfrey will drive this work.

Best,
Jark

On Mon, 9 May 2022 at 13:45, lpengdream@163.com <lp...@163.com> wrote:

> Hi,
>     Thanks for your reply.
>     What I want is not only for hash lookup join; many
> operators need a hash shuffle to solve the skew problem. Lookup join
> is just one special case.
>     So I hope there could be an operator that performs a shuffle. Maybe that's a way
> to solve these problems?
>
>
> https://docs.google.com/document/d/1D7AX-_wttMNY53TxLQxiDaRyDVCeEZYCE8AwYflDXZM/edit?usp=sharing
>
>
>
>
>
> lpengdream@163.com

Re: Re: 【Could we support distribute by For FlinkSql】

Posted by "lpengdream@163.com" <lp...@163.com>.
Hi,
    Thanks for your reply.
    What I want is not only for hash lookup join; many operators need a hash shuffle to solve the skew problem. Lookup join is just one special case.
    So I hope there could be an operator that performs a shuffle. Maybe that's a way to solve these problems?

    https://docs.google.com/document/d/1D7AX-_wttMNY53TxLQxiDaRyDVCeEZYCE8AwYflDXZM/edit?usp=sharing
    




lpengdream@163.com
 
From: Jark Wu
Sent: 2022-05-09 12:27
To: dev
Subject: Re: 【Could we support distribute by For FlinkSql】
Hi,

If you are looking for the hash lookup join, there is an in-progress
FLIP-204[1] for it.

BTW, I still can't see your picture. You could upload it to an image
hosting service and share the link here.
 
Best,
Jark
 
[1]:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-204%3A+Introduce+Hash+Lookup+Join
 

Re: 【Could we support distribute by For FlinkSql】

Posted by Jark Wu <im...@gmail.com>.
Hi,

If you are looking for the hash lookup join, there is an in-progress
FLIP-204[1] for it.

BTW, I still can't see your picture. You could upload it to an image
hosting service and share the link here.

Best,
Jark

[1]:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-204%3A+Introduce+Hash+Lookup+Join

On Mon, 9 May 2022 at 11:22, lpengdream@163.com <lp...@163.com> wrote:

> Sorry!
> The broken picture is in the attachment.
>
> ------------------------------
> lpengdream@163.com

Re: 【Could we support distribute by For FlinkSql】

Posted by "lpengdream@163.com" <lp...@163.com>.
Sorry!
The broken picture is in the attachment.



lpengdream@163.com
 
From: lpengdream@163.com
Sent: 2022-05-09 11:16
To: user-zh; dev
Subject: 【Could we support distribute by For FlinkSql】
Hello:
    Currently we can't add a shuffle operation in a SQL job.
For example, I have a Kafka source (three partitions) with parallelism three, followed by a lookup join. I want to distribute the data by id so that it is split evenly across the three parallel instances (the source may be seriously skewed).
In the DataStream API I can do this with keyBy(), but unfortunately there is nothing I can do when using SQL.
Maybe we could support something like 'select id, f1, f2 from sourceTable distribute by id', as Spark SQL does.

That way we could make the change shown in the picture in SQL mode.

    



lpengdream@163.com