You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by 小屁孩 <93...@qq.com> on 2020/06/04 06:15:05 UTC

关于flinksql 与维表mysql的关联问题

dear:&nbsp; &nbsp; 我有个问题想请教下,关于flinksql与mysql维表关联 关于mysql更新的问题 有没有好的方法?我现在使用的广播mysql但是有个问题,广播mysql时我的流数据已经到了但是mysql的数据还没有到导致我现在数据会再启动的时候丢失一部分数据

回复:关于flinksql 与维表mysql的关联问题

Posted by 1530130567 <15...@qq.com>.
Hi:
我的理解是想达到延迟JOIN的目的吗,可以考虑利用WaterMark的maxoutoforderness






------------------&nbsp;原始邮件&nbsp;------------------
发件人:&nbsp;"小屁孩"<932460849@qq.com&gt;;
发送时间:&nbsp;2020年6月4日(星期四) 下午2:15
收件人:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;;

主题:&nbsp;关于flinksql 与维表mysql的关联问题



dear:&amp;nbsp; &amp;nbsp; 我有个问题想请教下,关于flinksql与mysql维表关联 关于mysql更新的问题 有没有好的方法?我现在使用的广播mysql但是有个问题,广播mysql时我的流数据已经到了但是mysql的数据还没有到导致我现在数据会再启动的时候丢失一部分数据

回复:关于flinksql 与维表mysql的关联问题

Posted by 1048262223 <10...@qq.com>.
Hi


你是会有job重启操作吗,job取消时做savepoint重启时应该不会有这个问题?


Best,
Yichao Yang




------------------&nbsp;原始邮件&nbsp;------------------
发件人:&nbsp;"小屁孩"<932460849@qq.com&gt;;
发送时间:&nbsp;2020年6月4日(星期四) 下午2:15
收件人:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;;

主题:&nbsp;关于flinksql 与维表mysql的关联问题



dear:&amp;nbsp; &amp;nbsp; 我有个问题想请教下,关于flinksql与mysql维表关联 关于mysql更新的问题 有没有好的方法?我现在使用的广播mysql但是有个问题,广播mysql时我的流数据已经到了但是mysql的数据还没有到导致我现在数据会再启动的时候丢失一部分数据

回复:关于flinksql 与维表mysql的关联问题

Posted by 1048262223 <10...@qq.com>.
Hi


是的。


Best,
Yichao Yang



发自我的iPhone


------------------ 原始邮件 ------------------
发件人: Px New <15701181132mr.liu@gmail.com&gt;
发送时间: 2020年6月7日 19:03
收件人: user-zh <user-zh@flink.apache.org&gt;
主题: 回复:关于flinksql 与维表mysql的关联问题



好的 我可以理解为是: 通过env.addsouce创建一个广播流。下游connect后 在process方法中操作?

1048262223 <1048262223@qq.com&gt;于2020年6月7日 周日下午3:57写道:

&gt; Hi
&gt;
&gt;
&gt; 可以使用open + broadcast的方式解决~
&gt;
&gt;
&gt; Best,
&gt; Yichao Yang
&gt;
&gt;
&gt;
&gt;
&gt;
&gt; ------------------&amp;nbsp;原始邮件&amp;nbsp;------------------
&gt; 发件人:&amp;nbsp;"Px New"<15701181132mr.liu@gmail.com&amp;gt;;
&gt; 发送时间:&amp;nbsp;2020年6月6日(星期六) 上午9:50
&gt; 收件人:&amp;nbsp;"user-zh"<user-zh@flink.apache.org&amp;gt;;
&gt;
&gt; 主题:&amp;nbsp;Re: 关于flinksql 与维表mysql的关联问题
&gt;
&gt;
&gt;
&gt; Hi ,我有一个相关操作的一疑问.
&gt; 疑问: 如果我放在open 中的规则 有可能发生改变,需要重新更新的话呢?
&gt;
&gt; Michael Ran <greemqqran@163.com&amp;gt; 于2020年6月4日周四 下午5:22写道:
&gt;
&gt; &amp;gt; 放到open 方法里面可以吗?
&gt; &amp;gt; 在 2020-06-04 14:15:05,"小屁孩" <932460849@qq.com&amp;gt; 写道:
&gt; &amp;gt; &amp;gt;dear:&amp;amp;nbsp; &amp;amp;nbsp; 我有个问题想请教下,关于flinksql与mysql维表关联
&gt; 关于mysql更新的问题
&gt; &amp;gt;
&gt; 有没有好的方法?我现在使用的广播mysql但是有个问题,广播mysql时我的流数据已经到了但是mysql的数据还没有到导致我现在数据会再启动的时候丢失一部分数据
&gt; &amp;gt;

Re: 关于flinksql 与维表mysql的关联问题

Posted by LakeShen <sh...@gmail.com>.
Hi ,
延迟维表关联这个特性我觉得还是一个比较通用的特性,目前我们考虑借助 Timer 来实现的,社区如果有这个功能的话,我觉得对于 Flink
使用方会有很大帮助的。
我看社区有这样的一个 JIRA 再跟踪了[1],我会持续关注。

[1] https://issues.apache.org/jira/browse/FLINK-19063

Best,
LakeShen

小屁孩 <93...@qq.com> 于2020年6月8日周一 上午9:28写道:

> hi,目前我就是这样做的 数据在启动时会有数据先后到来的问题
>
>
>
>
> ------------------&nbsp;原始邮件&nbsp;------------------
> 发件人:&nbsp;"Px New"<15701181132mr.liu@gmail.com&gt;;
> 发送时间:&nbsp;2020年6月7日(星期天) 晚上7:02
> 收件人:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;;
>
> 主题:&nbsp;Re: 关于flinksql 与维表mysql的关联问题
>
>
>
> 好的 我可以理解为是: 通过env.addsouce创建一个广播流。下游connect后 在process方法中操作?
>
> 1048262223 <1048262223@qq.com&gt;于2020年6月7日 周日下午3:57写道:
>
> &gt; Hi
> &gt;
> &gt;
> &gt; 可以使用open + broadcast的方式解决~
> &gt;
> &gt;
> &gt; Best,
> &gt; Yichao Yang
> &gt;
> &gt;
> &gt;
> &gt;
> &gt;
> &gt; ------------------&amp;nbsp;原始邮件&amp;nbsp;------------------
> &gt; 发件人:&amp;nbsp;"Px New"<15701181132mr.liu@gmail.com&amp;gt;;
> &gt; 发送时间:&amp;nbsp;2020年6月6日(星期六) 上午9:50
> &gt; 收件人:&amp;nbsp;"user-zh"<user-zh@flink.apache.org&amp;gt;;
> &gt;
> &gt; 主题:&amp;nbsp;Re: 关于flinksql 与维表mysql的关联问题
> &gt;
> &gt;
> &gt;
> &gt; Hi ,我有一个相关操作的一疑问.
> &gt; 疑问: 如果我放在open 中的规则 有可能发生改变,需要重新更新的话呢?
> &gt;
> &gt; Michael Ran <greemqqran@163.com&amp;gt; 于2020年6月4日周四 下午5:22写道:
> &gt;
> &gt; &amp;gt; 放到open 方法里面可以吗?
> &gt; &amp;gt; 在 2020-06-04 14:15:05,"小屁孩" <932460849@qq.com&amp;gt; 写道:
> &gt; &amp;gt; &amp;gt;dear:&amp;amp;nbsp; &amp;amp;nbsp;
> 我有个问题想请教下,关于flinksql与mysql维表关联
> &gt; 关于mysql更新的问题
> &gt; &amp;gt;
> &gt;
> 有没有好的方法?我现在使用的广播mysql但是有个问题,广播mysql时我的流数据已经到了但是mysql的数据还没有到导致我现在数据会再启动的时候丢失一部分数据
> &gt; &amp;gt;

回复: 关于flinksql 与维表mysql的关联问题

Posted by 小屁孩 <93...@qq.com>.
hi,目前我就是这样做的 数据在启动时会有数据先后到来的问题




------------------&nbsp;原始邮件&nbsp;------------------
发件人:&nbsp;"Px New"<15701181132mr.liu@gmail.com&gt;;
发送时间:&nbsp;2020年6月7日(星期天) 晚上7:02
收件人:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;;

主题:&nbsp;Re: 关于flinksql 与维表mysql的关联问题



好的 我可以理解为是: 通过env.addsouce创建一个广播流。下游connect后 在process方法中操作?

1048262223 <1048262223@qq.com&gt;于2020年6月7日 周日下午3:57写道:

&gt; Hi
&gt;
&gt;
&gt; 可以使用open + broadcast的方式解决~
&gt;
&gt;
&gt; Best,
&gt; Yichao Yang
&gt;
&gt;
&gt;
&gt;
&gt;
&gt; ------------------&amp;nbsp;原始邮件&amp;nbsp;------------------
&gt; 发件人:&amp;nbsp;"Px New"<15701181132mr.liu@gmail.com&amp;gt;;
&gt; 发送时间:&amp;nbsp;2020年6月6日(星期六) 上午9:50
&gt; 收件人:&amp;nbsp;"user-zh"<user-zh@flink.apache.org&amp;gt;;
&gt;
&gt; 主题:&amp;nbsp;Re: 关于flinksql 与维表mysql的关联问题
&gt;
&gt;
&gt;
&gt; Hi ,我有一个相关操作的一疑问.
&gt; 疑问: 如果我放在open 中的规则 有可能发生改变,需要重新更新的话呢?
&gt;
&gt; Michael Ran <greemqqran@163.com&amp;gt; 于2020年6月4日周四 下午5:22写道:
&gt;
&gt; &amp;gt; 放到open 方法里面可以吗?
&gt; &amp;gt; 在 2020-06-04 14:15:05,"小屁孩" <932460849@qq.com&amp;gt; 写道:
&gt; &amp;gt; &amp;gt;dear:&amp;amp;nbsp; &amp;amp;nbsp; 我有个问题想请教下,关于flinksql与mysql维表关联
&gt; 关于mysql更新的问题
&gt; &amp;gt;
&gt; 有没有好的方法?我现在使用的广播mysql但是有个问题,广播mysql时我的流数据已经到了但是mysql的数据还没有到导致我现在数据会再启动的时候丢失一部分数据
&gt; &amp;gt;

Re: 关于flinksql 与维表mysql的关联问题

Posted by Px New <15...@gmail.com>.
好的 我可以理解为是: 通过env.addsouce创建一个广播流。下游connect后 在process方法中操作?

1048262223 <10...@qq.com>于2020年6月7日 周日下午3:57写道:

> Hi
>
>
> 可以使用open + broadcast的方式解决~
>
>
> Best,
> Yichao Yang
>
>
>
>
>
> ------------------&nbsp;原始邮件&nbsp;------------------
> 发件人:&nbsp;"Px New"<15701181132mr.liu@gmail.com&gt;;
> 发送时间:&nbsp;2020年6月6日(星期六) 上午9:50
> 收件人:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;;
>
> 主题:&nbsp;Re: 关于flinksql 与维表mysql的关联问题
>
>
>
> Hi ,我有一个相关操作的一疑问.
> 疑问: 如果我放在open 中的规则 有可能发生改变,需要重新更新的话呢?
>
> Michael Ran <greemqqran@163.com&gt; 于2020年6月4日周四 下午5:22写道:
>
> &gt; 放到open 方法里面可以吗?
> &gt; 在 2020-06-04 14:15:05,"小屁孩" <932460849@qq.com&gt; 写道:
> &gt; &gt;dear:&amp;nbsp; &amp;nbsp; 我有个问题想请教下,关于flinksql与mysql维表关联
> 关于mysql更新的问题
> &gt;
> 有没有好的方法?我现在使用的广播mysql但是有个问题,广播mysql时我的流数据已经到了但是mysql的数据还没有到导致我现在数据会再启动的时候丢失一部分数据
> &gt;

回复: 关于flinksql 与维表mysql的关联问题

Posted by 1048262223 <10...@qq.com>.
Hi


可以使用open + broadcast的方式解决~


Best,
Yichao Yang





------------------&nbsp;原始邮件&nbsp;------------------
发件人:&nbsp;"Px New"<15701181132mr.liu@gmail.com&gt;;
发送时间:&nbsp;2020年6月6日(星期六) 上午9:50
收件人:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;;

主题:&nbsp;Re: 关于flinksql 与维表mysql的关联问题



Hi ,我有一个相关操作的一疑问.
疑问: 如果我放在open 中的规则 有可能发生改变,需要重新更新的话呢?

Michael Ran <greemqqran@163.com&gt; 于2020年6月4日周四 下午5:22写道:

&gt; 放到open 方法里面可以吗?
&gt; 在 2020-06-04 14:15:05,"小屁孩" <932460849@qq.com&gt; 写道:
&gt; &gt;dear:&amp;nbsp; &amp;nbsp; 我有个问题想请教下,关于flinksql与mysql维表关联 关于mysql更新的问题
&gt; 有没有好的方法?我现在使用的广播mysql但是有个问题,广播mysql时我的流数据已经到了但是mysql的数据还没有到导致我现在数据会再启动的时候丢失一部分数据
&gt;

Re: 关于flinksql 与维表mysql的关联问题

Posted by Px New <15...@gmail.com>.
Hi ,我有一个相关操作的一疑问.
疑问: 如果我放在open 中的规则 有可能发生改变,需要重新更新的话呢?

Michael Ran <gr...@163.com> 于2020年6月4日周四 下午5:22写道:

> 放到open 方法里面可以吗?
> 在 2020-06-04 14:15:05,"小屁孩" <93...@qq.com> 写道:
> >dear:&nbsp; &nbsp; 我有个问题想请教下,关于flinksql与mysql维表关联 关于mysql更新的问题
> 有没有好的方法?我现在使用的广播mysql但是有个问题,广播mysql时我的流数据已经到了但是mysql的数据还没有到导致我现在数据会再启动的时候丢失一部分数据
>

Re: 关于flinksql 与维表mysql的关联问题

Posted by godfrey he <go...@gmail.com>.
hi 可以考虑使用 temporal table join [1]

Best,
Godfrey

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/joins.html#join-with-a-temporal-table

小屁孩 <93...@qq.com> 于2020年6月4日周四 下午5:51写道:

> 您的意思是open 全量预加载吗?我目前的逻辑是自己自定义的source 广播出去
> 这是我的source
>
>
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
>
>
> import java.sql.Connection;
> import java.sql.PreparedStatement;
> import java.sql.ResultSet;
> import java.util.HashMap;
> import java.util.Map;
>
>
> public class GetMysqlDvcId extends RichSourceFunction<Map<String,
> Integer&gt;&gt; {
>
>
> &nbsp; &nbsp; private Connection connection = null;
> &nbsp; &nbsp; private PreparedStatement ps = null;
> &nbsp; &nbsp; private volatile boolean isRunning = true;
>
>
>
>
> &nbsp; &nbsp; @Override
> &nbsp; &nbsp; public void open(Configuration parameters) throws Exception {
> &nbsp; &nbsp; &nbsp; &nbsp; super.open(parameters);
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; String database="db_nssa";
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; String host="212.21.12.12";
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; String password="saa!";
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; String port="3306";
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; String username="root";
>
>
>
>
> &nbsp; &nbsp; &nbsp; &nbsp; String driver = "com.mysql.jdbc.Driver";
> &nbsp; &nbsp; &nbsp; &nbsp; String url = "jdbc:mysql://" + host + ":" +
> port + "/" + database + "?useUnicode=true&amp;characterEncoding=UTF-8";
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; connection =
> MySQLUtil.getConnection(driver, url, username, password);
>
>
>
>
> &nbsp; &nbsp; &nbsp; &nbsp; if (this.connection != null) {
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; String sql = "select
> ip,device_id from sys_device";
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ps
> =connection.prepareStatement(sql);
> &nbsp; &nbsp; &nbsp; &nbsp; }
> &nbsp; &nbsp; }
>
>
> &nbsp; &nbsp; @Override
> &nbsp; &nbsp; public void run(SourceContext<Map<String, Integer&gt;&gt;
> ctx) throws Exception {
> &nbsp; &nbsp; &nbsp; &nbsp; Map<String, Integer&gt; map = new
> HashMap<&gt;();
> &nbsp; &nbsp; &nbsp; &nbsp; while (isRunning) {
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ResultSet resultSet =
> ps.executeQuery();
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; while (resultSet.next()) {
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;
> map.put(resultSet.getString("ip"),resultSet.getInt("device_id"));
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;
> System.out.println("=======select alarm notify from mysql, size = {}, map =
> {}"+ map.size()+ map);
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ctx.collect(map);
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; map.clear();
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; Thread.sleep(2000 * 60);
> &nbsp; &nbsp; &nbsp; &nbsp; }
>
>
> &nbsp; &nbsp; }
>
>
>
>
> &nbsp; &nbsp; @Override
> &nbsp; &nbsp; public void cancel() {
> &nbsp; &nbsp; &nbsp; &nbsp; try {
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; super.close();
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if (connection != null) {
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; connection.close();
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if (ps != null) {
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ps.close();
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }
> &nbsp; &nbsp; &nbsp; &nbsp; } catch (Exception e) {
> &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;
> System.out.println("runException:{}"+e);
> &nbsp; &nbsp; &nbsp; &nbsp; }
> &nbsp; &nbsp; &nbsp; &nbsp; isRunning = false;
> &nbsp; &nbsp; }
> }
>
> ------------------&nbsp;原始邮件&nbsp;------------------
> 发件人:&nbsp;"Michael Ran"<greemqqran@163.com&gt;;
> 发送时间:&nbsp;2020年6月4日(星期四) 下午5:22
> 收件人:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;;
>
> 主题:&nbsp;Re:关于flinksql 与维表mysql的关联问题
>
>
>
> 放到open 方法里面可以吗?
> 在 2020-06-04 14:15:05,"小屁孩" <932460849@qq.com&gt; 写道:
> &gt;dear:&amp;nbsp; &amp;nbsp; 我有个问题想请教下,关于flinksql与mysql维表关联 关于mysql更新的问题
> 有没有好的方法?我现在使用的广播mysql但是有个问题,广播mysql时我的流数据已经到了但是mysql的数据还没有到导致我现在数据会再启动的时候丢失一部分数据

回复:关于flinksql 与维表mysql的关联问题

Posted by 小屁孩 <93...@qq.com>.
您的意思是open 全量预加载吗?我目前的逻辑是自己自定义的source 广播出去
这是我的source


import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;


import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.HashMap;
import java.util.Map;


public class GetMysqlDvcId extends RichSourceFunction<Map<String, Integer&gt;&gt; {


&nbsp; &nbsp; private Connection connection = null;
&nbsp; &nbsp; private PreparedStatement ps = null;
&nbsp; &nbsp; private volatile boolean isRunning = true;




&nbsp; &nbsp; @Override
&nbsp; &nbsp; public void open(Configuration parameters) throws Exception {
&nbsp; &nbsp; &nbsp; &nbsp; super.open(parameters);
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; String database="db_nssa";
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; String host="212.21.12.12";
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; String password="saa!";
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; String port="3306";
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; String username="root";




&nbsp; &nbsp; &nbsp; &nbsp; String driver = "com.mysql.jdbc.Driver";
&nbsp; &nbsp; &nbsp; &nbsp; String url = "jdbc:mysql://" + host + ":" + port + "/" + database + "?useUnicode=true&amp;characterEncoding=UTF-8";
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; connection = MySQLUtil.getConnection(driver, url, username, password);




&nbsp; &nbsp; &nbsp; &nbsp; if (this.connection != null) {
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; String sql = "select ip,device_id from sys_device";
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ps =connection.prepareStatement(sql);
&nbsp; &nbsp; &nbsp; &nbsp; }
&nbsp; &nbsp; }


&nbsp; &nbsp; @Override
&nbsp; &nbsp; public void run(SourceContext<Map<String, Integer&gt;&gt; ctx) throws Exception {
&nbsp; &nbsp; &nbsp; &nbsp; Map<String, Integer&gt; map = new HashMap<&gt;();
&nbsp; &nbsp; &nbsp; &nbsp; while (isRunning) {
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ResultSet resultSet = ps.executeQuery();
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; while (resultSet.next()) {
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; map.put(resultSet.getString("ip"),resultSet.getInt("device_id"));
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; System.out.println("=======select alarm notify from mysql, size = {}, map = {}"+ map.size()+ map);
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ctx.collect(map);
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; map.clear();
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; Thread.sleep(2000 * 60);
&nbsp; &nbsp; &nbsp; &nbsp; }


&nbsp; &nbsp; }




&nbsp; &nbsp; @Override
&nbsp; &nbsp; public void cancel() {
&nbsp; &nbsp; &nbsp; &nbsp; try {
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; super.close();
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if (connection != null) {
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; connection.close();
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if (ps != null) {
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ps.close();
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }
&nbsp; &nbsp; &nbsp; &nbsp; } catch (Exception e) {
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; System.out.println("runException:{}"+e);
&nbsp; &nbsp; &nbsp; &nbsp; }
&nbsp; &nbsp; &nbsp; &nbsp; isRunning = false;
&nbsp; &nbsp; }
}

------------------&nbsp;原始邮件&nbsp;------------------
发件人:&nbsp;"Michael Ran"<greemqqran@163.com&gt;;
发送时间:&nbsp;2020年6月4日(星期四) 下午5:22
收件人:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;;

主题:&nbsp;Re:关于flinksql 与维表mysql的关联问题



放到open 方法里面可以吗?
在 2020-06-04 14:15:05,"小屁孩" <932460849@qq.com&gt; 写道:
&gt;dear:&amp;nbsp; &amp;nbsp; 我有个问题想请教下,关于flinksql与mysql维表关联 关于mysql更新的问题 有没有好的方法?我现在使用的广播mysql但是有个问题,广播mysql时我的流数据已经到了但是mysql的数据还没有到导致我现在数据会再启动的时候丢失一部分数据

Re:关于flinksql 与维表mysql的关联问题

Posted by Michael Ran <gr...@163.com>.
放到open 方法里面可以吗?
在 2020-06-04 14:15:05,"小屁孩" <93...@qq.com> 写道:
>dear:&nbsp; &nbsp; 我有个问题想请教下,关于flinksql与mysql维表关联 关于mysql更新的问题 有没有好的方法?我现在使用的广播mysql但是有个问题,广播mysql时我的流数据已经到了但是mysql的数据还没有到导致我现在数据会再启动的时候丢失一部分数据