You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by Ren Xie <xi...@gmail.com> on 2020/01/14 09:37:57 UTC

求助帖: 流join场景可能出现的重复计算

学生
student_id name
11 foo

学科分数
id name score std_id
100 math 97 11
101 english 98 11

有如下一个场景(假设只有一个学生)

基于binlog检测这2个表的变化, 计算这个学生的总分数, 使用了Table/SQL API join操作计算

假设insert以上数据后到达某时刻, 以上数据都进入了flink, 计算出这个学生总分数 97 + 98 = 195

但此时发现学生姓名登记错误, 于是进行了修改,
结果此时Flink中学生流中有2个事件(insert的一个+update的一个), 分数流中有2个事件, 计算的总分数就会是 2 * (97 +
98) = 390

Q: 请问这种场景下使用什么能够解决, 计算出正确的结果 97 + 98 = 193

接触flink不久, 不是太了解, 请大佬给个提示, 谢谢!!

Re: 求助帖: 流join场景可能出现的重复计算

Posted by Ren Xie <xi...@gmail.com>.
谢谢!

我研究一下

JingsongLee <lz...@aliyun.com.invalid> 于2020年1月15日周三 上午11:57写道:

> Hi ren,
>
> Blink的deduplication功能应该是能match你的需求。[1]
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#deduplication
>
> Best,
> Jingsong Lee
>
>
> ------------------------------------------------------------------
> From:Caizhi Weng <ts...@gmail.com>
> Send Time:2020年1月15日(星期三) 11:53
> To:user-zh <us...@flink.apache.org>
> Subject:Re: 求助帖: 流join场景可能出现的重复计算
>
> Hi,
>
> Flink 目前认为所有的 source 都是 append only 的,retract、upsert 等都是内部处理时的概念,对用户是不可见的。
>
> 所以目前你只能先通过 group by 和 last_value 等方式实现功能。不过 1.11 有计划支持这样的需求。
>
> Ren Xie <xi...@gmail.com> 于2020年1月14日周二 下午9:30写道:
>
> > 谢谢
> >
> > 考虑过group by , 实际中 一个好多字段的表, 保不准就是那个字段发生了变化.
> >
> > 请问 类似的双流操作在开发中常见吗, 怎么都搜不到相似的使用, 按理谁流依赖另一个流做处理 应该算常见吧
> >
> > 还是说我这样的需求呀 实现呀 是野路子?
> >
> > Yuan,Youjun <yu...@baidu.com> 于2020年1月14日周二 下午8:22写道:
> >
> > > 取决于具体的场景。想到的有如下几种方案:
> > > 1,group by student_id和student_name,而不是只group by
> > > student_id。当然前提是修改同名名字不会推送一条消息到流1.
> > > 2,过滤掉update的消息
> > > 3,基于时间窗口的聚合,对于student表的数据,每n秒输出一个唯一的student_id,然后再与score流join。
> > >
> > > -----邮件原件-----
> > > 发件人: xin Destiny <nj...@gmail.com>
> > > 发送时间: Tuesday, January 14, 2020 6:39 PM
> > > 收件人: user-zh@flink.apache.org
> > > 主题: Re: 求助帖: 流join场景可能出现的重复计算
> > >
> > > Hi,
> > > 如果说插入两条update操作呢,一次分数是-97,一次是97
> > >
> > >
> > >
> > >
> > > Ren Xie <xi...@gmail.com> 于2020年1月14日周二 下午6:20写道:
> > >
> > > > 实际场景还是有点复杂的, 便于理解 我简化成这样的,  简化后的这个, 没有实际的代码, 抱歉
> > > >
> > > > 大致 写一下 也就是这样了
> > > > ```sql
> > > > select sum(score)
> > > > from
> > > >     student t1 inner join score t2 on t1.student_id = t2.std_id where
> > > >     t1.student_id = 11
> > > > ```
> > > > 然后
> > > >
> > > > ```Java
> > > > String sql = ↑;
> > > > Table t = tEnv.sqlQuery(sql);
> > > > DataStream<Integer> stream1 = tEnv.toAppendStream(t, Integer.class);
> > > > stream1.keyBy("xxxx").sum("xxxx");
> > > > ```
> > > >
> > > > 这样的一个sql, 在student表插入一个数据, score表插入2个数据后, 会执行一次计算出一个结果97 + 98
> > > >
> > > > update 学生表的name后, 一个新事件进入student的流, 还会触发一次计算, 得到97 + 98
> > > >
> > > > 因为可能有新的成绩插入, 所以对 stream1进行sum操作, 导致 97和98 都被重复计算了一次
> > > >
> > > >
> > > > Caizhi Weng <ts...@gmail.com> 于2020年1月14日周二 下午5:49写道:
> > > >
> > > > > Hi,
> > > > >
> > > > > 有可能的话,是否方便提供一下代码呢?
> > > > >
> > > > > Ren Xie <xi...@gmail.com> 于2020年1月14日周二 下午5:38写道:
> > > > >
> > > > > > 学生
> > > > > > student_id name
> > > > > > 11 foo
> > > > > >
> > > > > > 学科分数
> > > > > > id name score std_id
> > > > > > 100 math 97 11
> > > > > > 101 english 98 11
> > > > > >
> > > > > > 有如下一个场景(假设只有一个学生)
> > > > > >
> > > > > > 基于binlog检测这2个表的变化, 计算这个学生的总分数, 使用了Table/SQL API join操作计算
> > > > > >
> > > > > > 假设insert以上数据后到达某时刻, 以上数据都进入了flink, 计算出这个学生总分数 97 + 98 = 195
> > > > > >
> > > > > > 但此时发现学生姓名登记错误, 于是进行了修改,
> > > > > > 结果此时Flink中学生流中有2个事件(insert的一个+update的一个), 分数流中有2个事件, 计算的总分数就会是 2
> *
> > > > > > (97
> > > > +
> > > > > > 98) = 390
> > > > > >
> > > > > > Q: 请问这种场景下使用什么能够解决, 计算出正确的结果 97 + 98 = 193
> > > > > >
> > > > > > 接触flink不久, 不是太了解, 请大佬给个提示, 谢谢!!
> > > > > >
> > > > >
> > > >
> > >
> >
>

Re: 求助帖: 流join场景可能出现的重复计算

Posted by JingsongLee <lz...@aliyun.com.INVALID>.
Hi ren,

Blink的deduplication功能应该是能match你的需求。[1]

[1] https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/queries.html#deduplication

Best,
Jingsong Lee


------------------------------------------------------------------
From:Caizhi Weng <ts...@gmail.com>
Send Time:2020年1月15日(星期三) 11:53
To:user-zh <us...@flink.apache.org>
Subject:Re: 求助帖: 流join场景可能出现的重复计算

Hi,

Flink 目前认为所有的 source 都是 append only 的,retract、upsert 等都是内部处理时的概念,对用户是不可见的。

所以目前你只能先通过 group by 和 last_value 等方式实现功能。不过 1.11 有计划支持这样的需求。

Ren Xie <xi...@gmail.com> 于2020年1月14日周二 下午9:30写道:

> 谢谢
>
> 考虑过group by , 实际中 一个好多字段的表, 保不准就是那个字段发生了变化.
>
> 请问 类似的双流操作在开发中常见吗, 怎么都搜不到相似的使用, 按理谁流依赖另一个流做处理 应该算常见吧
>
> 还是说我这样的需求呀 实现呀 是野路子?
>
> Yuan,Youjun <yu...@baidu.com> 于2020年1月14日周二 下午8:22写道:
>
> > 取决于具体的场景。想到的有如下几种方案:
> > 1,group by student_id和student_name,而不是只group by
> > student_id。当然前提是修改同名名字不会推送一条消息到流1.
> > 2,过滤掉update的消息
> > 3,基于时间窗口的聚合,对于student表的数据,每n秒输出一个唯一的student_id,然后再与score流join。
> >
> > -----邮件原件-----
> > 发件人: xin Destiny <nj...@gmail.com>
> > 发送时间: Tuesday, January 14, 2020 6:39 PM
> > 收件人: user-zh@flink.apache.org
> > 主题: Re: 求助帖: 流join场景可能出现的重复计算
> >
> > Hi,
> > 如果说插入两条update操作呢,一次分数是-97,一次是97
> >
> >
> >
> >
> > Ren Xie <xi...@gmail.com> 于2020年1月14日周二 下午6:20写道:
> >
> > > 实际场景还是有点复杂的, 便于理解 我简化成这样的,  简化后的这个, 没有实际的代码, 抱歉
> > >
> > > 大致 写一下 也就是这样了
> > > ```sql
> > > select sum(score)
> > > from
> > >     student t1 inner join score t2 on t1.student_id = t2.std_id where
> > >     t1.student_id = 11
> > > ```
> > > 然后
> > >
> > > ```Java
> > > String sql = ↑;
> > > Table t = tEnv.sqlQuery(sql);
> > > DataStream<Integer> stream1 = tEnv.toAppendStream(t, Integer.class);
> > > stream1.keyBy("xxxx").sum("xxxx");
> > > ```
> > >
> > > 这样的一个sql, 在student表插入一个数据, score表插入2个数据后, 会执行一次计算出一个结果97 + 98
> > >
> > > update 学生表的name后, 一个新事件进入student的流, 还会触发一次计算, 得到97 + 98
> > >
> > > 因为可能有新的成绩插入, 所以对 stream1进行sum操作, 导致 97和98 都被重复计算了一次
> > >
> > >
> > > Caizhi Weng <ts...@gmail.com> 于2020年1月14日周二 下午5:49写道:
> > >
> > > > Hi,
> > > >
> > > > 有可能的话,是否方便提供一下代码呢?
> > > >
> > > > Ren Xie <xi...@gmail.com> 于2020年1月14日周二 下午5:38写道:
> > > >
> > > > > 学生
> > > > > student_id name
> > > > > 11 foo
> > > > >
> > > > > 学科分数
> > > > > id name score std_id
> > > > > 100 math 97 11
> > > > > 101 english 98 11
> > > > >
> > > > > 有如下一个场景(假设只有一个学生)
> > > > >
> > > > > 基于binlog检测这2个表的变化, 计算这个学生的总分数, 使用了Table/SQL API join操作计算
> > > > >
> > > > > 假设insert以上数据后到达某时刻, 以上数据都进入了flink, 计算出这个学生总分数 97 + 98 = 195
> > > > >
> > > > > 但此时发现学生姓名登记错误, 于是进行了修改,
> > > > > 结果此时Flink中学生流中有2个事件(insert的一个+update的一个), 分数流中有2个事件, 计算的总分数就会是 2 *
> > > > > (97
> > > +
> > > > > 98) = 390
> > > > >
> > > > > Q: 请问这种场景下使用什么能够解决, 计算出正确的结果 97 + 98 = 193
> > > > >
> > > > > 接触flink不久, 不是太了解, 请大佬给个提示, 谢谢!!
> > > > >
> > > >
> > >
> >
>

Re: 求助帖: 流join场景可能出现的重复计算

Posted by Ren Xie <xi...@gmail.com>.
谢谢!

我试一下

Caizhi Weng <ts...@gmail.com> 于2020年1月15日周三 上午11:53写道:

> Hi,
>
> Flink 目前认为所有的 source 都是 append only 的,retract、upsert 等都是内部处理时的概念,对用户是不可见的。
>
> 所以目前你只能先通过 group by 和 last_value 等方式实现功能。不过 1.11 有计划支持这样的需求。
>
> Ren Xie <xi...@gmail.com> 于2020年1月14日周二 下午9:30写道:
>
> > 谢谢
> >
> > 考虑过group by , 实际中 一个好多字段的表, 保不准就是那个字段发生了变化.
> >
> > 请问 类似的双流操作在开发中常见吗, 怎么都搜不到相似的使用, 按理谁流依赖另一个流做处理 应该算常见吧
> >
> > 还是说我这样的需求呀 实现呀 是野路子?
> >
> > Yuan,Youjun <yu...@baidu.com> 于2020年1月14日周二 下午8:22写道:
> >
> > > 取决于具体的场景。想到的有如下几种方案:
> > > 1,group by student_id和student_name,而不是只group by
> > > student_id。当然前提是修改同名名字不会推送一条消息到流1.
> > > 2,过滤掉update的消息
> > > 3,基于时间窗口的聚合,对于student表的数据,每n秒输出一个唯一的student_id,然后再与score流join。
> > >
> > > -----邮件原件-----
> > > 发件人: xin Destiny <nj...@gmail.com>
> > > 发送时间: Tuesday, January 14, 2020 6:39 PM
> > > 收件人: user-zh@flink.apache.org
> > > 主题: Re: 求助帖: 流join场景可能出现的重复计算
> > >
> > > Hi,
> > > 如果说插入两条update操作呢,一次分数是-97,一次是97
> > >
> > >
> > >
> > >
> > > Ren Xie <xi...@gmail.com> 于2020年1月14日周二 下午6:20写道:
> > >
> > > > 实际场景还是有点复杂的, 便于理解 我简化成这样的,  简化后的这个, 没有实际的代码, 抱歉
> > > >
> > > > 大致 写一下 也就是这样了
> > > > ```sql
> > > > select sum(score)
> > > > from
> > > >     student t1 inner join score t2 on t1.student_id = t2.std_id where
> > > >     t1.student_id = 11
> > > > ```
> > > > 然后
> > > >
> > > > ```Java
> > > > String sql = ↑;
> > > > Table t = tEnv.sqlQuery(sql);
> > > > DataStream<Integer> stream1 = tEnv.toAppendStream(t, Integer.class);
> > > > stream1.keyBy("xxxx").sum("xxxx");
> > > > ```
> > > >
> > > > 这样的一个sql, 在student表插入一个数据, score表插入2个数据后, 会执行一次计算出一个结果97 + 98
> > > >
> > > > update 学生表的name后, 一个新事件进入student的流, 还会触发一次计算, 得到97 + 98
> > > >
> > > > 因为可能有新的成绩插入, 所以对 stream1进行sum操作, 导致 97和98 都被重复计算了一次
> > > >
> > > >
> > > > Caizhi Weng <ts...@gmail.com> 于2020年1月14日周二 下午5:49写道:
> > > >
> > > > > Hi,
> > > > >
> > > > > 有可能的话,是否方便提供一下代码呢?
> > > > >
> > > > > Ren Xie <xi...@gmail.com> 于2020年1月14日周二 下午5:38写道:
> > > > >
> > > > > > 学生
> > > > > > student_id name
> > > > > > 11 foo
> > > > > >
> > > > > > 学科分数
> > > > > > id name score std_id
> > > > > > 100 math 97 11
> > > > > > 101 english 98 11
> > > > > >
> > > > > > 有如下一个场景(假设只有一个学生)
> > > > > >
> > > > > > 基于binlog检测这2个表的变化, 计算这个学生的总分数, 使用了Table/SQL API join操作计算
> > > > > >
> > > > > > 假设insert以上数据后到达某时刻, 以上数据都进入了flink, 计算出这个学生总分数 97 + 98 = 195
> > > > > >
> > > > > > 但此时发现学生姓名登记错误, 于是进行了修改,
> > > > > > 结果此时Flink中学生流中有2个事件(insert的一个+update的一个), 分数流中有2个事件, 计算的总分数就会是 2
> *
> > > > > > (97
> > > > +
> > > > > > 98) = 390
> > > > > >
> > > > > > Q: 请问这种场景下使用什么能够解决, 计算出正确的结果 97 + 98 = 193
> > > > > >
> > > > > > 接触flink不久, 不是太了解, 请大佬给个提示, 谢谢!!
> > > > > >
> > > > >
> > > >
> > >
> >
>

Re: 求助帖: 流join场景可能出现的重复计算

Posted by Caizhi Weng <ts...@gmail.com>.
Hi,

Flink 目前认为所有的 source 都是 append only 的,retract、upsert 等都是内部处理时的概念,对用户是不可见的。

所以目前你只能先通过 group by 和 last_value 等方式实现功能。不过 1.11 有计划支持这样的需求。

Ren Xie <xi...@gmail.com> 于2020年1月14日周二 下午9:30写道:

> 谢谢
>
> 考虑过group by , 实际中 一个好多字段的表, 保不准就是那个字段发生了变化.
>
> 请问 类似的双流操作在开发中常见吗, 怎么都搜不到相似的使用, 按理谁流依赖另一个流做处理 应该算常见吧
>
> 还是说我这样的需求呀 实现呀 是野路子?
>
> Yuan,Youjun <yu...@baidu.com> 于2020年1月14日周二 下午8:22写道:
>
> > 取决于具体的场景。想到的有如下几种方案:
> > 1,group by student_id和student_name,而不是只group by
> > student_id。当然前提是修改同名名字不会推送一条消息到流1.
> > 2,过滤掉update的消息
> > 3,基于时间窗口的聚合,对于student表的数据,每n秒输出一个唯一的student_id,然后再与score流join。
> >
> > -----邮件原件-----
> > 发件人: xin Destiny <nj...@gmail.com>
> > 发送时间: Tuesday, January 14, 2020 6:39 PM
> > 收件人: user-zh@flink.apache.org
> > 主题: Re: 求助帖: 流join场景可能出现的重复计算
> >
> > Hi,
> > 如果说插入两条update操作呢,一次分数是-97,一次是97
> >
> >
> >
> >
> > Ren Xie <xi...@gmail.com> 于2020年1月14日周二 下午6:20写道:
> >
> > > 实际场景还是有点复杂的, 便于理解 我简化成这样的,  简化后的这个, 没有实际的代码, 抱歉
> > >
> > > 大致 写一下 也就是这样了
> > > ```sql
> > > select sum(score)
> > > from
> > >     student t1 inner join score t2 on t1.student_id = t2.std_id where
> > >     t1.student_id = 11
> > > ```
> > > 然后
> > >
> > > ```Java
> > > String sql = ↑;
> > > Table t = tEnv.sqlQuery(sql);
> > > DataStream<Integer> stream1 = tEnv.toAppendStream(t, Integer.class);
> > > stream1.keyBy("xxxx").sum("xxxx");
> > > ```
> > >
> > > 这样的一个sql, 在student表插入一个数据, score表插入2个数据后, 会执行一次计算出一个结果97 + 98
> > >
> > > update 学生表的name后, 一个新事件进入student的流, 还会触发一次计算, 得到97 + 98
> > >
> > > 因为可能有新的成绩插入, 所以对 stream1进行sum操作, 导致 97和98 都被重复计算了一次
> > >
> > >
> > > Caizhi Weng <ts...@gmail.com> 于2020年1月14日周二 下午5:49写道:
> > >
> > > > Hi,
> > > >
> > > > 有可能的话,是否方便提供一下代码呢?
> > > >
> > > > Ren Xie <xi...@gmail.com> 于2020年1月14日周二 下午5:38写道:
> > > >
> > > > > 学生
> > > > > student_id name
> > > > > 11 foo
> > > > >
> > > > > 学科分数
> > > > > id name score std_id
> > > > > 100 math 97 11
> > > > > 101 english 98 11
> > > > >
> > > > > 有如下一个场景(假设只有一个学生)
> > > > >
> > > > > 基于binlog检测这2个表的变化, 计算这个学生的总分数, 使用了Table/SQL API join操作计算
> > > > >
> > > > > 假设insert以上数据后到达某时刻, 以上数据都进入了flink, 计算出这个学生总分数 97 + 98 = 195
> > > > >
> > > > > 但此时发现学生姓名登记错误, 于是进行了修改,
> > > > > 结果此时Flink中学生流中有2个事件(insert的一个+update的一个), 分数流中有2个事件, 计算的总分数就会是 2 *
> > > > > (97
> > > +
> > > > > 98) = 390
> > > > >
> > > > > Q: 请问这种场景下使用什么能够解决, 计算出正确的结果 97 + 98 = 193
> > > > >
> > > > > 接触flink不久, 不是太了解, 请大佬给个提示, 谢谢!!
> > > > >
> > > >
> > >
> >
>

Re: 求助帖: 流join场景可能出现的重复计算

Posted by Ren Xie <xi...@gmail.com>.
谢谢

考虑过group by , 实际中 一个好多字段的表, 保不准就是那个字段发生了变化.

请问 类似的双流操作在开发中常见吗, 怎么都搜不到相似的使用, 按理谁流依赖另一个流做处理 应该算常见吧

还是说我这样的需求呀 实现呀 是野路子?

Yuan,Youjun <yu...@baidu.com> 于2020年1月14日周二 下午8:22写道:

> 取决于具体的场景。想到的有如下几种方案:
> 1,group by student_id和student_name,而不是只group by
> student_id。当然前提是修改同名名字不会推送一条消息到流1.
> 2,过滤掉update的消息
> 3,基于时间窗口的聚合,对于student表的数据,每n秒输出一个唯一的student_id,然后再与score流join。
>
> -----邮件原件-----
> 发件人: xin Destiny <nj...@gmail.com>
> 发送时间: Tuesday, January 14, 2020 6:39 PM
> 收件人: user-zh@flink.apache.org
> 主题: Re: 求助帖: 流join场景可能出现的重复计算
>
> Hi,
> 如果说插入两条update操作呢,一次分数是-97,一次是97
>
>
>
>
> Ren Xie <xi...@gmail.com> 于2020年1月14日周二 下午6:20写道:
>
> > 实际场景还是有点复杂的, 便于理解 我简化成这样的,  简化后的这个, 没有实际的代码, 抱歉
> >
> > 大致 写一下 也就是这样了
> > ```sql
> > select sum(score)
> > from
> >     student t1 inner join score t2 on t1.student_id = t2.std_id where
> >     t1.student_id = 11
> > ```
> > 然后
> >
> > ```Java
> > String sql = ↑;
> > Table t = tEnv.sqlQuery(sql);
> > DataStream<Integer> stream1 = tEnv.toAppendStream(t, Integer.class);
> > stream1.keyBy("xxxx").sum("xxxx");
> > ```
> >
> > 这样的一个sql, 在student表插入一个数据, score表插入2个数据后, 会执行一次计算出一个结果97 + 98
> >
> > update 学生表的name后, 一个新事件进入student的流, 还会触发一次计算, 得到97 + 98
> >
> > 因为可能有新的成绩插入, 所以对 stream1进行sum操作, 导致 97和98 都被重复计算了一次
> >
> >
> > Caizhi Weng <ts...@gmail.com> 于2020年1月14日周二 下午5:49写道:
> >
> > > Hi,
> > >
> > > 有可能的话,是否方便提供一下代码呢?
> > >
> > > Ren Xie <xi...@gmail.com> 于2020年1月14日周二 下午5:38写道:
> > >
> > > > 学生
> > > > student_id name
> > > > 11 foo
> > > >
> > > > 学科分数
> > > > id name score std_id
> > > > 100 math 97 11
> > > > 101 english 98 11
> > > >
> > > > 有如下一个场景(假设只有一个学生)
> > > >
> > > > 基于binlog检测这2个表的变化, 计算这个学生的总分数, 使用了Table/SQL API join操作计算
> > > >
> > > > 假设insert以上数据后到达某时刻, 以上数据都进入了flink, 计算出这个学生总分数 97 + 98 = 195
> > > >
> > > > 但此时发现学生姓名登记错误, 于是进行了修改,
> > > > 结果此时Flink中学生流中有2个事件(insert的一个+update的一个), 分数流中有2个事件, 计算的总分数就会是 2 *
> > > > (97
> > +
> > > > 98) = 390
> > > >
> > > > Q: 请问这种场景下使用什么能够解决, 计算出正确的结果 97 + 98 = 193
> > > >
> > > > 接触flink不久, 不是太了解, 请大佬给个提示, 谢谢!!
> > > >
> > >
> >
>

回复: 求助帖: 流join场景可能出现的重复计算

Posted by "Yuan,Youjun" <yu...@baidu.com>.
取决于具体的场景。想到的有如下几种方案:
1,group by student_id和student_name,而不是只group by student_id。当然前提是修改同名名字不会推送一条消息到流1.
2,过滤掉update的消息
3,基于时间窗口的聚合,对于student表的数据,每n秒输出一个唯一的student_id,然后再与score流join。

-----邮件原件-----
发件人: xin Destiny <nj...@gmail.com> 
发送时间: Tuesday, January 14, 2020 6:39 PM
收件人: user-zh@flink.apache.org
主题: Re: 求助帖: 流join场景可能出现的重复计算

Hi,
如果说插入两条update操作呢,一次分数是-97,一次是97




Ren Xie <xi...@gmail.com> 于2020年1月14日周二 下午6:20写道:

> 实际场景还是有点复杂的, 便于理解 我简化成这样的,  简化后的这个, 没有实际的代码, 抱歉
>
> 大致 写一下 也就是这样了
> ```sql
> select sum(score)
> from
>     student t1 inner join score t2 on t1.student_id = t2.std_id where
>     t1.student_id = 11
> ```
> 然后
>
> ```Java
> String sql = ↑;
> Table t = tEnv.sqlQuery(sql);
> DataStream<Integer> stream1 = tEnv.toAppendStream(t, Integer.class); 
> stream1.keyBy("xxxx").sum("xxxx");
> ```
>
> 这样的一个sql, 在student表插入一个数据, score表插入2个数据后, 会执行一次计算出一个结果97 + 98
>
> update 学生表的name后, 一个新事件进入student的流, 还会触发一次计算, 得到97 + 98
>
> 因为可能有新的成绩插入, 所以对 stream1进行sum操作, 导致 97和98 都被重复计算了一次
>
>
> Caizhi Weng <ts...@gmail.com> 于2020年1月14日周二 下午5:49写道:
>
> > Hi,
> >
> > 有可能的话,是否方便提供一下代码呢?
> >
> > Ren Xie <xi...@gmail.com> 于2020年1月14日周二 下午5:38写道:
> >
> > > 学生
> > > student_id name
> > > 11 foo
> > >
> > > 学科分数
> > > id name score std_id
> > > 100 math 97 11
> > > 101 english 98 11
> > >
> > > 有如下一个场景(假设只有一个学生)
> > >
> > > 基于binlog检测这2个表的变化, 计算这个学生的总分数, 使用了Table/SQL API join操作计算
> > >
> > > 假设insert以上数据后到达某时刻, 以上数据都进入了flink, 计算出这个学生总分数 97 + 98 = 195
> > >
> > > 但此时发现学生姓名登记错误, 于是进行了修改,
> > > 结果此时Flink中学生流中有2个事件(insert的一个+update的一个), 分数流中有2个事件, 计算的总分数就会是 2 * 
> > > (97
> +
> > > 98) = 390
> > >
> > > Q: 请问这种场景下使用什么能够解决, 计算出正确的结果 97 + 98 = 193
> > >
> > > 接触flink不久, 不是太了解, 请大佬给个提示, 谢谢!!
> > >
> >
>

Re: 求助帖: 流join场景可能出现的重复计算

Posted by Ren Xie <xi...@gmail.com>.
谢谢解答!

稍微用代码写了一下, 如下:

public class JoinMain1 {

    private static final String host  = "127.0.0.1";
    private static final int    port  = 9000;
    private static final int    port1 = 9001;

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        SingleOutputStreamOperator<Student> student =
env.socketTextStream(host, port, "\n")
                .map(new MapFunction<String, Student>() {
                         @Override
                         public Student map(String s) throws Exception {
                             String[] tokens = s.toLowerCase().split(",");
                             Student ss = new Student();
                             ss.setId(Integer.valueOf(tokens[0]));
                             ss.setName(tokens[1]);
                             return ss;
                         }
                     }
                );

        SingleOutputStreamOperator<Score> score =
env.socketTextStream(host, port1, "\n")
                .map(new MapFunction<String, Score>() {
                         @Override
                         public Score map(String s) throws Exception {
                             String[] tokens = s.toLowerCase().split(",");
                             Score ss = new Score();
                             ss.setId(Integer.valueOf(tokens[0]));
                             ss.setName(tokens[1]);
                             ss.setSid(Integer.valueOf(tokens[2]));
                             ss.setScore(Integer.valueOf(tokens[3]));
                             return ss;
                         }
                     }
                );

        /// sql
        tEnv.registerDataStream("student", student, "id, name");
        tEnv.registerDataStream("score", score, "id, name, sid, score");
        Table t = tEnv.sqlQuery("select sum(t2.score) from student t1
inner join score t2 on t1.id = t2.sid");

        tEnv.toRetractStream(t, Integer.class).print("result");

        env.execute("table join");
    }
}

测试数据如下(红色数据为操作顺序):

[image: image.png]

执行结果如下:

[image: image.png]

1 2 3步骤计算出195(1个studen join 2个分数),
4步骤计算出390(原来195 + 新的student join 2个分数)
5步计算出196(390 + 新分数 join 2个student, 其实是 390 - 97 -97)
6步骤计算出390(196 + 新分数 join 2个student)

在第4步的时候, 分数就不太对了, 因为流中同一个student对应了2个事件

xin Destiny <nj...@gmail.com> 于2020年1月14日周二 下午6:39写道:

> Hi,
> 如果说插入两条update操作呢,一次分数是-97,一次是97
>
>
>
>
> Ren Xie <xi...@gmail.com> 于2020年1月14日周二 下午6:20写道:
>
> > 实际场景还是有点复杂的, 便于理解 我简化成这样的,  简化后的这个, 没有实际的代码, 抱歉
> >
> > 大致 写一下 也就是这样了
> > ```sql
> > select sum(score)
> > from
> >     student t1 inner join score t2 on t1.student_id = t2.std_id
> > where
> >     t1.student_id = 11
> > ```
> > 然后
> >
> > ```Java
> > String sql = ↑;
> > Table t = tEnv.sqlQuery(sql);
> > DataStream<Integer> stream1 = tEnv.toAppendStream(t, Integer.class);
> > stream1.keyBy("xxxx").sum("xxxx");
> > ```
> >
> > 这样的一个sql, 在student表插入一个数据, score表插入2个数据后, 会执行一次计算出一个结果97 + 98
> >
> > update 学生表的name后, 一个新事件进入student的流, 还会触发一次计算, 得到97 + 98
> >
> > 因为可能有新的成绩插入, 所以对 stream1进行sum操作, 导致 97和98 都被重复计算了一次
> >
> >
> > Caizhi Weng <ts...@gmail.com> 于2020年1月14日周二 下午5:49写道:
> >
> > > Hi,
> > >
> > > 有可能的话,是否方便提供一下代码呢?
> > >
> > > Ren Xie <xi...@gmail.com> 于2020年1月14日周二 下午5:38写道:
> > >
> > > > 学生
> > > > student_id name
> > > > 11 foo
> > > >
> > > > 学科分数
> > > > id name score std_id
> > > > 100 math 97 11
> > > > 101 english 98 11
> > > >
> > > > 有如下一个场景(假设只有一个学生)
> > > >
> > > > 基于binlog检测这2个表的变化, 计算这个学生的总分数, 使用了Table/SQL API join操作计算
> > > >
> > > > 假设insert以上数据后到达某时刻, 以上数据都进入了flink, 计算出这个学生总分数 97 + 98 = 195
> > > >
> > > > 但此时发现学生姓名登记错误, 于是进行了修改,
> > > > 结果此时Flink中学生流中有2个事件(insert的一个+update的一个), 分数流中有2个事件, 计算的总分数就会是 2 *
> (97
> > +
> > > > 98) = 390
> > > >
> > > > Q: 请问这种场景下使用什么能够解决, 计算出正确的结果 97 + 98 = 193
> > > >
> > > > 接触flink不久, 不是太了解, 请大佬给个提示, 谢谢!!
> > > >
> > >
> >
>

Re: 求助帖: 流join场景可能出现的重复计算

Posted by xin Destiny <nj...@gmail.com>.
Hi,
如果说插入两条update操作呢,一次分数是-97,一次是97




Ren Xie <xi...@gmail.com> 于2020年1月14日周二 下午6:20写道:

> 实际场景还是有点复杂的, 便于理解 我简化成这样的,  简化后的这个, 没有实际的代码, 抱歉
>
> 大致 写一下 也就是这样了
> ```sql
> select sum(score)
> from
>     student t1 inner join score t2 on t1.student_id = t2.std_id
> where
>     t1.student_id = 11
> ```
> 然后
>
> ```Java
> String sql = ↑;
> Table t = tEnv.sqlQuery(sql);
> DataStream<Integer> stream1 = tEnv.toAppendStream(t, Integer.class);
> stream1.keyBy("xxxx").sum("xxxx");
> ```
>
> 这样的一个sql, 在student表插入一个数据, score表插入2个数据后, 会执行一次计算出一个结果97 + 98
>
> update 学生表的name后, 一个新事件进入student的流, 还会触发一次计算, 得到97 + 98
>
> 因为可能有新的成绩插入, 所以对 stream1进行sum操作, 导致 97和98 都被重复计算了一次
>
>
> Caizhi Weng <ts...@gmail.com> 于2020年1月14日周二 下午5:49写道:
>
> > Hi,
> >
> > 有可能的话,是否方便提供一下代码呢?
> >
> > Ren Xie <xi...@gmail.com> 于2020年1月14日周二 下午5:38写道:
> >
> > > 学生
> > > student_id name
> > > 11 foo
> > >
> > > 学科分数
> > > id name score std_id
> > > 100 math 97 11
> > > 101 english 98 11
> > >
> > > 有如下一个场景(假设只有一个学生)
> > >
> > > 基于binlog检测这2个表的变化, 计算这个学生的总分数, 使用了Table/SQL API join操作计算
> > >
> > > 假设insert以上数据后到达某时刻, 以上数据都进入了flink, 计算出这个学生总分数 97 + 98 = 195
> > >
> > > 但此时发现学生姓名登记错误, 于是进行了修改,
> > > 结果此时Flink中学生流中有2个事件(insert的一个+update的一个), 分数流中有2个事件, 计算的总分数就会是 2 * (97
> +
> > > 98) = 390
> > >
> > > Q: 请问这种场景下使用什么能够解决, 计算出正确的结果 97 + 98 = 193
> > >
> > > 接触flink不久, 不是太了解, 请大佬给个提示, 谢谢!!
> > >
> >
>

Re: 求助帖: 流join场景可能出现的重复计算

Posted by Ren Xie <xi...@gmail.com>.
实际场景还是有点复杂的, 便于理解 我简化成这样的,  简化后的这个, 没有实际的代码, 抱歉

大致 写一下 也就是这样了
```sql
select sum(score)
from
    student t1 inner join score t2 on t1.student_id = t2.std_id
where
    t1.student_id = 11
```
然后

```Java
String sql = ↑;
Table t = tEnv.sqlQuery(sql);
DataStream<Integer> stream1 = tEnv.toAppendStream(t, Integer.class);
stream1.keyBy("xxxx").sum("xxxx");
```

这样的一个sql, 在student表插入一个数据, score表插入2个数据后, 会执行一次计算出一个结果97 + 98

update 学生表的name后, 一个新事件进入student的流, 还会触发一次计算, 得到97 + 98

因为可能有新的成绩插入, 所以对 stream1进行sum操作, 导致 97和98 都被重复计算了一次


Caizhi Weng <ts...@gmail.com> 于2020年1月14日周二 下午5:49写道:

> Hi,
>
> 有可能的话,是否方便提供一下代码呢?
>
> Ren Xie <xi...@gmail.com> 于2020年1月14日周二 下午5:38写道:
>
> > 学生
> > student_id name
> > 11 foo
> >
> > 学科分数
> > id name score std_id
> > 100 math 97 11
> > 101 english 98 11
> >
> > 有如下一个场景(假设只有一个学生)
> >
> > 基于binlog检测这2个表的变化, 计算这个学生的总分数, 使用了Table/SQL API join操作计算
> >
> > 假设insert以上数据后到达某时刻, 以上数据都进入了flink, 计算出这个学生总分数 97 + 98 = 195
> >
> > 但此时发现学生姓名登记错误, 于是进行了修改,
> > 结果此时Flink中学生流中有2个事件(insert的一个+update的一个), 分数流中有2个事件, 计算的总分数就会是 2 * (97 +
> > 98) = 390
> >
> > Q: 请问这种场景下使用什么能够解决, 计算出正确的结果 97 + 98 = 193
> >
> > 接触flink不久, 不是太了解, 请大佬给个提示, 谢谢!!
> >
>

Re: 求助帖: 流join场景可能出现的重复计算

Posted by Caizhi Weng <ts...@gmail.com>.
Hi,

有可能的话,是否方便提供一下代码呢?

Ren Xie <xi...@gmail.com> 于2020年1月14日周二 下午5:38写道:

> 学生
> student_id name
> 11 foo
>
> 学科分数
> id name score std_id
> 100 math 97 11
> 101 english 98 11
>
> 有如下一个场景(假设只有一个学生)
>
> 基于binlog检测这2个表的变化, 计算这个学生的总分数, 使用了Table/SQL API join操作计算
>
> 假设insert以上数据后到达某时刻, 以上数据都进入了flink, 计算出这个学生总分数 97 + 98 = 195
>
> 但此时发现学生姓名登记错误, 于是进行了修改,
> 结果此时Flink中学生流中有2个事件(insert的一个+update的一个), 分数流中有2个事件, 计算的总分数就会是 2 * (97 +
> 98) = 390
>
> Q: 请问这种场景下使用什么能够解决, 计算出正确的结果 97 + 98 = 193
>
> 接触flink不久, 不是太了解, 请大佬给个提示, 谢谢!!
>