You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by wushijjian5 <ws...@163.com> on 2021/11/24 13:04:48 UTC

FlinkSql回撤流


DataStream<Tuple4<String, String,Integer, Integer>> dataStream = env.fromElements(
        new Tuple4<>("a", "a1",30,1),
        new Tuple4<>("b", "b1",20,1),new Tuple4<>("a","a1",30,0),
        new Tuple4<>("a","a2",30,1),
        new Tuple4<>("a","a3",30,1));
tEnv.createTemporaryView("tmpTable", dataStream, $("user"),$("ord"), $("num"), $("flag"));
Table table = tEnv.sqlQuery(
        " select user,sum(num) as num" +
        " from (" +
        "   select user,ord,LAST_VALUE(num) as num,LAST_VALUE(flag) as flag " +
                "from tmpTable " +
                "group by user,ord " +
                ") t1" +
        " where flag=1 " +
        " group by user" +
        "");
table.execute().print();

这样一个回撤流,我能在sql里获取到第一列的操作符么,比如 select op,user,num from xxx   只通过sql的方式


Re: FlinkSql回撤流

Posted by wushijjian5 <ws...@163.com>.
这个可以,非常感谢。

select user, sum(num * IF(flag=1, 1, 0)) as num
> from (
>  select user, ord, LAST_VALUE(num) as num, LAST_VALUE(flag) as flag
>  from tmpTable,
>  group by user, ord
>  ) t1
> group by user



-------

> 2021年11月25日 11:19,Tony Wei <to...@gmail.com> 写道:
> 
> 上一封的 sql 稍微有誤,不需要 group by user, ord 才對:
> 
> select user, sum(num) as num
>> from (
>>  select user, ord, num * IF(flag=1, 1, -1) as num
>>  from tmpTable
>>  ) t1
>> group by user
> 
> 
> 或者也可以考慮這種寫法:
> 
> select user, sum(num * IF(flag=1, 1, 0)) as num
>> from (
>>  select user, ord, LAST_VALUE(num) as num, LAST_VALUE(flag) as flag
>>  from tmpTable,
>>  group by user, ord
>>  ) t1
>> group by user
> 
> 
> best regards,
> 
> Tony Wei <to...@gmail.com> 於 2021年11月25日 週四 上午11:01寫道:
> 
>> Hi,
>> 
>> 對於這個 sql 的回撤流,我感覺沒什麼問題。原因是如果在 batch 執行模式下,首三行的輸出結果應該為:
>> 
>> +------+-------+
>> | user | num |
>> +------+-------+
>> | b      | 20    |
>> +------+-------+
>> 
>> 因為 user a 被 `where flag = 1` 過濾掉了,所以實際下游應該是撤回了 insert 的操作才對,而不是更新成 num = 0。
>> 
>> 或許可以考慮把 sql 寫法改為這樣試試?
>> 
>> select user, sum(num) as num
>>> from (
>>>  select user, ord, num * IF(flag=1, 1, -1) as num
>>>  from tmpTable,
>>>  group by user, ord
>>>  ) t1
>>> group by user
>> 
>> 
>> best regards,
>> 
>> wushijjian5 <ws...@163.com> 於 2021年11月25日 週四 上午10:21寫道:
>> 
>>> Hi,
>>>        这三条数据的话:
>>>    new Tuple4<>("a", "a1",30,1),new Tuple4<>("b", "b1",20,1),new
>>> Tuple4<>("a","a1",30,0)
>>> 
>>> 计算结果是:
>>> | +I |                              a |          30 |
>>> | +I |                              b |          20 |
>>> | -D |                              a |          30 |
>>> 
>>> 实际想要的是
>>> a 30
>>> b   20
>>> a   0
>>> 就是a用户下单30块,b下单20块,a取消下单30块 , 最后a应该返回0 , 想要根据-D标识下发0值数据,下游直接覆盖。
>>> 不知道说的清不清楚。
>>> 只通过flink-sql方式好像实现不了
>>> 
>>>> 2021年11月25日 09:45,Caizhi Weng <ts...@gmail.com> 写道:
>>>> 
>>>> Hi!
>>>> 
>>>> 无法在 SQL 里获得第一列的操作符。但可以通过 table.execute().collect() 获得产生的
>>> CloseableIterator<Row>,然后通过 Row#getKind 获得该 row 对应的 op。
>>>> 
>>>> 顺便问一下,为什么需要在 SQL 里获得 op 呢?因为这个 op 应该只对内部算子以及 sink 有用,用户一般来说是不需要感知的。
>>>> 
>>>> wushijjian5 <wsjwoods555@163.com <ma...@163.com>>
>>> 于2021年11月24日周三 下午9:05写道:
>>>> 
>>>> 
>>>> DataStream<Tuple4<String, String,Integer, Integer>> dataStream =
>>> env.fromElements(
>>>>        new Tuple4<>("a", "a1",30,1),
>>>>        new Tuple4<>("b", "b1",20,1),new Tuple4<>("a","a1",30,0),
>>>>        new Tuple4<>("a","a2",30,1),
>>>>        new Tuple4<>("a","a3",30,1));
>>>> tEnv.createTemporaryView("tmpTable", dataStream, $("user"),$("ord"),
>>> $("num"), $("flag"));
>>>> Table table = tEnv.sqlQuery(
>>>>        " select user,sum(num) as num" +
>>>>        " from (" +
>>>>        "   select user,ord,LAST_VALUE(num) as num,LAST_VALUE(flag) as
>>> flag " +
>>>>                "from tmpTable " +
>>>>                "group by user,ord " +
>>>>                ") t1" +
>>>>        " where flag=1 " +
>>>>        " group by user" +
>>>>        "");
>>>> table.execute().print();
>>>> 
>>>> 这样一个回撤流,我能在sql里获取到第一列的操作符么,比如 select op,user,num from xxx   只通过sql的方式
>>>> 
>>> 
>>> 


Re: FlinkSql回撤流

Posted by Tony Wei <to...@gmail.com>.
上一封的 sql 稍微有誤,不需要 group by user, ord 才對:

select user, sum(num) as num
> from (
>   select user, ord, num * IF(flag=1, 1, -1) as num
>   from tmpTable
>   ) t1
> group by user


或者也可以考慮這種寫法:

select user, sum(num * IF(flag=1, 1, 0)) as num
> from (
>   select user, ord, LAST_VALUE(num) as num, LAST_VALUE(flag) as flag
>   from tmpTable,
>   group by user, ord
>   ) t1
> group by user


best regards,

Tony Wei <to...@gmail.com> 於 2021年11月25日 週四 上午11:01寫道:

> Hi,
>
> 對於這個 sql 的回撤流,我感覺沒什麼問題。原因是如果在 batch 執行模式下,首三行的輸出結果應該為:
>
> +------+-------+
> | user | num |
> +------+-------+
> | b      | 20    |
> +------+-------+
>
> 因為 user a 被 `where flag = 1` 過濾掉了,所以實際下游應該是撤回了 insert 的操作才對,而不是更新成 num = 0。
>
> 或許可以考慮把 sql 寫法改為這樣試試?
>
> select user, sum(num) as num
>> from (
>>   select user, ord, num * IF(flag=1, 1, -1) as num
>>   from tmpTable,
>>   group by user, ord
>>   ) t1
>> group by user
>
>
> best regards,
>
> wushijjian5 <ws...@163.com> 於 2021年11月25日 週四 上午10:21寫道:
>
>> Hi,
>>         这三条数据的话:
>>     new Tuple4<>("a", "a1",30,1),new Tuple4<>("b", "b1",20,1),new
>> Tuple4<>("a","a1",30,0)
>>
>> 计算结果是:
>> | +I |                              a |          30 |
>> | +I |                              b |          20 |
>> | -D |                              a |          30 |
>>
>> 实际想要的是
>> a 30
>> b   20
>> a   0
>> 就是a用户下单30块,b下单20块,a取消下单30块 , 最后a应该返回0 , 想要根据-D标识下发0值数据,下游直接覆盖。
>> 不知道说的清不清楚。
>> 只通过flink-sql方式好像实现不了
>>
>> > 2021年11月25日 09:45,Caizhi Weng <ts...@gmail.com> 写道:
>> >
>> > Hi!
>> >
>> > 无法在 SQL 里获得第一列的操作符。但可以通过 table.execute().collect() 获得产生的
>> CloseableIterator<Row>,然后通过 Row#getKind 获得该 row 对应的 op。
>> >
>> > 顺便问一下,为什么需要在 SQL 里获得 op 呢?因为这个 op 应该只对内部算子以及 sink 有用,用户一般来说是不需要感知的。
>> >
>> > wushijjian5 <wsjwoods555@163.com <ma...@163.com>>
>> 于2021年11月24日周三 下午9:05写道:
>> >
>> >
>> > DataStream<Tuple4<String, String,Integer, Integer>> dataStream =
>> env.fromElements(
>> >         new Tuple4<>("a", "a1",30,1),
>> >         new Tuple4<>("b", "b1",20,1),new Tuple4<>("a","a1",30,0),
>> >         new Tuple4<>("a","a2",30,1),
>> >         new Tuple4<>("a","a3",30,1));
>> > tEnv.createTemporaryView("tmpTable", dataStream, $("user"),$("ord"),
>> $("num"), $("flag"));
>> > Table table = tEnv.sqlQuery(
>> >         " select user,sum(num) as num" +
>> >         " from (" +
>> >         "   select user,ord,LAST_VALUE(num) as num,LAST_VALUE(flag) as
>> flag " +
>> >                 "from tmpTable " +
>> >                 "group by user,ord " +
>> >                 ") t1" +
>> >         " where flag=1 " +
>> >         " group by user" +
>> >         "");
>> > table.execute().print();
>> >
>> > 这样一个回撤流,我能在sql里获取到第一列的操作符么,比如 select op,user,num from xxx   只通过sql的方式
>> >
>>
>>

Re: FlinkSql回撤流

Posted by Tony Wei <to...@gmail.com>.
Hi,

對於這個 sql 的回撤流,我感覺沒什麼問題。原因是如果在 batch 執行模式下,首三行的輸出結果應該為:

+------+-------+
| user | num |
+------+-------+
| b      | 20    |
+------+-------+

因為 user a 被 `where flag = 1` 過濾掉了,所以實際下游應該是撤回了 insert 的操作才對,而不是更新成 num = 0。

或許可以考慮把 sql 寫法改為這樣試試?

select user, sum(num) as num
> from (
>   select user, ord, num * IF(flag=1, 1, -1) as num
>   from tmpTable,
>   group by user, ord
>   ) t1
> group by user


best regards,

wushijjian5 <ws...@163.com> 於 2021年11月25日 週四 上午10:21寫道:

> Hi,
>         这三条数据的话:
>     new Tuple4<>("a", "a1",30,1),new Tuple4<>("b", "b1",20,1),new
> Tuple4<>("a","a1",30,0)
>
> 计算结果是:
> | +I |                              a |          30 |
> | +I |                              b |          20 |
> | -D |                              a |          30 |
>
> 实际想要的是
> a 30
> b   20
> a   0
> 就是a用户下单30块,b下单20块,a取消下单30块 , 最后a应该返回0 , 想要根据-D标识下发0值数据,下游直接覆盖。
> 不知道说的清不清楚。
> 只通过flink-sql方式好像实现不了
>
> > 2021年11月25日 09:45,Caizhi Weng <ts...@gmail.com> 写道:
> >
> > Hi!
> >
> > 无法在 SQL 里获得第一列的操作符。但可以通过 table.execute().collect() 获得产生的
> CloseableIterator<Row>,然后通过 Row#getKind 获得该 row 对应的 op。
> >
> > 顺便问一下,为什么需要在 SQL 里获得 op 呢?因为这个 op 应该只对内部算子以及 sink 有用,用户一般来说是不需要感知的。
> >
> > wushijjian5 <wsjwoods555@163.com <ma...@163.com>>
> 于2021年11月24日周三 下午9:05写道:
> >
> >
> > DataStream<Tuple4<String, String,Integer, Integer>> dataStream =
> env.fromElements(
> >         new Tuple4<>("a", "a1",30,1),
> >         new Tuple4<>("b", "b1",20,1),new Tuple4<>("a","a1",30,0),
> >         new Tuple4<>("a","a2",30,1),
> >         new Tuple4<>("a","a3",30,1));
> > tEnv.createTemporaryView("tmpTable", dataStream, $("user"),$("ord"),
> $("num"), $("flag"));
> > Table table = tEnv.sqlQuery(
> >         " select user,sum(num) as num" +
> >         " from (" +
> >         "   select user,ord,LAST_VALUE(num) as num,LAST_VALUE(flag) as
> flag " +
> >                 "from tmpTable " +
> >                 "group by user,ord " +
> >                 ") t1" +
> >         " where flag=1 " +
> >         " group by user" +
> >         "");
> > table.execute().print();
> >
> > 这样一个回撤流,我能在sql里获取到第一列的操作符么,比如 select op,user,num from xxx   只通过sql的方式
> >
>
>

Re: FlinkSql回撤流

Posted by wushijjian5 <ws...@163.com>.
Hi,
	这三条数据的话:
    new Tuple4<>("a", "a1",30,1),new Tuple4<>("b", "b1",20,1),new Tuple4<>("a","a1",30,0)

计算结果是:
| +I |                              a |          30 |
| +I |                              b |          20 |
| -D |                              a |          30 |

实际想要的是 
a 30
b   20
a   0
就是a用户下单30块,b下单20块,a取消下单30块 , 最后a应该返回0 , 想要根据-D标识下发0值数据,下游直接覆盖。  
不知道说的清不清楚。
只通过flink-sql方式好像实现不了

> 2021年11月25日 09:45,Caizhi Weng <ts...@gmail.com> 写道:
> 
> Hi!
> 
> 无法在 SQL 里获得第一列的操作符。但可以通过 table.execute().collect() 获得产生的 CloseableIterator<Row>,然后通过 Row#getKind 获得该 row 对应的 op。
> 
> 顺便问一下,为什么需要在 SQL 里获得 op 呢?因为这个 op 应该只对内部算子以及 sink 有用,用户一般来说是不需要感知的。
> 
> wushijjian5 <wsjwoods555@163.com <ma...@163.com>> 于2021年11月24日周三 下午9:05写道:
> 
> 
> DataStream<Tuple4<String, String,Integer, Integer>> dataStream = env.fromElements(
>         new Tuple4<>("a", "a1",30,1),
>         new Tuple4<>("b", "b1",20,1),new Tuple4<>("a","a1",30,0),
>         new Tuple4<>("a","a2",30,1),
>         new Tuple4<>("a","a3",30,1));
> tEnv.createTemporaryView("tmpTable", dataStream, $("user"),$("ord"), $("num"), $("flag"));
> Table table = tEnv.sqlQuery(
>         " select user,sum(num) as num" +
>         " from (" +
>         "   select user,ord,LAST_VALUE(num) as num,LAST_VALUE(flag) as flag " +
>                 "from tmpTable " +
>                 "group by user,ord " +
>                 ") t1" +
>         " where flag=1 " +
>         " group by user" +
>         "");
> table.execute().print();
> 
> 这样一个回撤流,我能在sql里获取到第一列的操作符么,比如 select op,user,num from xxx   只通过sql的方式
> 


Re: FlinkSql回撤流

Posted by Caizhi Weng <ts...@gmail.com>.
Hi!

无法在 SQL 里获得第一列的操作符。但可以通过 table.execute().collect() 获得产生的
CloseableIterator<Row>,然后通过 Row#getKind 获得该 row 对应的 op。

顺便问一下,为什么需要在 SQL 里获得 op 呢?因为这个 op 应该只对内部算子以及 sink 有用,用户一般来说是不需要感知的。

wushijjian5 <ws...@163.com> 于2021年11月24日周三 下午9:05写道:

>
> DataStream<Tuple4<String, String,Integer, Integer>> dataStream = env.fromElements(
>         new Tuple4<>("a", "a1",30,1),
>         new Tuple4<>("b", "b1",20,1),new Tuple4<>("a","a1",30,0),
>         new Tuple4<>("a","a2",30,1),
>         new Tuple4<>("a","a3",30,1));
> tEnv.createTemporaryView("tmpTable", dataStream, $("user"),$("ord"), $("num"), $("flag"));
> Table table = tEnv.sqlQuery(
>         " select user,sum(num) as num" +
>         " from (" +
>         "   select user,ord,LAST_VALUE(num) as num,LAST_VALUE(flag) as flag " +
>                 "from tmpTable " +
>                 "group by user,ord " +
>                 ") t1" +
>         " where flag=1 " +
>         " group by user" +
>         "");
> table.execute().print();
>
>
> 这样一个回撤流,我能在sql里获取到第一列的操作符么,比如 select op,user,num from xxx   只通过sql的方式
>
>