You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by wushijjian5 <ws...@163.com> on 2021/11/24 13:04:48 UTC
FlinkSql回撤流
DataStream<Tuple4<String, String,Integer, Integer>> dataStream = env.fromElements(
new Tuple4<>("a", "a1",30,1),
new Tuple4<>("b", "b1",20,1),new Tuple4<>("a","a1",30,0),
new Tuple4<>("a","a2",30,1),
new Tuple4<>("a","a3",30,1));
tEnv.createTemporaryView("tmpTable", dataStream, $("user"),$("ord"), $("num"), $("flag"));
Table table = tEnv.sqlQuery(
" select user,sum(num) as num" +
" from (" +
" select user,ord,LAST_VALUE(num) as num,LAST_VALUE(flag) as flag " +
"from tmpTable " +
"group by user,ord " +
") t1" +
" where flag=1 " +
" group by user" +
"");
table.execute().print();
这样一个回撤流,我能在sql里获取到第一列的操作符么,比如 select op,user,num from xxx 只通过sql的方式
Re: FlinkSql回撤流
Posted by wushijjian5 <ws...@163.com>.
这个可以,非常感谢。
select user, sum(num * IF(flag=1, 1, 0)) as num
> from (
> select user, ord, LAST_VALUE(num) as num, LAST_VALUE(flag) as flag
> from tmpTable,
> group by user, ord
> ) t1
> group by user
-------
> 2021年11月25日 11:19,Tony Wei <to...@gmail.com> 写道:
>
> 上一封的 sql 稍微有誤,不需要 group by user, ord 才對:
>
> select user, sum(num) as num
>> from (
>> select user, ord, num * IF(flag=1, 1, -1) as num
>> from tmpTable
>> ) t1
>> group by user
>
>
> 或者也可以考慮這種寫法:
>
> select user, sum(num * IF(flag=1, 1, 0)) as num
>> from (
>> select user, ord, LAST_VALUE(num) as num, LAST_VALUE(flag) as flag
>> from tmpTable,
>> group by user, ord
>> ) t1
>> group by user
>
>
> best regards,
>
> Tony Wei <to...@gmail.com> 於 2021年11月25日 週四 上午11:01寫道:
>
>> Hi,
>>
>> 對於這個 sql 的回撤流,我感覺沒什麼問題。原因是如果在 batch 執行模式下,首三行的輸出結果應該為:
>>
>> +------+-------+
>> | user | num |
>> +------+-------+
>> | b | 20 |
>> +------+-------+
>>
>> 因為 user a 被 `where flag = 1` 過濾掉了,所以實際下游應該是撤回了 insert 的操作才對,而不是更新成 num = 0。
>>
>> 或許可以考慮把 sql 寫法改為這樣試試?
>>
>> select user, sum(num) as num
>>> from (
>>> select user, ord, num * IF(flag=1, 1, -1) as num
>>> from tmpTable,
>>> group by user, ord
>>> ) t1
>>> group by user
>>
>>
>> best regards,
>>
>> wushijjian5 <ws...@163.com> 於 2021年11月25日 週四 上午10:21寫道:
>>
>>> Hi,
>>> 这三条数据的话:
>>> new Tuple4<>("a", "a1",30,1),new Tuple4<>("b", "b1",20,1),new
>>> Tuple4<>("a","a1",30,0)
>>>
>>> 计算结果是:
>>> | +I | a | 30 |
>>> | +I | b | 20 |
>>> | -D | a | 30 |
>>>
>>> 实际想要的是
>>> a 30
>>> b 20
>>> a 0
>>> 就是a用户下单30块,b下单20块,a取消下单30块 , 最后a应该返回0 , 想要根据-D标识下发0值数据,下游直接覆盖。
>>> 不知道说的清不清楚。
>>> 只通过flink-sql方式好像实现不了
>>>
>>>> 2021年11月25日 09:45,Caizhi Weng <ts...@gmail.com> 写道:
>>>>
>>>> Hi!
>>>>
>>>> 无法在 SQL 里获得第一列的操作符。但可以通过 table.execute().collect() 获得产生的
>>> CloseableIterator<Row>,然后通过 Row#getKind 获得该 row 对应的 op。
>>>>
>>>> 顺便问一下,为什么需要在 SQL 里获得 op 呢?因为这个 op 应该只对内部算子以及 sink 有用,用户一般来说是不需要感知的。
>>>>
>>>> wushijjian5 <wsjwoods555@163.com <ma...@163.com>>
>>> 于2021年11月24日周三 下午9:05写道:
>>>>
>>>>
>>>> DataStream<Tuple4<String, String,Integer, Integer>> dataStream =
>>> env.fromElements(
>>>> new Tuple4<>("a", "a1",30,1),
>>>> new Tuple4<>("b", "b1",20,1),new Tuple4<>("a","a1",30,0),
>>>> new Tuple4<>("a","a2",30,1),
>>>> new Tuple4<>("a","a3",30,1));
>>>> tEnv.createTemporaryView("tmpTable", dataStream, $("user"),$("ord"),
>>> $("num"), $("flag"));
>>>> Table table = tEnv.sqlQuery(
>>>> " select user,sum(num) as num" +
>>>> " from (" +
>>>> " select user,ord,LAST_VALUE(num) as num,LAST_VALUE(flag) as
>>> flag " +
>>>> "from tmpTable " +
>>>> "group by user,ord " +
>>>> ") t1" +
>>>> " where flag=1 " +
>>>> " group by user" +
>>>> "");
>>>> table.execute().print();
>>>>
>>>> 这样一个回撤流,我能在sql里获取到第一列的操作符么,比如 select op,user,num from xxx 只通过sql的方式
>>>>
>>>
>>>
Re: FlinkSql回撤流
Posted by Tony Wei <to...@gmail.com>.
上一封的 sql 稍微有誤,不需要 group by user, ord 才對:
select user, sum(num) as num
> from (
> select user, ord, num * IF(flag=1, 1, -1) as num
> from tmpTable
> ) t1
> group by user
或者也可以考慮這種寫法:
select user, sum(num * IF(flag=1, 1, 0)) as num
> from (
> select user, ord, LAST_VALUE(num) as num, LAST_VALUE(flag) as flag
> from tmpTable,
> group by user, ord
> ) t1
> group by user
best regards,
Tony Wei <to...@gmail.com> 於 2021年11月25日 週四 上午11:01寫道:
> Hi,
>
> 對於這個 sql 的回撤流,我感覺沒什麼問題。原因是如果在 batch 執行模式下,首三行的輸出結果應該為:
>
> +------+-------+
> | user | num |
> +------+-------+
> | b | 20 |
> +------+-------+
>
> 因為 user a 被 `where flag = 1` 過濾掉了,所以實際下游應該是撤回了 insert 的操作才對,而不是更新成 num = 0。
>
> 或許可以考慮把 sql 寫法改為這樣試試?
>
> select user, sum(num) as num
>> from (
>> select user, ord, num * IF(flag=1, 1, -1) as num
>> from tmpTable,
>> group by user, ord
>> ) t1
>> group by user
>
>
> best regards,
>
> wushijjian5 <ws...@163.com> 於 2021年11月25日 週四 上午10:21寫道:
>
>> Hi,
>> 这三条数据的话:
>> new Tuple4<>("a", "a1",30,1),new Tuple4<>("b", "b1",20,1),new
>> Tuple4<>("a","a1",30,0)
>>
>> 计算结果是:
>> | +I | a | 30 |
>> | +I | b | 20 |
>> | -D | a | 30 |
>>
>> 实际想要的是
>> a 30
>> b 20
>> a 0
>> 就是a用户下单30块,b下单20块,a取消下单30块 , 最后a应该返回0 , 想要根据-D标识下发0值数据,下游直接覆盖。
>> 不知道说的清不清楚。
>> 只通过flink-sql方式好像实现不了
>>
>> > 2021年11月25日 09:45,Caizhi Weng <ts...@gmail.com> 写道:
>> >
>> > Hi!
>> >
>> > 无法在 SQL 里获得第一列的操作符。但可以通过 table.execute().collect() 获得产生的
>> CloseableIterator<Row>,然后通过 Row#getKind 获得该 row 对应的 op。
>> >
>> > 顺便问一下,为什么需要在 SQL 里获得 op 呢?因为这个 op 应该只对内部算子以及 sink 有用,用户一般来说是不需要感知的。
>> >
>> > wushijjian5 <wsjwoods555@163.com <ma...@163.com>>
>> 于2021年11月24日周三 下午9:05写道:
>> >
>> >
>> > DataStream<Tuple4<String, String,Integer, Integer>> dataStream =
>> env.fromElements(
>> > new Tuple4<>("a", "a1",30,1),
>> > new Tuple4<>("b", "b1",20,1),new Tuple4<>("a","a1",30,0),
>> > new Tuple4<>("a","a2",30,1),
>> > new Tuple4<>("a","a3",30,1));
>> > tEnv.createTemporaryView("tmpTable", dataStream, $("user"),$("ord"),
>> $("num"), $("flag"));
>> > Table table = tEnv.sqlQuery(
>> > " select user,sum(num) as num" +
>> > " from (" +
>> > " select user,ord,LAST_VALUE(num) as num,LAST_VALUE(flag) as
>> flag " +
>> > "from tmpTable " +
>> > "group by user,ord " +
>> > ") t1" +
>> > " where flag=1 " +
>> > " group by user" +
>> > "");
>> > table.execute().print();
>> >
>> > 这样一个回撤流,我能在sql里获取到第一列的操作符么,比如 select op,user,num from xxx 只通过sql的方式
>> >
>>
>>
Re: FlinkSql回撤流
Posted by Tony Wei <to...@gmail.com>.
Hi,
對於這個 sql 的回撤流,我感覺沒什麼問題。原因是如果在 batch 執行模式下,首三行的輸出結果應該為:
+------+-------+
| user | num |
+------+-------+
| b | 20 |
+------+-------+
因為 user a 被 `where flag = 1` 過濾掉了,所以實際下游應該是撤回了 insert 的操作才對,而不是更新成 num = 0。
或許可以考慮把 sql 寫法改為這樣試試?
select user, sum(num) as num
> from (
> select user, ord, num * IF(flag=1, 1, -1) as num
> from tmpTable,
> group by user, ord
> ) t1
> group by user
best regards,
wushijjian5 <ws...@163.com> 於 2021年11月25日 週四 上午10:21寫道:
> Hi,
> 这三条数据的话:
> new Tuple4<>("a", "a1",30,1),new Tuple4<>("b", "b1",20,1),new
> Tuple4<>("a","a1",30,0)
>
> 计算结果是:
> | +I | a | 30 |
> | +I | b | 20 |
> | -D | a | 30 |
>
> 实际想要的是
> a 30
> b 20
> a 0
> 就是a用户下单30块,b下单20块,a取消下单30块 , 最后a应该返回0 , 想要根据-D标识下发0值数据,下游直接覆盖。
> 不知道说的清不清楚。
> 只通过flink-sql方式好像实现不了
>
> > 2021年11月25日 09:45,Caizhi Weng <ts...@gmail.com> 写道:
> >
> > Hi!
> >
> > 无法在 SQL 里获得第一列的操作符。但可以通过 table.execute().collect() 获得产生的
> CloseableIterator<Row>,然后通过 Row#getKind 获得该 row 对应的 op。
> >
> > 顺便问一下,为什么需要在 SQL 里获得 op 呢?因为这个 op 应该只对内部算子以及 sink 有用,用户一般来说是不需要感知的。
> >
> > wushijjian5 <wsjwoods555@163.com <ma...@163.com>>
> 于2021年11月24日周三 下午9:05写道:
> >
> >
> > DataStream<Tuple4<String, String,Integer, Integer>> dataStream =
> env.fromElements(
> > new Tuple4<>("a", "a1",30,1),
> > new Tuple4<>("b", "b1",20,1),new Tuple4<>("a","a1",30,0),
> > new Tuple4<>("a","a2",30,1),
> > new Tuple4<>("a","a3",30,1));
> > tEnv.createTemporaryView("tmpTable", dataStream, $("user"),$("ord"),
> $("num"), $("flag"));
> > Table table = tEnv.sqlQuery(
> > " select user,sum(num) as num" +
> > " from (" +
> > " select user,ord,LAST_VALUE(num) as num,LAST_VALUE(flag) as
> flag " +
> > "from tmpTable " +
> > "group by user,ord " +
> > ") t1" +
> > " where flag=1 " +
> > " group by user" +
> > "");
> > table.execute().print();
> >
> > 这样一个回撤流,我能在sql里获取到第一列的操作符么,比如 select op,user,num from xxx 只通过sql的方式
> >
>
>
Re: FlinkSql回撤流
Posted by wushijjian5 <ws...@163.com>.
Hi,
这三条数据的话:
new Tuple4<>("a", "a1",30,1),new Tuple4<>("b", "b1",20,1),new Tuple4<>("a","a1",30,0)
计算结果是:
| +I | a | 30 |
| +I | b | 20 |
| -D | a | 30 |
实际想要的是
a 30
b 20
a 0
就是a用户下单30块,b下单20块,a取消下单30块 , 最后a应该返回0 , 想要根据-D标识下发0值数据,下游直接覆盖。
不知道说的清不清楚。
只通过flink-sql方式好像实现不了
> 2021年11月25日 09:45,Caizhi Weng <ts...@gmail.com> 写道:
>
> Hi!
>
> 无法在 SQL 里获得第一列的操作符。但可以通过 table.execute().collect() 获得产生的 CloseableIterator<Row>,然后通过 Row#getKind 获得该 row 对应的 op。
>
> 顺便问一下,为什么需要在 SQL 里获得 op 呢?因为这个 op 应该只对内部算子以及 sink 有用,用户一般来说是不需要感知的。
>
> wushijjian5 <wsjwoods555@163.com <ma...@163.com>> 于2021年11月24日周三 下午9:05写道:
>
>
> DataStream<Tuple4<String, String,Integer, Integer>> dataStream = env.fromElements(
> new Tuple4<>("a", "a1",30,1),
> new Tuple4<>("b", "b1",20,1),new Tuple4<>("a","a1",30,0),
> new Tuple4<>("a","a2",30,1),
> new Tuple4<>("a","a3",30,1));
> tEnv.createTemporaryView("tmpTable", dataStream, $("user"),$("ord"), $("num"), $("flag"));
> Table table = tEnv.sqlQuery(
> " select user,sum(num) as num" +
> " from (" +
> " select user,ord,LAST_VALUE(num) as num,LAST_VALUE(flag) as flag " +
> "from tmpTable " +
> "group by user,ord " +
> ") t1" +
> " where flag=1 " +
> " group by user" +
> "");
> table.execute().print();
>
> 这样一个回撤流,我能在sql里获取到第一列的操作符么,比如 select op,user,num from xxx 只通过sql的方式
>
Re: FlinkSql回撤流
Posted by Caizhi Weng <ts...@gmail.com>.
Hi!
无法在 SQL 里获得第一列的操作符。但可以通过 table.execute().collect() 获得产生的
CloseableIterator<Row>,然后通过 Row#getKind 获得该 row 对应的 op。
顺便问一下,为什么需要在 SQL 里获得 op 呢?因为这个 op 应该只对内部算子以及 sink 有用,用户一般来说是不需要感知的。
wushijjian5 <ws...@163.com> 于2021年11月24日周三 下午9:05写道:
>
> DataStream<Tuple4<String, String,Integer, Integer>> dataStream = env.fromElements(
> new Tuple4<>("a", "a1",30,1),
> new Tuple4<>("b", "b1",20,1),new Tuple4<>("a","a1",30,0),
> new Tuple4<>("a","a2",30,1),
> new Tuple4<>("a","a3",30,1));
> tEnv.createTemporaryView("tmpTable", dataStream, $("user"),$("ord"), $("num"), $("flag"));
> Table table = tEnv.sqlQuery(
> " select user,sum(num) as num" +
> " from (" +
> " select user,ord,LAST_VALUE(num) as num,LAST_VALUE(flag) as flag " +
> "from tmpTable " +
> "group by user,ord " +
> ") t1" +
> " where flag=1 " +
> " group by user" +
> "");
> table.execute().print();
>
>
> 这样一个回撤流,我能在sql里获取到第一列的操作符么,比如 select op,user,num from xxx 只通过sql的方式
>
>