You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by 小屁孩 <93...@qq.com> on 2020/06/04 06:15:05 UTC
关于flinksql 与维表mysql的关联问题
dear: 我有个问题想请教下,关于flinksql与mysql维表关联 关于mysql更新的问题 有没有好的方法?我现在使用的广播mysql但是有个问题,广播mysql时我的流数据已经到了但是mysql的数据还没有到导致我现在数据会再启动的时候丢失一部分数据
回复:关于flinksql 与维表mysql的关联问题
Posted by 1530130567 <15...@qq.com>.
Hi:
我的理解是想达到延迟JOIN的目的吗,可以考虑利用WaterMark的maxoutoforderness
------------------ 原始邮件 ------------------
发件人: "小屁孩"<932460849@qq.com>;
发送时间: 2020年6月4日(星期四) 下午2:15
收件人: "user-zh"<user-zh@flink.apache.org>;
主题: 关于flinksql 与维表mysql的关联问题
dear:&nbsp; &nbsp; 我有个问题想请教下,关于flinksql与mysql维表关联 关于mysql更新的问题 有没有好的方法?我现在使用的广播mysql但是有个问题,广播mysql时我的流数据已经到了但是mysql的数据还没有到导致我现在数据会再启动的时候丢失一部分数据
回复:关于flinksql 与维表mysql的关联问题
Posted by 1048262223 <10...@qq.com>.
Hi
你是会有job重启操作吗,job取消时做savepoint重启时应该不会有这个问题?
Best,
Yichao Yang
------------------ 原始邮件 ------------------
发件人: "小屁孩"<932460849@qq.com>;
发送时间: 2020年6月4日(星期四) 下午2:15
收件人: "user-zh"<user-zh@flink.apache.org>;
主题: 关于flinksql 与维表mysql的关联问题
dear:&nbsp; &nbsp; 我有个问题想请教下,关于flinksql与mysql维表关联 关于mysql更新的问题 有没有好的方法?我现在使用的广播mysql但是有个问题,广播mysql时我的流数据已经到了但是mysql的数据还没有到导致我现在数据会再启动的时候丢失一部分数据
回复:关于flinksql 与维表mysql的关联问题
Posted by 1048262223 <10...@qq.com>.
Hi
是的。
Best,
Yichao Yang
发自我的iPhone
------------------ 原始邮件 ------------------
发件人: Px New <15701181132mr.liu@gmail.com>
发送时间: 2020年6月7日 19:03
收件人: user-zh <user-zh@flink.apache.org>
主题: 回复:关于flinksql 与维表mysql的关联问题
好的 我可以理解为是: 通过env.addsouce创建一个广播流。下游connect后 在process方法中操作?
1048262223 <1048262223@qq.com>于2020年6月7日 周日下午3:57写道:
> Hi
>
>
> 可以使用open + broadcast的方式解决~
>
>
> Best,
> Yichao Yang
>
>
>
>
>
> ------------------&nbsp;原始邮件&nbsp;------------------
> 发件人:&nbsp;"Px New"<15701181132mr.liu@gmail.com&gt;;
> 发送时间:&nbsp;2020年6月6日(星期六) 上午9:50
> 收件人:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;;
>
> 主题:&nbsp;Re: 关于flinksql 与维表mysql的关联问题
>
>
>
> Hi ,我有一个相关操作的一疑问.
> 疑问: 如果我放在open 中的规则 有可能发生改变,需要重新更新的话呢?
>
> Michael Ran <greemqqran@163.com&gt; 于2020年6月4日周四 下午5:22写道:
>
> &gt; 放到open 方法里面可以吗?
> &gt; 在 2020-06-04 14:15:05,"小屁孩" <932460849@qq.com&gt; 写道:
> &gt; &gt;dear:&amp;nbsp; &amp;nbsp; 我有个问题想请教下,关于flinksql与mysql维表关联
> 关于mysql更新的问题
> &gt;
> 有没有好的方法?我现在使用的广播mysql但是有个问题,广播mysql时我的流数据已经到了但是mysql的数据还没有到导致我现在数据会再启动的时候丢失一部分数据
> &gt;
Re: 关于flinksql 与维表mysql的关联问题
Posted by LakeShen <sh...@gmail.com>.
Hi ,
延迟维表关联这个特性我觉得还是一个比较通用的特性,目前我们考虑借助 Timer 来实现的,社区如果有这个功能的话,我觉得对于 Flink
使用方会有很大帮助的。
我看社区有这样的一个 JIRA 再跟踪了[1],我会持续关注。
[1] https://issues.apache.org/jira/browse/FLINK-19063
Best,
LakeShen
小屁孩 <93...@qq.com> 于2020年6月8日周一 上午9:28写道:
> hi,目前我就是这样做的 数据在启动时会有数据先后到来的问题
>
>
>
>
> ------------------ 原始邮件 ------------------
> 发件人: "Px New"<15701181132mr.liu@gmail.com>;
> 发送时间: 2020年6月7日(星期天) 晚上7:02
> 收件人: "user-zh"<user-zh@flink.apache.org>;
>
> 主题: Re: 关于flinksql 与维表mysql的关联问题
>
>
>
> 好的 我可以理解为是: 通过env.addsouce创建一个广播流。下游connect后 在process方法中操作?
>
> 1048262223 <1048262223@qq.com>于2020年6月7日 周日下午3:57写道:
>
> > Hi
> >
> >
> > 可以使用open + broadcast的方式解决~
> >
> >
> > Best,
> > Yichao Yang
> >
> >
> >
> >
> >
> > ------------------&nbsp;原始邮件&nbsp;------------------
> > 发件人:&nbsp;"Px New"<15701181132mr.liu@gmail.com&gt;;
> > 发送时间:&nbsp;2020年6月6日(星期六) 上午9:50
> > 收件人:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;;
> >
> > 主题:&nbsp;Re: 关于flinksql 与维表mysql的关联问题
> >
> >
> >
> > Hi ,我有一个相关操作的一疑问.
> > 疑问: 如果我放在open 中的规则 有可能发生改变,需要重新更新的话呢?
> >
> > Michael Ran <greemqqran@163.com&gt; 于2020年6月4日周四 下午5:22写道:
> >
> > &gt; 放到open 方法里面可以吗?
> > &gt; 在 2020-06-04 14:15:05,"小屁孩" <932460849@qq.com&gt; 写道:
> > &gt; &gt;dear:&amp;nbsp; &amp;nbsp;
> 我有个问题想请教下,关于flinksql与mysql维表关联
> > 关于mysql更新的问题
> > &gt;
> >
> 有没有好的方法?我现在使用的广播mysql但是有个问题,广播mysql时我的流数据已经到了但是mysql的数据还没有到导致我现在数据会再启动的时候丢失一部分数据
> > &gt;
回复: 关于flinksql 与维表mysql的关联问题
Posted by 小屁孩 <93...@qq.com>.
hi,目前我就是这样做的 数据在启动时会有数据先后到来的问题
------------------ 原始邮件 ------------------
发件人: "Px New"<15701181132mr.liu@gmail.com>;
发送时间: 2020年6月7日(星期天) 晚上7:02
收件人: "user-zh"<user-zh@flink.apache.org>;
主题: Re: 关于flinksql 与维表mysql的关联问题
好的 我可以理解为是: 通过env.addsouce创建一个广播流。下游connect后 在process方法中操作?
1048262223 <1048262223@qq.com>于2020年6月7日 周日下午3:57写道:
> Hi
>
>
> 可以使用open + broadcast的方式解决~
>
>
> Best,
> Yichao Yang
>
>
>
>
>
> ------------------&nbsp;原始邮件&nbsp;------------------
> 发件人:&nbsp;"Px New"<15701181132mr.liu@gmail.com&gt;;
> 发送时间:&nbsp;2020年6月6日(星期六) 上午9:50
> 收件人:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;;
>
> 主题:&nbsp;Re: 关于flinksql 与维表mysql的关联问题
>
>
>
> Hi ,我有一个相关操作的一疑问.
> 疑问: 如果我放在open 中的规则 有可能发生改变,需要重新更新的话呢?
>
> Michael Ran <greemqqran@163.com&gt; 于2020年6月4日周四 下午5:22写道:
>
> &gt; 放到open 方法里面可以吗?
> &gt; 在 2020-06-04 14:15:05,"小屁孩" <932460849@qq.com&gt; 写道:
> &gt; &gt;dear:&amp;nbsp; &amp;nbsp; 我有个问题想请教下,关于flinksql与mysql维表关联
> 关于mysql更新的问题
> &gt;
> 有没有好的方法?我现在使用的广播mysql但是有个问题,广播mysql时我的流数据已经到了但是mysql的数据还没有到导致我现在数据会再启动的时候丢失一部分数据
> &gt;
Re: 关于flinksql 与维表mysql的关联问题
Posted by Px New <15...@gmail.com>.
好的 我可以理解为是: 通过env.addsouce创建一个广播流。下游connect后 在process方法中操作?
1048262223 <10...@qq.com>于2020年6月7日 周日下午3:57写道:
> Hi
>
>
> 可以使用open + broadcast的方式解决~
>
>
> Best,
> Yichao Yang
>
>
>
>
>
> ------------------ 原始邮件 ------------------
> 发件人: "Px New"<15701181132mr.liu@gmail.com>;
> 发送时间: 2020年6月6日(星期六) 上午9:50
> 收件人: "user-zh"<user-zh@flink.apache.org>;
>
> 主题: Re: 关于flinksql 与维表mysql的关联问题
>
>
>
> Hi ,我有一个相关操作的一疑问.
> 疑问: 如果我放在open 中的规则 有可能发生改变,需要重新更新的话呢?
>
> Michael Ran <greemqqran@163.com> 于2020年6月4日周四 下午5:22写道:
>
> > 放到open 方法里面可以吗?
> > 在 2020-06-04 14:15:05,"小屁孩" <932460849@qq.com> 写道:
> > >dear:&nbsp; &nbsp; 我有个问题想请教下,关于flinksql与mysql维表关联
> 关于mysql更新的问题
> >
> 有没有好的方法?我现在使用的广播mysql但是有个问题,广播mysql时我的流数据已经到了但是mysql的数据还没有到导致我现在数据会再启动的时候丢失一部分数据
> >
回复: 关于flinksql 与维表mysql的关联问题
Posted by 1048262223 <10...@qq.com>.
Hi
可以使用open + broadcast的方式解决~
Best,
Yichao Yang
------------------ 原始邮件 ------------------
发件人: "Px New"<15701181132mr.liu@gmail.com>;
发送时间: 2020年6月6日(星期六) 上午9:50
收件人: "user-zh"<user-zh@flink.apache.org>;
主题: Re: 关于flinksql 与维表mysql的关联问题
Hi ,我有一个相关操作的一疑问.
疑问: 如果我放在open 中的规则 有可能发生改变,需要重新更新的话呢?
Michael Ran <greemqqran@163.com> 于2020年6月4日周四 下午5:22写道:
> 放到open 方法里面可以吗?
> 在 2020-06-04 14:15:05,"小屁孩" <932460849@qq.com> 写道:
> >dear:&nbsp; &nbsp; 我有个问题想请教下,关于flinksql与mysql维表关联 关于mysql更新的问题
> 有没有好的方法?我现在使用的广播mysql但是有个问题,广播mysql时我的流数据已经到了但是mysql的数据还没有到导致我现在数据会再启动的时候丢失一部分数据
>
Re: 关于flinksql 与维表mysql的关联问题
Posted by Px New <15...@gmail.com>.
Hi ,我有一个相关操作的一疑问.
疑问: 如果我放在open 中的规则 有可能发生改变,需要重新更新的话呢?
Michael Ran <gr...@163.com> 于2020年6月4日周四 下午5:22写道:
> 放到open 方法里面可以吗?
> 在 2020-06-04 14:15:05,"小屁孩" <93...@qq.com> 写道:
> >dear: 我有个问题想请教下,关于flinksql与mysql维表关联 关于mysql更新的问题
> 有没有好的方法?我现在使用的广播mysql但是有个问题,广播mysql时我的流数据已经到了但是mysql的数据还没有到导致我现在数据会再启动的时候丢失一部分数据
>
Re: 关于flinksql 与维表mysql的关联问题
Posted by godfrey he <go...@gmail.com>.
hi 可以考虑使用 temporal table join [1]
Best,
Godfrey
[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/joins.html#join-with-a-temporal-table
小屁孩 <93...@qq.com> 于2020年6月4日周四 下午5:51写道:
> 您的意思是open 全量预加载吗?我目前的逻辑是自己自定义的source 广播出去
> 这是我的source
>
>
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
>
>
> import java.sql.Connection;
> import java.sql.PreparedStatement;
> import java.sql.ResultSet;
> import java.util.HashMap;
> import java.util.Map;
>
>
> public class GetMysqlDvcId extends RichSourceFunction<Map<String,
> Integer>> {
>
>
> private Connection connection = null;
> private PreparedStatement ps = null;
> private volatile boolean isRunning = true;
>
>
>
>
> @Override
> public void open(Configuration parameters) throws Exception {
> super.open(parameters);
> String database="db_nssa";
> String host="212.21.12.12";
> String password="saa!";
> String port="3306";
> String username="root";
>
>
>
>
> String driver = "com.mysql.jdbc.Driver";
> String url = "jdbc:mysql://" + host + ":" +
> port + "/" + database + "?useUnicode=true&characterEncoding=UTF-8";
> connection =
> MySQLUtil.getConnection(driver, url, username, password);
>
>
>
>
> if (this.connection != null) {
> String sql = "select
> ip,device_id from sys_device";
> ps
> =connection.prepareStatement(sql);
> }
> }
>
>
> @Override
> public void run(SourceContext<Map<String, Integer>>
> ctx) throws Exception {
> Map<String, Integer> map = new
> HashMap<>();
> while (isRunning) {
> ResultSet resultSet =
> ps.executeQuery();
> while (resultSet.next()) {
>
> map.put(resultSet.getString("ip"),resultSet.getInt("device_id"));
> }
>
> System.out.println("=======select alarm notify from mysql, size = {}, map =
> {}"+ map.size()+ map);
> ctx.collect(map);
> map.clear();
> Thread.sleep(2000 * 60);
> }
>
>
> }
>
>
>
>
> @Override
> public void cancel() {
> try {
> super.close();
> if (connection != null) {
> connection.close();
> }
> if (ps != null) {
> ps.close();
> }
> } catch (Exception e) {
>
> System.out.println("runException:{}"+e);
> }
> isRunning = false;
> }
> }
>
> ------------------ 原始邮件 ------------------
> 发件人: "Michael Ran"<greemqqran@163.com>;
> 发送时间: 2020年6月4日(星期四) 下午5:22
> 收件人: "user-zh"<user-zh@flink.apache.org>;
>
> 主题: Re:关于flinksql 与维表mysql的关联问题
>
>
>
> 放到open 方法里面可以吗?
> 在 2020-06-04 14:15:05,"小屁孩" <932460849@qq.com> 写道:
> >dear:&nbsp; &nbsp; 我有个问题想请教下,关于flinksql与mysql维表关联 关于mysql更新的问题
> 有没有好的方法?我现在使用的广播mysql但是有个问题,广播mysql时我的流数据已经到了但是mysql的数据还没有到导致我现在数据会再启动的时候丢失一部分数据
回复:关于flinksql 与维表mysql的关联问题
Posted by 小屁孩 <93...@qq.com>.
您的意思是open 全量预加载吗?我目前的逻辑是自己自定义的source 广播出去
这是我的source
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.HashMap;
import java.util.Map;
public class GetMysqlDvcId extends RichSourceFunction<Map<String, Integer>> {
private Connection connection = null;
private PreparedStatement ps = null;
private volatile boolean isRunning = true;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
String database="db_nssa";
String host="212.21.12.12";
String password="saa!";
String port="3306";
String username="root";
String driver = "com.mysql.jdbc.Driver";
String url = "jdbc:mysql://" + host + ":" + port + "/" + database + "?useUnicode=true&characterEncoding=UTF-8";
connection = MySQLUtil.getConnection(driver, url, username, password);
if (this.connection != null) {
String sql = "select ip,device_id from sys_device";
ps =connection.prepareStatement(sql);
}
}
@Override
public void run(SourceContext<Map<String, Integer>> ctx) throws Exception {
Map<String, Integer> map = new HashMap<>();
while (isRunning) {
ResultSet resultSet = ps.executeQuery();
while (resultSet.next()) {
map.put(resultSet.getString("ip"),resultSet.getInt("device_id"));
}
System.out.println("=======select alarm notify from mysql, size = {}, map = {}"+ map.size()+ map);
ctx.collect(map);
map.clear();
Thread.sleep(2000 * 60);
}
}
@Override
public void cancel() {
try {
super.close();
if (connection != null) {
connection.close();
}
if (ps != null) {
ps.close();
}
} catch (Exception e) {
System.out.println("runException:{}"+e);
}
isRunning = false;
}
}
------------------ 原始邮件 ------------------
发件人: "Michael Ran"<greemqqran@163.com>;
发送时间: 2020年6月4日(星期四) 下午5:22
收件人: "user-zh"<user-zh@flink.apache.org>;
主题: Re:关于flinksql 与维表mysql的关联问题
放到open 方法里面可以吗?
在 2020-06-04 14:15:05,"小屁孩" <932460849@qq.com> 写道:
>dear:&nbsp; &nbsp; 我有个问题想请教下,关于flinksql与mysql维表关联 关于mysql更新的问题 有没有好的方法?我现在使用的广播mysql但是有个问题,广播mysql时我的流数据已经到了但是mysql的数据还没有到导致我现在数据会再启动的时候丢失一部分数据
Re:关于flinksql 与维表mysql的关联问题
Posted by Michael Ran <gr...@163.com>.
放到open 方法里面可以吗?
在 2020-06-04 14:15:05,"小屁孩" <93...@qq.com> 写道:
>dear: 我有个问题想请教下,关于flinksql与mysql维表关联 关于mysql更新的问题 有没有好的方法?我现在使用的广播mysql但是有个问题,广播mysql时我的流数据已经到了但是mysql的数据还没有到导致我现在数据会再启动的时候丢失一部分数据