Posted to user-zh@flink.apache.org by Yichao Yang <10...@qq.com> on 2020/06/29 10:57:57 UTC

Re: Flink SQL streaming job ends abnormally

Hi


From your log, is your data source a Hive table? You could check whether the job is running in batch mode rather than streaming mode.


Best,
Yichao Yang




------------------ Original message ------------------
From: "MuChen" <9329748@qq.com>;
Sent: Monday, June 29, 2020, 4:53 PM
To: "user-zh" <user-zh@flink.apache.org>;

Subject: Flink SQL streaming job ends abnormally



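Hi everyone,

I started a yarn-session: bin/yarn-session.sh -jm 1g -tm 4g -s 4 -qu root.flink -nm fsql-cli 2>&1 &

Then I submitted a SQL statement through sql-client:

The main logic joins a Kafka table with a Hive dimension table and then writes the aggregated result to MySQL.

While it runs, the job status frequently changes to SUCCEEDED, sometimes after only a few hours and sometimes after several dozen hours, as shown here: https://s1.ax1x.com/2020/06/29/Nf2dIA.png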

The log shows an exception at INFO level. The log around 15:34, when the job ended, is as follows:
2020-06-29 14:53:20,260 INFO  org.apache.flink.api.common.io.LocatableInputSplitAssigner - Assigning remote split to host uhadoop-op3raf-core12
2020-06-29 14:53:22,845 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: HiveTableSource(vid, q70) TablePath: dw.video_pic_title_q70, PartitionPruned: false, PartitionNums: null (1/1) (68c24aa59c898cefbb20fbc929ddbafd) switched from RUNNING to FINISHED.
2020-06-29 15:34:52,982 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Shutting YarnSessionClusterEntrypoint down with application status SUCCEEDED. Diagnostics null.
2020-06-29 15:34:52,984 INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint - Shutting down rest endpoint.
2020-06-29 15:34:53,072 INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint - Removing cache directory /tmp/flink-web-cdb67193-05ee-4a83-b957-9b7a9d85c23f/flink-web-ui
2020-06-29 15:34:53,073 INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint - http://uhadoop-op3raf-core1:44664 lost leadership
2020-06-29 15:34:53,074 INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint - Shut down complete.
2020-06-29 15:34:53,074 INFO  org.apache.flink.yarn.YarnResourceManager - Shut down cluster because application is in SUCCEEDED, diagnostics null.
2020-06-29 15:34:53,076 INFO  org.apache.flink.yarn.YarnResourceManager - Unregister application from the YARN Resource Manager with final status SUCCEEDED.
2020-06-29 15:34:53,088 INFO  org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl - Waiting for application to be successfully unregistered.
2020-06-29 15:34:53,306 INFO  org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponent - Closing components.
2020-06-29 15:34:53,308 INFO  org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess - Stopping SessionDispatcherLeaderProcess.
2020-06-29 15:34:53,309 INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Stopping dispatcher akka.tcp://flink@uhadoop-op3raf-core1:38817/user/dispatcher.
2020-06-29 15:34:53,310 INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Stopping all currently running jobs of dispatcher akka.tcp://flink@uhadoop-op3raf-core1:38817/user/dispatcher.
2020-06-29 15:34:53,311 INFO  org.apache.flink.runtime.jobmaster.JobMaster - Stopping the JobMaster for job default: insert into rt_app.app_video_cover_abtest_test ...
2020-06-29 15:34:53,322 INFO  org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl - Interrupted while waiting for queue
java.lang.InterruptedException
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2048)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl$CallbackHandlerThread.run(AMRMClientAsyncImpl.java:287)
2020-06-29 15:34:53,324 INFO  org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy - Opening proxy : uhadoop-op3raf-core12:23333
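To line up the timing in a log like this, a small helper can split the run-together text into records and pull out the task state transitions and the cluster shutdown. The following is only a sketch in Python, not something from the original post: `split_records` and `find_events` are hypothetical names, the sample text is adapted from two of the log records above, and the regular expressions assume the timestamp and message formats seen in this log.

```python
import re

# Sample text adapted from the log in this thread, run together the way the
# mail archive flattened it (several records on one physical line).
RAW_LOG = (
    "2020-06-29 14:53:22,845 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph"
    " - Source: HiveTableSource(vid, q70) TablePath: dw.video_pic_title_q70,"
    " PartitionPruned: false, PartitionNums: null (1/1)"
    " (68c24aa59c898cefbb20fbc929ddbafd) switched from RUNNING to FINISHED. "
    "2020-06-29 15:34:52,982 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint"
    " - Shutting YarnSessionClusterEntrypoint down with application status SUCCEEDED."
    " Diagnostics null. "
)

# Assumption: every record starts with a timestamp like "2020-06-29 14:53:22,845".
TIMESTAMP = re.compile(r"\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3}")
TIMESTAMP_LENGTH = 23
TRANSITION = re.compile(r"switched from (\w+) to (\w+)")
SHUTDOWN = re.compile(r"application status (\w+)")


def split_records(text):
    """Split run-together log text into (timestamp, rest-of-record) pairs."""
    starts = [match.start() for match in TIMESTAMP.finditer(text)]
    records = []
    for begin, end in zip(starts, starts[1:] + [len(text)]):
        record = text[begin:end].strip()
        records.append((record[:TIMESTAMP_LENGTH], record[TIMESTAMP_LENGTH:].strip()))
    return records


def find_events(text):
    """Return (timestamp, description) for state transitions and cluster shutdowns."""
    events = []
    for timestamp, rest in split_records(text):
        transition = TRANSITION.search(rest)
        if transition:
            events.append((timestamp, "task %s -> %s" % transition.groups()))
        shutdown = SHUTDOWN.search(rest)
        if shutdown:
            events.append((timestamp, "cluster shutdown, status " + shutdown.group(1)))
    return events


EVENTS = find_events(RAW_LOG)
for timestamp, description in EVENTS:
    print(timestamp, description)
```

On the sample, this lists the Hive source finishing at 14:53:22 and the session cluster shutting down at 15:34:52, which makes the roughly 40-minute gap between the two events easy to see.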

PS:

1. Data is being written to Kafka continuously.
2. The Flink version is 1.10.0.

Why does the job status change to SUCCEEDED?

Thanks, everyone!




The logic is somewhat complex; feel free to skip the SQL below:
-- Provides: 5-minute window start time, vid, vid_group, impressions, clicks, plays, effective plays
-- Every 5 minutes, write the statistics for the most recent 5 minutes to MySQL
insert into rt_app.app_video_cover_abtest_test
select
  begin_time,
  vid,
  vid_group,
  max(dv),
  max(click),
  max(vv),
  max(effectivevv)
from(
  select
    t1.begin_time begin_time,
    t1.u_vid vid,
    t1.u_vid_group vid_group,
    dv,
    click,
    vv,
    if(effectivevv is null,0,effectivevv) effectivevv
  from
  (
    -- dv, click, vv
    select
      CAST(TUMBLE_START(bjdt,INTERVAL '5' MINUTE) AS STRING) begin_time,
      cast(u_vid as bigint) u_vid,
      u_vid_group,
      sum(if(concat(u_mod,'-',u_ac)='emptylog-video_display' and u_c_module='M011',1,0)) dv,
      sum(if(concat(u_mod,'-',u_ac)='emptylog-video_click' and u_c_module='M011',1,0)) click,
      sum(if(concat(u_mod,'-',u_ac)='top-hits' and u_f_module='M011',1,0)) vv
    FROM rt_ods.ods_applog_vidsplit
    where u_vid is not null and trim(u_vid)<>''
      and u_vid_group is not null and trim(u_vid_group) not in ('','-1')
      and (
        (concat(u_mod,'-',u_ac) in ('emptylog-video_display','emptylog-video_click') and u_c_module='M011')
        or
        (concat(u_mod,'-',u_ac)='top-hits' and u_f_module='M011')
      )
    group by
      TUMBLE(bjdt, INTERVAL '5' MINUTE),
      cast(u_vid as bigint),
      u_vid_group
  ) t1
  left join
  (
    -- effectivevv
    select
      begin_time,
      u_vid,
      u_vid_group,
      count(1) effectivevv
    from
    (
      select
        begin_time,
        u_vid,
        u_vid_group,
        u_diu,
        u_playid,
        m_pt,
        q70
      from
      dw.video_pic_title_q70 a
      join
      (
        select
          CAST(TUMBLE_START(bjdt,INTERVAL '5' MINUTE) AS STRING) begin_time,
          cast(u_vid as bigint) u_vid,
          u_vid_group,
          u_diu,
          u_playid,
          max(u_playtime) m_pt
        FROM rt_ods.ods_applog_vidsplit
        where u_vid is not null and trim(u_vid)<>''
          and u_vid_group is not null and trim(u_vid_group) not in ('','-1')
          and concat(u_mod,'-',u_ac)='emptylog-video_play_speed'
          and u_f_module='M011'
          and u_playtime>0
        group by
          TUMBLE(bjdt, INTERVAL '5' MINUTE),
          cast(u_vid as bigint),
          u_vid_group,
          u_diu,
          u_playid
      ) b
      on a.vid=b.u_vid
      group by
        begin_time,
        u_vid,
        u_vid_group,
        u_diu,
        u_playid,
        m_pt,
        q70
    ) temp
    where m_pt>=q70
    group by
      begin_time,
      u_vid,
      u_vid_group
  ) t2
  on t1.begin_time=t2.begin_time
    and t1.u_vid=t2.u_vid
    and t1.u_vid_group=t2.u_vid_group
) t3
group by begin_time, vid, vid_group;

Re: Flink SQL streaming job ends abnormally

Posted by MuChen <93...@qq.com>.
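Yes, the job's final status is a successful finish.


The job uses the Hive table as a dimension table. From the job DAG, the Hive table is queried only once, when the job starts. Reading the Hive table took just 3 seconds, after which that subtask went into the finished state, as shown here:
https://s1.ax1x.com/2020/06/30/N4qxNq.png


However, the other subtasks stay in the RUNNING state. In the screenshot the job has already been running for 19 hours, but it may stop with a SUCCESS status at any moment.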




------------------ Original message ------------------
From: "Rui Li" <lirui.fudan@gmail.com>;
Sent: Tuesday, June 30, 2020, 11:01 AM
To: "user-zh" <user-zh@flink.apache.org>;

Subject: Re: Flink SQL streaming job ends abnormally



Did the job end with a successful final status? The Hive table source is a bounded stream, so that stream ends once the Hive table's data has been read. I am not sure whether this affects the job.

On Tue, Jun 30, 2020 at 10:39 AM MuChen <9329748@qq.com> wrote:

> I checked the configuration file, and it is a streaming job.
>
>
> $ grep -v \# sql-client-defaults.yaml |grep -v ^$
> catalogs:
>   - name: myhive
>     type: hive
>     hive-conf-dir: /home/fsql/hive/conf
>     default-database: default
> execution:
>   planner: blink
>   type: streaming
>   time-characteristic: event-time
>   periodic-watermarks-interval: 200
>   result-mode: table
>   max-table-result-rows: 1000000
>   parallelism: 4
>   max-parallelism: 128
>   min-idle-state-retention: 0
>   max-idle-state-retention: 0
>   current-catalog: myhive
>   current-database: default
>   restart-strategy:
>     type: fixed-delay
> deployment:
>   response-timeout: 5000
>   gateway-address: ""
>   gateway-port: 0
>
>
>
>
> ------------------ Original message ------------------
> From: "zhisheng" <zhisheng2018@gmail.com>;
> Sent: Tuesday, June 30, 2020, 9:05 AM
> To: "user-zh" <user-zh@flink.apache.org>;
>
> Subject: Re: Flink SQL streaming job ends abnormally
>
>
>
> Could the job be a batch job?



-- 
Best regards!
Rui Li

Re: Flink SQL streaming job ends abnormally

Posted by Rui Li <li...@gmail.com>.
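Did the job end with a successful final status? The Hive table source is a bounded stream, so that stream ends once the Hive table's data has been read. I am not sure whether this affects the job.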

On Tue, Jun 30, 2020 at 10:39 AM MuChen <93...@qq.com> wrote:

> I checked the configuration file, and it is a streaming job.
>
>
> $ grep -v \# sql-client-defaults.yaml |grep -v ^$
> catalogs:
>   - name: myhive
>     type: hive
>     hive-conf-dir: /home/fsql/hive/conf
>     default-database: default
> execution:
>   planner: blink
>   type: streaming
>   time-characteristic: event-time
>   periodic-watermarks-interval: 200
>   result-mode: table
>   max-table-result-rows: 1000000
>   parallelism: 4
>   max-parallelism: 128
>   min-idle-state-retention: 0
>   max-idle-state-retention: 0
>   current-catalog: myhive
>   current-database: default
>   restart-strategy:
>     type: fixed-delay
> deployment:
>   response-timeout: 5000
>   gateway-address: ""
>   gateway-port: 0
>
>
>
>
> ------------------ Original message ------------------
> From: "zhisheng" <zhisheng2018@gmail.com>;
> Sent: Tuesday, June 30, 2020, 9:05 AM
> To: "user-zh" <user-zh@flink.apache.org>;
>
> Subject: Re: Flink SQL streaming job ends abnormally
>
>
>
> Could the job be a batch job?
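The batch-or-streaming question can be answered from the quoted configuration by reading the `type` entry under `execution:`. The sketch below is only an illustration in Python and not something posted in the thread: `execution_type` is a hypothetical helper, the sample text is an excerpt of the quoted settings, and the helper assumes two-space indentation instead of using a real YAML parser.

```python
# Excerpt of the quoted sql-client-defaults.yaml settings, as plain text.
CONFIG_TEXT = """
catalogs:
  - name: myhive
    type: hive
execution:
  planner: blink
  type: streaming
  restart-strategy:
    type: fixed-delay
deployment:
  response-timeout: 5000
"""


def execution_type(text):
    """Return the value of `type` directly under the top-level `execution:` key.

    Assumption: top-level keys have no indentation and their direct children
    are indented by exactly two spaces.
    """
    in_execution = False
    for line in text.splitlines():
        if not line.strip():
            continue
        indent = len(line) - len(line.lstrip())
        if indent == 0:
            # A new top-level section starts; remember whether it is `execution:`.
            in_execution = line.strip() == "execution:"
        elif in_execution and indent == 2:
            key, _, value = line.strip().partition(":")
            if key == "type":
                return value.strip()
    return None


MODE = execution_type(CONFIG_TEXT)
print(MODE)
```

The nested `type` entries under `catalogs` and `restart-strategy` are skipped because they sit at a deeper indentation level, so only the execution mode is reported.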
> u_vid,&amp;nbsp;&amp;nbsp;&amp;nbsp;
> &gt; u_vid_group,&amp;nbsp;&amp;nbsp;&amp;nbsp; count(1)
> effectivevv&amp;nbsp;&amp;nbsp;
> &gt; from&amp;nbsp;&amp;nbsp; (&amp;nbsp;&amp;nbsp;&amp;nbsp;
> select&amp;nbsp; begin_time,&amp;nbsp;
> &gt; u_vid,&amp;nbsp; u_vid_group,&amp;nbsp; u_diu,&amp;nbsp;
> u_playid,&amp;nbsp; m_pt,&amp;nbsp;
> &gt; q70&amp;nbsp;&amp;nbsp;&amp;nbsp; from&amp;nbsp;&amp;nbsp;&amp;nbsp;
> dw.video_pic_title_q70
> &gt; a&amp;nbsp;&amp;nbsp;&amp;nbsp; join&amp;nbsp;&amp;nbsp;&amp;nbsp;
> (&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> &gt; select&amp;nbsp;&amp;nbsp; CAST(TUMBLE_START(bjdt,INTERVAL '5'
> MINUTE) AS STRING)
> &gt; begin_time,&amp;nbsp; cast(u_vid as bigint) u_vid,&amp;nbsp;
> u_vid_group,&amp;nbsp;
> &gt; u_diu,&amp;nbsp; u_playid,&amp;nbsp; max(u_playtime)
> m_pt&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> &gt; FROM
> rt_ods.ods_applog_vidsplit&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; where
> u_vid is not
> &gt; null and trim(u_vid)<&amp;amp;gt;''&amp;nbsp; and u_vid_group is not
> null and
> &gt; trim(u_vid_group) not in ('','-1')&amp;nbsp; and
> &gt; concat(u_mod,'-',u_ac)='emptylog-video_play_speed'&amp;nbsp; and
> &gt; u_f_module='M011'&amp;nbsp; and
> u_playtime&amp;amp;gt;0&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> &gt; group by&amp;nbsp;&amp;nbsp; TUMBLE(bjdt, INTERVAL '5'
> MINUTE),&amp;nbsp; cast(u_vid as
> &gt; bigint),&amp;nbsp; u_vid_group,&amp;nbsp; u_diu,&amp;nbsp;
> u_playid&amp;nbsp;&amp;nbsp;&amp;nbsp; )
> &gt; b&amp;nbsp;&amp;nbsp;&amp;nbsp; on
> a.vid=b.u_vid&amp;nbsp;&amp;nbsp;&amp;nbsp; group by&amp;nbsp;&amp;nbsp;
> &gt; begin_time,&amp;nbsp; u_vid,&amp;nbsp; u_vid_group,&amp;nbsp;
> u_diu,&amp;nbsp;
> &gt; u_playid,&amp;nbsp; m_pt,&amp;nbsp; q70&amp;nbsp;&amp;nbsp; )
> temp&amp;nbsp;&amp;nbsp; where
> &gt; m_pt&amp;amp;gt;=q70&amp;nbsp;&amp;nbsp; group
> by&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
> &gt; begin_time,&amp;nbsp;&amp;nbsp;&amp;nbsp;
> u_vid,&amp;nbsp;&amp;nbsp;&amp;nbsp; u_vid_group&amp;nbsp; )
> &gt; t2&amp;nbsp; on t1.begin_time=t2.begin_time&amp;nbsp;&amp;nbsp; and
> &gt; t1.u_vid=t2.u_vid&amp;nbsp;&amp;nbsp; and
> t1.u_vid_group=t2.u_vid_group
> &gt; )t3&amp;nbsp;&amp;nbsp; group by begin_time,&amp;nbsp; vid,&amp;nbsp;
> vid_group ;



-- 
Best regards!
Rui Li

Re: flinksql streaming job ends abnormally

Posted by MuChen <93...@qq.com>.
I checked the configuration file, and it is a streaming job:


$ grep -v \# sql-client-defaults.yaml |grep -v ^$
catalogs:
  - name: myhive
    type: hive
    hive-conf-dir: /home/fsql/hive/conf
    default-database: default
execution:
  planner: blink
  type: streaming
  time-characteristic: event-time
  periodic-watermarks-interval: 200
  result-mode: table
  max-table-result-rows: 1000000
  parallelism: 4
  max-parallelism: 128
  min-idle-state-retention: 0
  max-idle-state-retention: 0
  current-catalog: myhive
  current-database: default
  restart-strategy:
    type: fixed-delay
deployment:
  response-timeout: 5000
  gateway-address: ""
  gateway-port: 0
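
The check above can also be narrowed to the one setting in question. The following is a minimal sketch, not part of the original message: the function name execution_type is hypothetical, and it assumes the file uses two-space YAML indentation with `type` directly under `execution`.

```shell
#!/bin/sh
# Print the value of the top-level execution.type key from a SQL client
# defaults file. Assumption: two-space YAML indentation.
execution_type() {
    awk '
        /^execution:/ { in_exec = 1; next }   # entering the execution section
        /^[^ #]/      { in_exec = 0 }         # any other top-level key ends it
        in_exec && /^  type:/ { print $2 }    # two-space indent only, so the nested restart-strategy type is skipped
    ' "$1"
}
```

For example, `execution_type sql-client-defaults.yaml` would print the configured mode for the file shown in this message. Matching on the exact indent keeps the nested `restart-strategy` type from being reported by mistake.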




------------------ Original Message ------------------
From: "zhisheng" <zhisheng2018@gmail.com>
Sent: Tuesday, June 30, 2020, 9:05 AM
To: "user-zh" <user-zh@flink.apache.org>

Subject: Re: flinksql streaming job ends abnormally



Could the job be a batch job?


Re: flinksql streaming job ends abnormally

Posted by zhisheng <zh...@gmail.com>.
Could the job be a batch job?

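
The log quoted in this thread contains two different kinds of events: a task (the HiveTableSource) switching from RUNNING to FINISHED at 14:53, and the YARN session cluster shutting down with status SUCCEEDED at 15:34. A minimal sketch for listing the two separately is shown below. The function names are hypothetical, and the log file path passed to them is an assumption about where the JobManager log is stored.

```shell
#!/bin/sh
# List tasks that reached a terminal state (FINISHED, FAILED, or CANCELED).
# A bounded source such as a one-off table scan shows up here when it ends.
finished_tasks() {
    grep -E 'switched from [A-Z]+ to (FINISHED|FAILED|CANCELED)' "$1"
}

# List the lines where the cluster entrypoint itself shut down.
cluster_shutdown() {
    grep -E 'Shutting .* down with application status' "$1"
}
```

Running `finished_tasks jobmanager.log` and `cluster_shutdown jobmanager.log` (the file name is an assumption) and comparing the timestamps may help show whether the job ended because its tasks finished or because the session cluster was stopped.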