Posted to user-zh@flink.apache.org by 何 家健 <hz...@outlook.com> on 2023/06/03 19:04:59 UTC

[Discussion] Streaming reads in sql-client do not receive data changes

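I. Background
1. Standalone deployment with 1 JobManager and 1 TaskManager.
2. The network policy opens only the ports that are actually in use, in both directions (no NAT).
3. Following the latest Flink documentation, every address and port setting has been pinned to a fixed value, e.g.:
      jobmanager (cdc-master 192.168.10.11): 18081 (web UI port); 16123 (RPC port); 50101 (metrics port); 51101 (blob port)
          taskmanager (cdc-worker 192.168.10.12): 19123 (data port); 17123 (RPC port); 50101 (metrics port)
4. The use case is CDC. Apache Paimon writes the CDC changes into HDFS / Hive Metastore, and streaming reads are then tested by running SELECT queries in sql-client in streaming mode.
Streaming read documentation: https://paimon.apache.org/docs/master/how-to/querying-tables/
*Versions: Flink 1.17.0, flink-cdc-mysql 2.3.0, flink-paimon 0.5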
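Item 2 means every port must be opened explicitly. A quick way to check which of the fixed ports above are actually reachable is a plain TCP connect probe, run from the other node. Below is a minimal sketch that uses only the JDK. The class name PortProbe, the host/port list, and the 2-second timeout are illustrative choices. The mapping from exception type to likely cause in the comments reflects typical Linux behavior and is not a guarantee.

```java
import java.io.IOException;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.NoRouteToHostException;
import java.net.Socket;
import java.net.SocketTimeoutException;

/** Hypothetical helper: classifies the outcome of one TCP connect attempt. */
public class PortProbe {

    enum Result { OPEN, REFUSED, NO_ROUTE, TIMEOUT, OTHER_ERROR }

    static Result probe(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return Result.OPEN;
        } catch (NoRouteToHostException e) {
            // Often seen when a firewall rejects the packet with an ICMP "host prohibited" reply
            return Result.NO_ROUTE;
        } catch (ConnectException e) {
            // Host answered, but nothing listens on that port (or a TCP reset came back)
            return Result.REFUSED;
        } catch (SocketTimeoutException e) {
            // Packets silently dropped, e.g. by a DROP rule
            return Result.TIMEOUT;
        } catch (IOException e) {
            // Anything else, including an unresolvable host name
            return Result.OTHER_ERROR;
        }
    }

    public static void main(String[] args) {
        // Ports taken from the background section; run from the opposite node
        String[][] targets = {
            {"cdc-master", "18081"}, {"cdc-master", "16123"}, {"cdc-master", "51101"},
            {"cdc-worker", "19123"}, {"cdc-worker", "17123"}
        };
        for (String[] t : targets) {
            int port = Integer.parseInt(t[1]);
            System.out.println(t[0] + ":" + port + " -> " + probe(t[0], port, 2000));
        }
    }
}
```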

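II. Problem
1. With sql-client displaying a streaming SELECT, I modified rows in the database table watched by the Paimon CDC job. No incremental data ever appeared in sql-client, no matter how long I waited.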

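III. Investigation
1. The Apache Paimon job runs normally and all expected files are written to HDFS. The Paimon CDC job shows no exceptions in the JobManager web UI.
2. After switching sql-client to batch mode, the query does return the latest data, including the changes.
3. I set Flink logging to DEBUG and reran the job. This showed the following:
      3.1. The JobManager log flink-xxx-standalonesession-0-xxx.log (in the log directory) contains this error: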

2023-06-02 12:42:37,577 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Sink: Collect table sink (1/1) (ac56691af108802708be4598061d14b3_9dd63673dd41ea021b896d5203f3ba7c_0_0) switched from DEPLOYING to INITIALIZING.
2023-06-02 12:42:37,589 INFO  org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorCoordinator [] - Received sink socket server address: cdc-worker/192.168.10.12:37824
2023-06-02 12:42:37,590 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Sink: Collect table sink (1/1) (ac56691af108802708be4598061d14b3_9dd63673dd41ea021b896d5203f3ba7c_0_0) switched from INITIALIZING to RUNNING.
2023-06-02 12:42:37,615 DEBUG org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorCoordinator [] - Collect sink coordinator encounters an exception
java.net.NoRouteToHostException: No route to host (Host unreachable)
      at java.net.PlainSocketImpl.socketConnect(Native Method) ~[?:?]
      at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:412) ~[?:?]
      at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:255) ~[?:?]
      at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:237) ~[?:?]
      at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) ~[?:?]
      at java.net.Socket.connect(Socket.java:608) ~[?:?]
      at java.net.Socket.connect(Socket.java:557) ~[?:?]
      at org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorCoordinator.handleRequestImpl(CollectSinkOperatorCoordinator.java:132) ~[flink-dist-1.17.0.jar:1.17.0]
      at org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorCoordinator.lambda$handleCoordinationRequest$0(CollectSinkOperatorCoordinator.java:111) ~[flink-dist-1.17.0.jar:1.17.0]
      at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
      at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
      at java.lang.Thread.run(Thread.java:834) [?:?]
2023-06-02 12:42:37,701 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Sink: Collect table sink (1/1) (ac56691af108802708be4598061d14b3_9dd63673dd41ea021b896d5203f3ba7c_0_0) switched from RUNNING to FINISHED.
2023-06-02 12:42:37,701 DEBUG org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Ignoring transition of vertex Sink: Collect table sink (1/1) - execution #0 to FAILED while being FINISHED.

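      3.2. The error points to a network problem. Strict ACLs are in place, so the most likely cause is a port that is in use but has not been opened.
4. The errors all come from CollectSinkOperatorCoordinator and appear in the JobManager's log. This suggests the JobManager is opening a connection to the TaskManager, but it was not clear why it would need to.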
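5. The source of CollectSinkOperatorCoordinator#handleRequestImpl shows that it opens a socket connection, so the key question is how sinkAddress gets assigned. Tracing its callers shows it is set by the handleEventFromOperator method in the same class: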
      address = ((CollectSinkAddressEvent) event).getAddress();
      LOG.info("Received sink socket server address: " + address);
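   Next, I inspected the runtime stack with Arthas (watch/stack). The address changes every time a new collect (streaming read) job is created, and the call chain involves Akka. I was not yet familiar with how such jobs are created, so I went through every class in that call chain. I eventually found a ServerSocket in CollectSinkFunction. (In hindsight, a socket client implies a server socket somewhere, and a global search of the Flink codebase would have found it faster.) A search of all of Flink for "ServerSocket" showed that this is the only class that uses one. The reason it exists is explained in https://github.com/apache/flink/pull/12069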
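The mechanism from item 5 can be modeled in a few lines of plain Java. This is a simplified sketch, not Flink's code, and the class and method names are hypothetical:

* The "sink" opens a ServerSocket on port 0, so the OS assigns a free ephemeral port.
* The resulting address is handed to the "coordinator". In Flink it travels in a CollectSinkAddressEvent; here it is just a local variable.
* The coordinator connects to that address.

It binds to loopback so it runs on a single machine. Each run typically reports a different port. That matches the observation that the address changes for every new collect job, and it is why a fixed ACL rule cannot cover it.

```java
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;

/** Simplified model (hypothetical, not Flink code) of the collect-sink handshake. */
public class CollectHandshakeModel {

    /** "Sink side": port 0 lets the OS pick any free ephemeral port. */
    static ServerSocket openSinkServer() throws IOException {
        return new ServerSocket(0, 1, InetAddress.getLoopbackAddress());
    }

    /** "Coordinator side": connects to the reported address and reads one int. */
    static int fetchFromSink(InetSocketAddress reportedAddress) throws IOException {
        try (Socket socket = new Socket()) {
            socket.connect(reportedAddress, 2000);
            return new DataInputStream(socket.getInputStream()).readInt();
        }
    }

    public static void main(String[] args) throws Exception {
        for (int run = 0; run < 3; run++) {
            try (ServerSocket server = openSinkServer()) {
                // Stand-in for the address event sent from the sink to the coordinator
                InetSocketAddress reported =
                        new InetSocketAddress(server.getInetAddress(), server.getLocalPort());
                System.out.println("run " + run + ": sink listening on " + reported);

                // Sink thread: accept one connection and send a single value
                Thread sink = new Thread(() -> {
                    try (Socket client = server.accept()) {
                        new DataOutputStream(client.getOutputStream()).writeInt(42);
                    } catch (IOException e) {
                        throw new UncheckedIOException(e);
                    }
                });
                sink.start();
                System.out.println("coordinator received " + fetchFromSink(reported));
                sink.join();
            }
        }
    }
}
```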

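IV. Resolution
1. As a workaround, I changed the port the ServerSocket is initialized with and rebuilt Flink. (On Flink master it is still 0, i.e. the OS picks a random port.) This fixed sql-client streaming reads under the strict ACLs.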
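Pinning a single port works for one query at a time. However, each streaming SELECT gets its own collect sink. If two such sinks run on the same TaskManager at once, the second would most likely fail to bind the same fixed port. A more tolerant variant binds to the first free port in a small range opened in the ACL. The sketch below assumes such a range (50200-50210 is an arbitrary example), and the class RangeBoundServerSocket is hypothetical, not part of Flink.

```java
import java.io.IOException;
import java.net.BindException;
import java.net.InetAddress;
import java.net.ServerSocket;

/** Hypothetical helper: bind to the first free port in [minPort, maxPort]. */
public class RangeBoundServerSocket {

    static ServerSocket bindInRange(InetAddress bindAddress, int minPort, int maxPort)
            throws IOException {
        for (int port = minPort; port <= maxPort; port++) {
            try {
                return new ServerSocket(port, 0, bindAddress);
            } catch (BindException e) {
                // Port already in use (e.g. by another running collect sink); try the next one
            }
        }
        throw new BindException("No free port in range " + minPort + "-" + maxPort);
    }

    public static void main(String[] args) throws IOException {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        // Two "sinks" on the same host end up on two different ports inside the range
        try (ServerSocket first = bindInRange(loopback, 50200, 50210);
             ServerSocket second = bindInRange(loopback, 50200, 50210)) {
            System.out.println("first sink on " + first.getLocalPort()
                    + ", second sink on " + second.getLocalPort());
        }
    }
}
```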

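V. Suggestions
1. Are there other places in Flink like this, i.e. network communication that is required but not documented?
2. I understand that clusters are usually treated as trusted, but in some environments the network cannot be fully opened. This feature is somewhat debugging-oriented, though I think it is fairly important. Could its port be configurable in the configuration file, like the other network addresses and ports? Any further improvement would be even better. If possible, please assign this to me (I'm new here).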
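For suggestion 2, one option is to reuse the port-list syntax Flink already accepts for options such as rest.bind-port (e.g. "50100-50200", "50100,50101", or a combination). The parser below is a standalone sketch of that syntax. The option it would back, for example a key like collect-sink.port, is hypothetical. With a default of "0", the parser returns [0], which would keep the current behavior of letting the OS pick a port.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical parser for a port spec such as "50100-50102,50300". */
public class PortRangeSpec {

    static List<Integer> parse(String spec) {
        List<Integer> ports = new ArrayList<>();
        for (String part : spec.split(",")) {
            String p = part.trim();
            if (p.isEmpty()) {
                continue;
            }
            int dash = p.indexOf('-');
            // A single port is treated as the range [port, port]
            int start = Integer.parseInt((dash < 0 ? p : p.substring(0, dash)).trim());
            int end = dash < 0 ? start : Integer.parseInt(p.substring(dash + 1).trim());
            if (start < 0 || end > 65535 || start > end) {
                throw new IllegalArgumentException("Invalid port range: " + p);
            }
            for (int port = start; port <= end; port++) {
                ports.add(port);
            }
        }
        return ports;
    }

    public static void main(String[] args) {
        System.out.println(parse("50100-50102, 50300")); // [50100, 50101, 50102, 50300]
        System.out.println(parse("0"));                   // [0]
    }
}
```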