Posted to user-zh@flink.apache.org by guanyq <dl...@163.com> on 2021/02/19 07:57:37 UTC

Flink 1.12.0, Versioned Tables

Could someone share a demo of Versioned Tables on Flink 1.12.0?

CREATE TABLE products (
    product_id   STRING,
    product_name STRING,
    price        DECIMAL(32, 2),
    update_time  TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,
    PRIMARY KEY (product_id) NOT ENFORCED,
    WATERMARK FOR update_time AS update_time
) WITH (...);
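
For what it's worth, a minimal sketch of how such a versioned table is typically used in an event-time temporal join on Flink 1.12. The orders table, its fields, and its connector are illustrative placeholders, not part of the original question:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class VersionedTableDemo {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // Probe side: an append-only stream of orders with an event-time attribute.
        tEnv.executeSql(
                "CREATE TABLE orders (" +
                "  order_id   STRING," +
                "  product_id STRING," +
                "  order_time TIMESTAMP(3)," +
                "  WATERMARK FOR order_time AS order_time" +
                ") WITH (...)");  // fill in a real connector here, e.g. kafka

        // (register the versioned 'products' table from the DDL above as well)

        // Event-time temporal join: each order sees the version of 'products'
        // that was valid at its order_time.
        tEnv.executeSql(
                "SELECT o.order_id, o.order_time, p.product_name, p.price " +
                "FROM orders AS o " +
                "LEFT JOIN products FOR SYSTEM_TIME AS OF o.order_time AS p " +
                "ON o.product_id = p.product_id").print();
    }
}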

Re: Flink SQL table concepts

Posted by liwei li <hi...@gmail.com>.
*Temporal (dynamic) table*: A temporal table is a table that evolves over time,
also known as a Flink *dynamic table
<https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/dynamic_tables.html#dynamic-tables--continuous-queries>*.
Rows in a temporal table are associated with one or more temporal periods;
all Flink tables are temporal (dynamic) tables.

*Version:* A dynamic table can be split into a set of versioned table
snapshots; the *version* of a table snapshot represents the valid lifetime
of its rows, and the start and end times of the valid period can be
assigned by users. Temporal tables divide into versioned tables and regular
tables according to whether the table can track its history versions.

*Versioned table*: If the rows of a dynamic table can track their history
changes and their history versions can be visited, we call this kind of
dynamic table a versioned table.

*Regular table*: For a regular table, a row of the dynamic table can only
track its latest version. A table used in a lookup join can likewise only
track its latest version, so it is also a regular table.


https://wiki.apache.org/confluence/display/FLINK/FLIP-132+Temporal+Table+DDL+and+Temporal+Table+Join


For reference.



悟空 <wu...@foxmail.com> wrote on Sun, Oct 31, 2021 at 10:09 AM:

> My understanding is that temporal tables and versioned tables are similar concepts, and both are kinds of dynamic tables. It is best to check the Flink website for the authoritative explanation.
>
>
>
> Sent from my iPhone
>
>
> ------------------ Original message ------------------
> From: guanyq <dlguanyq@163.com>
> Date: Oct 31, 2021, 09:24
> To: user-zh <user-zh@flink.apache.org>
> Subject: Re: Flink SQL table concepts
>
>
>
> Could someone please advise:
> dynamic table
> temporal table
> versioned table
> What is the relationship among these three?
> Are there any other table concepts in Flink SQL?

Re: Flink SQL table concepts

Posted by 悟空 <wu...@foxmail.com>.
My understanding is that temporal tables and versioned tables are similar concepts, and both are kinds of dynamic tables. It is best to check the Flink website for the authoritative explanation.



Sent from my iPhone


------------------ Original message ------------------
From: guanyq <dlguanyq@163.com>
Date: Oct 31, 2021, 09:24
To: user-zh <user-zh@flink.apache.org>
Subject: Re: Flink SQL table concepts



Could someone please advise:
dynamic table
temporal table
versioned table
What is the relationship among these three?
Are there any other table concepts in Flink SQL?

Flink SQL table concepts

Posted by guanyq <dl...@163.com>.
Could someone please advise:
dynamic table
temporal table
versioned table
What is the relationship among these three?
Are there any other table concepts in Flink SQL?

Flink 1.14 Table API GroupBy Window Aggregation

Posted by guanyq <dl...@163.com>.
Could someone please advise:
Why does this Table API group-by-window job print "Empty set"?


import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.lit;

import java.sql.Timestamp;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.Tumble;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

// Order is the poster's POJO with fields productId, fee, acceptDate (not shown in the post).
public class GroupByWindow {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        List<Order> lst = new ArrayList<>();
        lst.add(new Order("a", "123", new Timestamp(Instant.now().toEpochMilli())));
        lst.add(new Order("a", "124", new Timestamp(Instant.now().toEpochMilli())));
        lst.add(new Order("a", "125", new Timestamp(Instant.now().toEpochMilli())));

        DataStreamSource<Order> stream = env.fromCollection(lst);

        // Declare an extra field as the (processing-)time attribute.
        Table table = tableEnv.fromDataStream(stream,
                $("productId"), $("fee"), $("acceptDate"), $("user_action_time").proctime());

        tableEnv.createTemporaryView("People", table);

        // Note: with a tiny bounded source and a processing-time window, the job
        // likely finishes before the 1-second window can fire, which would explain
        // the "Empty set" output; an event-time window (or a long-lived source)
        // would produce results.
        tableEnv.from("People")
                .window(Tumble.over(lit(1).seconds()).on($("user_action_time")).as("w"))
                .groupBy($("productId"), $("w"))
                .select(
                        $("productId"),
                        $("productId").count().as("productCnt"),
                        $("w").start(),
                        $("w").end())
                .execute().print();
    }
}



Empty set

Process finished with exit code 0

Re: Flink 1.12 Streaming consuming Kafka

Posted by JasonLee <17...@163.com>.
Hi

You can use the setPartitions method.
See the docs for details: https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/#topic-partition-subscription
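
A minimal sketch of what that looks like with the KafkaSource builder from the linked docs; the topic name, partition numbers, broker address, and group id are placeholders:

import java.util.Arrays;
import java.util.HashSet;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.kafka.common.TopicPartition;

public class PartitionSubscription {
    public static void main(String[] args) {
        // Subscribe to an explicit set of partitions instead of whole topics,
        // e.g. 15 chosen partitions out of a 100-partition topic.
        HashSet<TopicPartition> partitions = new HashSet<>(Arrays.asList(
                new TopicPartition("my-topic", 0),    // partition 0 of "my-topic"
                new TopicPartition("my-topic", 5)));  // partition 5 of "my-topic"

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("broker:9092")
                .setPartitions(partitions)
                .setGroupId("my-group")
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
        // attach with env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka")
    }
}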


Best
JasonLee
On Nov 8, 2021 at 17:06, guanyq<dl...@163.com> wrote:
Could someone please advise:
Can Flink streaming consume specific Kafka partitions?
For example, a topic has 100 partitions but I only want to consume 15 of them.










Flink 1.12 Streaming consuming Kafka

Posted by guanyq <dl...@163.com>.
Could someone please advise:
Can Flink streaming consume specific Kafka partitions?
For example, a topic has 100 partitions but I only want to consume 15 of them.

Flink 1.14 Table API GroupBy Window Aggregation

Posted by guanyq <dl...@163.com>.
Could someone please advise:
Why does this Table API group-by-window job print "Empty set"?

Re: Flink 1.14 redis connector

Posted by Yuepeng Pan <fl...@126.com>.
Hi,

   You can refer to this repository: https://github.com/apache/bahir-flink.git




Best,
Roc.








On 2021-12-20 22:49:42, "guanyq" <dl...@163.com> wrote:
>A quick question:
>I remember earlier versions had a redis connector; why is there no redis connector on the current version's website?

Flink 1.14 redis connector

Posted by guanyq <dl...@163.com>.
A quick question:
I remember earlier versions had a redis connector; why is there no redis connector on the current version's website?

Re: Flink 1.14 Table API & SQL: how is TTL handled for ever-growing aggregation dimensions

Posted by Michael Ran <gr...@163.com>.
Are you looking for the table.exec.state.ttl option described at https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/config/ ?
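
For what it's worth, a minimal sketch of setting that option programmatically; the 7-day value matches the one-week retention asked about, and the rest is illustrative:

import java.time.Duration;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class StateTtlExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // Expire aggregation state (e.g. per-province daily counters) that has
        // been idle for 7 days, instead of letting it grow without bound.
        tEnv.getConfig().getConfiguration().setString("table.exec.state.ttl", "7 d");
        // Equivalent higher-level API:
        tEnv.getConfig().setIdleStateRetention(Duration.ofDays(7));

        // ... register tables and run the aggregation query here
    }
}
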
On 2021-12-11 13:56:21, "guanyq" <dl...@163.com> wrote:
>Could someone please advise:
>Requirement: use Flink SQL to count daily order volumes per province. Such a dimension obviously keeps growing; how do I set a TTL so that only one week of dimension data is kept?
>An ever-growing dimension can easily lead to out-of-memory errors. Where on the website is the Flink SQL TTL configuration documented?

Flink 1.14 Table API & SQL: how is TTL handled for ever-growing aggregation dimensions

Posted by guanyq <dl...@163.com>.
Could someone please advise:
Requirement: use Flink SQL to count daily order volumes per province. Such a dimension obviously keeps growing; how do I set a TTL so that only one week of dimension data is kept?
An ever-growing dimension can easily lead to out-of-memory errors. Where on the website is the Flink SQL TTL configuration documented?

Re: Flink SQL reading HBase tables

Posted by "zstraw@163.com" <zs...@163.com>.
When used as a scan source, it reads via the HBase SDK scanner rather than loading everything at once; see org.apache.flink.connector.hbase2.source.AbstractTableInputFormat#nextRecord.


When used as a dimension (lookup) table, a Guava cache caches each key the join has looked up; see org.apache.flink.connector.hbase.source.HBaseRowDataLookupFunction#eval.
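
As a hedged sketch of the lookup path this refers to (the orders stream, its proc_time attribute, and the join key are hypothetical):

// tableEnv is a StreamTableEnvironment with both an 'orders' stream (carrying a
// processing-time attribute proc_time) and the HBase table 'hTable' registered.
tableEnv.executeSql(
        "SELECT o.order_id, h.family1.q1 " +
        "FROM orders AS o " +
        "JOIN hTable FOR SYSTEM_TIME AS OF o.proc_time AS h " +
        "ON o.rowkey = h.rowkey");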


Best Wishes!
- Yuan
On Nov 7, 2021 at 16:26, guanyq<dl...@163.com> wrote:
Could someone please advise:

-- register the HBase table "mytable" in Flink SQL
CREATE TABLE hTable (
    rowkey INT,
    family1 ROW<q1 INT>,
    family2 ROW<q2 STRING, q3 BIGINT>,
    family3 ROW<q4 DOUBLE, q5 BOOLEAN, q6 STRING>,
    PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
    'connector' = 'hbase-1.4',
    'table-name' = 'mytable',
    'zookeeper.quorum' = 'localhost:2181'
);
When Flink SQL reads an HBase table, does it load all the data into memory at once or one batch at a time?
What I really want to know is how Flink SQL handles a very large HBase table.




Flink SQL reading HBase tables

Posted by guanyq <dl...@163.com>.
Could someone please advise:

-- register the HBase table "mytable" in Flink SQL
CREATE TABLE hTable (
    rowkey INT,
    family1 ROW<q1 INT>,
    family2 ROW<q2 STRING, q3 BIGINT>,
    family3 ROW<q4 DOUBLE, q5 BOOLEAN, q6 STRING>,
    PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
    'connector' = 'hbase-1.4',
    'table-name' = 'mytable',
    'zookeeper.quorum' = 'localhost:2181'
);
When Flink SQL reads an HBase table, does it load all the data into memory at once or one batch at a time?
What I really want to know is how Flink SQL handles a very large HBase table.

Re: How are logs and checkpoints of Flink on YARN monitored in production?

Posted by JasonLee <17...@163.com>.
Hi


You can refer to these two articles:
https://mp.weixin.qq.com/s/2S4M8p-rBRinIRxmZrZq5Q 
https://mp.weixin.qq.com/s/44SXmCAUOqSWhQrNiZftoQ


Best
JasonLee


On Aug 31, 2021 at 13:23, guanyq<dl...@163.com> wrote:
Flink on YARN starts many tasks across the cluster; in production, how do you monitor the task logs and the checkpoints?


Any guidance would be appreciated.

How are logs and checkpoints of Flink on YARN monitored in production?

Posted by guanyq <dl...@163.com>.
Flink on YARN starts many tasks across the cluster; in production, how do you monitor the task logs and the checkpoints?


Any guidance would be appreciated.

Re: flink new features

Posted by guozhi mang <ro...@gmail.com>.
Hi, you can visit https://flink.apache.org/blog/
or follow the Apache Flink WeChat official account.

guanyq <dl...@163.com> wrote on Thu, Nov 24, 2022 at 19:18:

> Where can I find an introduction to the new features of each Flink release?



-- 
Best regards

flink new features

Posted by guanyq <dl...@163.com>.
Where can I find an introduction to the new features of each Flink release?

Re: flink1.14.0 temporal join hive

Posted by mack143 <ma...@163.com>.
Unsubscribe
On 2022-03-05 09:46:46, "guanyq" <dl...@163.com> wrote:
>A question about cache refreshing when a real-time Kafka stream joins the latest partition of a Hive table:
>
>'streaming-source.monitor-interval' = '12 h'
>My understanding of this option: counting from job startup, the latest partition's data is re-read every 12 hours, right?
>Another question: if, between two refreshes, the stream receives records that should join against a newer partition, do they effectively still join against the previously loaded latest partition?
>
>
>SET table.sql-dialect=hive;
>CREATE TABLE dimension_table (
>    product_id STRING,
>    product_name STRING,
>    unit_price DECIMAL(10,4),
>    pv_count BIGINT,
>    like_count BIGINT,
>    comment_count BIGINT,
>    update_time TIMESTAMP(3),
>    update_user STRING,
>    ...
>) PARTITIONED BY (pt_year STRING, pt_month STRING, pt_day STRING) TBLPROPERTIES (
>    -- using default partition-name order to load the latest partition every 12h (the most recommended and convenient way)
>    'streaming-source.enable' = 'true',
>    'streaming-source.partition.include' = 'latest',
>    'streaming-source.monitor-interval' = '12 h',
>    'streaming-source.partition-order' = 'partition-name',  -- option with default value, can be ignored.

flink1.14.0 temporal join hive

Posted by guanyq <dl...@163.com>.
A question about cache refreshing when a real-time Kafka stream joins the latest partition of a Hive table:

'streaming-source.monitor-interval' = '12 h'
My understanding of this option: counting from job startup, the latest partition's data is re-read every 12 hours, right?
Another question: if, between two refreshes, the stream receives records that should join against a newer partition, do they effectively still join against the previously loaded latest partition?

SET table.sql-dialect=hive;
CREATE TABLE dimension_table (
    product_id STRING,
    product_name STRING,
    unit_price DECIMAL(10,4),
    pv_count BIGINT,
    like_count BIGINT,
    comment_count BIGINT,
    update_time TIMESTAMP(3),
    update_user STRING,
    ...
) PARTITIONED BY (pt_year STRING, pt_month STRING, pt_day STRING) TBLPROPERTIES (
    -- using default partition-name order to load the latest partition every 12h (the most recommended and convenient way)
    'streaming-source.enable' = 'true',
    'streaming-source.partition.include' = 'latest',
    'streaming-source.monitor-interval' = '12 h',
    'streaming-source.partition-order' = 'partition-name',  -- option with default value, can be ignored.
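
For context, a hedged sketch of the probe side that usually goes with such a dimension table, a processing-time temporal join; the orders table and its proc_time attribute are illustrative:

// tEnv is a TableEnvironment with the Hive catalog configured and an 'orders'
// stream (carrying a processing-time attribute proc_time) registered.
tEnv.executeSql(
        "SELECT o.product_id, dim.product_name, dim.unit_price " +
        "FROM orders AS o " +
        "JOIN dimension_table FOR SYSTEM_TIME AS OF o.proc_time AS dim " +
        "ON o.product_id = dim.product_id");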

flink 1.14 reading Parquet

Posted by guanyq <dl...@163.com>.
How can FileSource guarantee that the Parquet files under a given directory are read in chronological order,
i.e. serially, in the time order encoded in the file names?

Flink 1.12.0 streaming: how to commit offsets to Kafka 0.11

Posted by guanyq <dl...@163.com>.
Flink version: 1.12
Kafka version: 0.11
Consumption works, but offsets are never committed back to Kafka.

I have tried the same code against Kafka 2.4.1, where offsets are committed fine;
with Kafka 0.11 the commits fail.

Could anyone suggest how to work around this version problem?
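
For reference, a minimal sketch of the two settings that control offset committing with the legacy FlinkKafkaConsumer; the broker address, topic, and group id are placeholders, and whether this helps against a 0.11 broker is untested:

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class OffsetCommitExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000);  // offsets are committed when a checkpoint completes

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker:9092");
        props.setProperty("group.id", "my-group");

        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props);
        // With checkpointing enabled, commit the checkpointed offsets back to Kafka.
        consumer.setCommitOffsetsOnCheckpoints(true);

        env.addSource(consumer).print();
        env.execute("offset-commit-example");
    }
}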

flink 1.17.1: flink on yarn submission cannot find the config file

Posted by guanyq <dl...@163.com>.
/opt/flink/flink-1.17.1/bin/flink run-application -t yarn-application -yjm 1024m -ytm 1024m ./xx-1.0.jar ./config.properties
The command above passes a config file; why is it looked up inside the container? file /home/yarn/nm/usercache/root/appcache/application_1690773368385_0092/container_e183_1690773368385_0092_01_000001/./config.properties does not exist

flink 1.10.1: reading a log directory

Posted by guanyq <dl...@163.com>.
A question:
How can Flink monitor and read, in real time, the contents of all log files in a log directory on a remote server?

The log server is reachable via SSH (IP / username / password).

Re: Re: flink1.16 sql gateway hive2

Posted by guanyq <dl...@163.com>.


Thanks to you both.





On 2023-03-27 09:58:24, "Shengkai Fang" <fs...@gmail.com> wrote:
>What Fang Yong said is correct. We have also added documentation on how to configure the hiveserver2 endpoint [1].
>
>[1]
>https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/hive-compatibility/hiveserver2/#setting-up
>
>Shammon FY <zj...@gmail.com> wrote on Mon, Mar 27, 2023 at 08:41:
>
>> Hi
>>
>>
>> To start a gateway speaking the hiveserver2 protocol, you need to put the jar flink-connector-hive_${scala.binary.version} into the gateway's lib directory.
>>
>> Best,
>> Shammon FY
>>
>>
>> On Sun, Mar 26, 2023 at 12:07 PM guanyq <dl...@163.com> wrote:
>>
>> > I started Flink and Hive locally; when starting the sql gateway I get the exception below. Is any other step required?
>> > ./bin/sql-gateway.sh start-foreground
>> > -Dsql-gateway.endpoint.type=hiveserver2
>> >
>> -Dsql-gateway.endpoint.hiveserver2.catalog.hive-conf-dir=/usr/local/app/apache-hive-3.1.2-bin/conf
>> >
>> >
>> > Exception:
>> >
>> > Available factory identifiers are:
>> > rest
>> > at
>> >
>> org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:545)
>> > ~[flink-table-api-java-uber-1.16.0.jar:1.16.0]
>> > at
>> >
>> org.apache.flink.table.gateway.api.endpoint.SqlGatewayEndpointFactoryUtils.createSqlGatewayEndpoint(SqlGatewayEndpointFactoryUtils.java:65)
>> > ~[flink-table-api-java-uber-1.16.0.jar:1.16.0]
>> > at org.apache.flink.table.gateway.SqlGateway.start(SqlGateway.java:72)
>> > [flink-sql-gateway-1.16.0.jar:1.16.0]
>> > at
>> >
>> org.apache.flink.table.gateway.SqlGateway.startSqlGateway(SqlGateway.java:118)
>> > [flink-sql-gateway-1.16.0.jar:1.16.0]
>> > at org.apache.flink.table.gateway.SqlGateway.main(SqlGateway.java:98)
>> > [flink-sql-gateway-1.16.0.jar:1.16.0]
>> > Exception in thread "main"
>> > org.apache.flink.table.gateway.api.utils.SqlGatewayException: Failed to
>> > start the endpoints.
>> > at org.apache.flink.table.gateway.SqlGateway.start(SqlGateway.java:79)
>> > at
>> >
>> org.apache.flink.table.gateway.SqlGateway.startSqlGateway(SqlGateway.java:118)
>> > at org.apache.flink.table.gateway.SqlGateway.main(SqlGateway.java:98)
>> > Caused by: org.apache.flink.table.api.ValidationException: Could not find
>> > any factory for identifier 'hiveserver2' that implements
>> > 'SqlGatewayEndpointFactory' in the classpath.
>> > Available factory identifiers are:
>> > rest
>> > at
>> >
>> org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:545)
>> > at
>> >
>> org.apache.flink.table.gateway.api.endpoint.SqlGatewayEndpointFactoryUtils.createSqlGatewayEndpoint(SqlGatewayEndpointFactoryUtils.java:65)
>> > at org.apache.flink.table.gateway.SqlGateway.start(SqlGateway.java:72)
>> > ... 2 more
>> >
>> >
>>

flink 1.17.1

Posted by guanyq <dl...@163.com>.
Question: can Flink SQL load MySQL tables as Flink in-memory tables via a SQL statement,
where the SQL statement is a multi-table join?
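
For reference, a hedged sketch of the usual approach: Flink does not copy MySQL tables into memory; each table is mapped with the JDBC connector and the multi-table join is expressed as a view. All connection details and table names here are placeholders:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class MysqlJoinExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // Map each MySQL table with the JDBC connector.
        tEnv.executeSql(
                "CREATE TABLE t1 (id BIGINT, name STRING) WITH (" +
                " 'connector' = 'jdbc'," +
                " 'url' = 'jdbc:mysql://localhost:3306/mydb'," +
                " 'table-name' = 't1'," +
                " 'username' = 'user', 'password' = 'pass')");
        tEnv.executeSql(
                "CREATE TABLE t2 (id BIGINT, amount DECIMAL(10,2)) WITH (" +
                " 'connector' = 'jdbc'," +
                " 'url' = 'jdbc:mysql://localhost:3306/mydb'," +
                " 'table-name' = 't2'," +
                " 'username' = 'user', 'password' = 'pass')");

        // Express the multi-table join as a view; it is evaluated on demand,
        // not materialized in memory.
        tEnv.executeSql(
                "CREATE TEMPORARY VIEW joined AS " +
                "SELECT t1.id, t1.name, t2.amount " +
                "FROM t1 JOIN t2 ON t1.id = t2.id");
    }
}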



Re: flink 1.17.1 MiniCluster is not yet running or has already been shut down.

Posted by Shammon FY <zj...@gmail.com>.
Hi,

Which example are you running? From the error, the MiniCluster was shut down while results were being fetched from it.

Best,
Shammon FY

On Sat, Jul 22, 2023 at 3:25 PM guanyq <dl...@163.com> wrote:

> Running locally in IDEA: MiniCluster is not yet running or has already been shut down.
> What is the cause and how can I handle it?
>
>
>
>
> 15:19:27,511 INFO
> org.apache.flink.runtime.resourcemanager.ResourceManagerServiceImpl [] -
> Stopping resource manager service.
>
> 15:19:27,503 WARN
> org.apache.flink.streaming.api.operators.collect.CollectResultFetcher [] -
> Failed to get job status so we assume that the job has terminated. Some
> data might be lost.
>
> java.lang.IllegalStateException: MiniCluster is not yet running or has
> already been shut down.
>
> at org.apache.flink.util.Preconditions.checkState(Preconditions.java:193)
> ~[flink-core-1.17.1.jar:1.17.1]
>
> at
> org.apache.flink.runtime.minicluster.MiniCluster.getDispatcherGatewayFuture(MiniCluster.java:1060)
> ~[flink-runtime-1.17.1.jar:1.17.1]
>
> at
> org.apache.flink.runtime.minicluster.MiniCluster.runDispatcherCommand(MiniCluster.java:933)
> ~[flink-runtime-1.17.1.jar:1.17.1]
>
> at
> org.apache.flink.runtime.minicluster.MiniCluster.getJobStatus(MiniCluster.java:857)
> ~[flink-runtime-1.17.1.jar:1.17.1]
>
> at
> org.apache.flink.runtime.minicluster.MiniClusterJobClient.getJobStatus(MiniClusterJobClient.java:91)
> ~[flink-runtime-1.17.1.jar:1.17.1]
>
> at
> org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.isJobTerminated(CollectResultFetcher.java:210)
> [flink-streaming-java-1.17.1.jar:1.17.1]
>
> at
> org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:118)
> [flink-streaming-java-1.17.1.jar:1.17.1]
>
> at
> org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106)
> [flink-streaming-java-1.17.1.jar:1.17.1]
>
> at
> org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
> [flink-streaming-java-1.17.1.jar:1.17.1]
>
> at
> org.apache.flink.table.planner.connectors.CollectDynamicSink$CloseableRowIteratorWrapper.hasNext(CollectDynamicSink.java:222)
> [flink-table-planner_25e35ab8-6377-4c6a-a928-a9fe1ff9e7f4.jar:1.17.1]
>
> at
> org.apache.flink.table.utils.print.TableauStyle.print(TableauStyle.java:120)
> [flink-table-common-1.17.1.jar:1.17.1]
>
> at
> org.apache.flink.table.api.internal.TableResultImpl.print(TableResultImpl.java:153)
> [flink-table-api-java-1.17.1.jar:1.17.1]

flink 1.17.1 MiniCluster is not yet running or has already been shut down.

Posted by guanyq <dl...@163.com>.
Running locally in IDEA: MiniCluster is not yet running or has already been shut down. What is the cause and how can I handle it?




15:19:27,511 INFO  org.apache.flink.runtime.resourcemanager.ResourceManagerServiceImpl [] - Stopping resource manager service.

15:19:27,503 WARN  org.apache.flink.streaming.api.operators.collect.CollectResultFetcher [] - Failed to get job status so we assume that the job has terminated. Some data might be lost.

java.lang.IllegalStateException: MiniCluster is not yet running or has already been shut down.

at org.apache.flink.util.Preconditions.checkState(Preconditions.java:193) ~[flink-core-1.17.1.jar:1.17.1]

at org.apache.flink.runtime.minicluster.MiniCluster.getDispatcherGatewayFuture(MiniCluster.java:1060) ~[flink-runtime-1.17.1.jar:1.17.1]

at org.apache.flink.runtime.minicluster.MiniCluster.runDispatcherCommand(MiniCluster.java:933) ~[flink-runtime-1.17.1.jar:1.17.1]

at org.apache.flink.runtime.minicluster.MiniCluster.getJobStatus(MiniCluster.java:857) ~[flink-runtime-1.17.1.jar:1.17.1]

at org.apache.flink.runtime.minicluster.MiniClusterJobClient.getJobStatus(MiniClusterJobClient.java:91) ~[flink-runtime-1.17.1.jar:1.17.1]

at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.isJobTerminated(CollectResultFetcher.java:210) [flink-streaming-java-1.17.1.jar:1.17.1]

at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:118) [flink-streaming-java-1.17.1.jar:1.17.1]

at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106) [flink-streaming-java-1.17.1.jar:1.17.1]

at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80) [flink-streaming-java-1.17.1.jar:1.17.1]

at org.apache.flink.table.planner.connectors.CollectDynamicSink$CloseableRowIteratorWrapper.hasNext(CollectDynamicSink.java:222) [flink-table-planner_25e35ab8-6377-4c6a-a928-a9fe1ff9e7f4.jar:1.17.1]

at org.apache.flink.table.utils.print.TableauStyle.print(TableauStyle.java:120) [flink-table-common-1.17.1.jar:1.17.1]

at org.apache.flink.table.api.internal.TableResultImpl.print(TableResultImpl.java:153) [flink-table-api-java-1.17.1.jar:1.17.1]

Re: flink1.16 sql gateway hive2

Posted by Shengkai Fang <fs...@gmail.com>.
What Fang Yong said is correct. We have also added documentation on how to configure the hiveserver2 endpoint [1].

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/hive-compatibility/hiveserver2/#setting-up

Shammon FY <zj...@gmail.com> wrote on Mon, Mar 27, 2023 at 08:41:

> Hi
>
>
> To start a gateway speaking the hiveserver2 protocol, you need to put the jar flink-connector-hive_${scala.binary.version} into the gateway's lib directory.
>
> Best,
> Shammon FY
>
>
> On Sun, Mar 26, 2023 at 12:07 PM guanyq <dl...@163.com> wrote:
>
> > I started Flink and Hive locally; when starting the sql gateway I get the exception below. Is any other step required?
> > ./bin/sql-gateway.sh start-foreground
> > -Dsql-gateway.endpoint.type=hiveserver2
> >
> -Dsql-gateway.endpoint.hiveserver2.catalog.hive-conf-dir=/usr/local/app/apache-hive-3.1.2-bin/conf
> >
> >
> > Exception:
> >
> > Available factory identifiers are:
> > rest
> > at
> >
> org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:545)
> > ~[flink-table-api-java-uber-1.16.0.jar:1.16.0]
> > at
> >
> org.apache.flink.table.gateway.api.endpoint.SqlGatewayEndpointFactoryUtils.createSqlGatewayEndpoint(SqlGatewayEndpointFactoryUtils.java:65)
> > ~[flink-table-api-java-uber-1.16.0.jar:1.16.0]
> > at org.apache.flink.table.gateway.SqlGateway.start(SqlGateway.java:72)
> > [flink-sql-gateway-1.16.0.jar:1.16.0]
> > at
> >
> org.apache.flink.table.gateway.SqlGateway.startSqlGateway(SqlGateway.java:118)
> > [flink-sql-gateway-1.16.0.jar:1.16.0]
> > at org.apache.flink.table.gateway.SqlGateway.main(SqlGateway.java:98)
> > [flink-sql-gateway-1.16.0.jar:1.16.0]
> > Exception in thread "main"
> > org.apache.flink.table.gateway.api.utils.SqlGatewayException: Failed to
> > start the endpoints.
> > at org.apache.flink.table.gateway.SqlGateway.start(SqlGateway.java:79)
> > at
> >
> org.apache.flink.table.gateway.SqlGateway.startSqlGateway(SqlGateway.java:118)
> > at org.apache.flink.table.gateway.SqlGateway.main(SqlGateway.java:98)
> > Caused by: org.apache.flink.table.api.ValidationException: Could not find
> > any factory for identifier 'hiveserver2' that implements
> > 'SqlGatewayEndpointFactory' in the classpath.
> > Available factory identifiers are:
> > rest
> > at
> >
> org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:545)
> > at
> >
> org.apache.flink.table.gateway.api.endpoint.SqlGatewayEndpointFactoryUtils.createSqlGatewayEndpoint(SqlGatewayEndpointFactoryUtils.java:65)
> > at org.apache.flink.table.gateway.SqlGateway.start(SqlGateway.java:72)
> > ... 2 more
> >
> >
>

Re: flink1.16 sql gateway hive2

Posted by Shammon FY <zj...@gmail.com>.
Hi

To start a gateway speaking the hiveserver2 protocol, you need to put the jar flink-connector-hive_${scala.binary.version} into the gateway's lib directory.

Best,
Shammon FY


On Sun, Mar 26, 2023 at 12:07 PM guanyq <dl...@163.com> wrote:

> I started Flink and Hive locally; when starting the sql gateway I get the exception below. Is any other step required?
> ./bin/sql-gateway.sh start-foreground
> -Dsql-gateway.endpoint.type=hiveserver2
> -Dsql-gateway.endpoint.hiveserver2.catalog.hive-conf-dir=/usr/local/app/apache-hive-3.1.2-bin/conf
>
>
> Exception:
>
> Available factory identifiers are:
> rest
> at
> org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:545)
> ~[flink-table-api-java-uber-1.16.0.jar:1.16.0]
> at
> org.apache.flink.table.gateway.api.endpoint.SqlGatewayEndpointFactoryUtils.createSqlGatewayEndpoint(SqlGatewayEndpointFactoryUtils.java:65)
> ~[flink-table-api-java-uber-1.16.0.jar:1.16.0]
> at org.apache.flink.table.gateway.SqlGateway.start(SqlGateway.java:72)
> [flink-sql-gateway-1.16.0.jar:1.16.0]
> at
> org.apache.flink.table.gateway.SqlGateway.startSqlGateway(SqlGateway.java:118)
> [flink-sql-gateway-1.16.0.jar:1.16.0]
> at org.apache.flink.table.gateway.SqlGateway.main(SqlGateway.java:98)
> [flink-sql-gateway-1.16.0.jar:1.16.0]
> Exception in thread "main"
> org.apache.flink.table.gateway.api.utils.SqlGatewayException: Failed to
> start the endpoints.
> at org.apache.flink.table.gateway.SqlGateway.start(SqlGateway.java:79)
> at
> org.apache.flink.table.gateway.SqlGateway.startSqlGateway(SqlGateway.java:118)
> at org.apache.flink.table.gateway.SqlGateway.main(SqlGateway.java:98)
> Caused by: org.apache.flink.table.api.ValidationException: Could not find
> any factory for identifier 'hiveserver2' that implements
> 'SqlGatewayEndpointFactory' in the classpath.
> Available factory identifiers are:
> rest
> at
> org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:545)
> at
> org.apache.flink.table.gateway.api.endpoint.SqlGatewayEndpointFactoryUtils.createSqlGatewayEndpoint(SqlGatewayEndpointFactoryUtils.java:65)
> at org.apache.flink.table.gateway.SqlGateway.start(SqlGateway.java:72)
> ... 2 more
>
>

flink1.16 sql gateway hive2

Posted by guanyq <dl...@163.com>.
I started Flink and Hive locally; when starting the sql gateway I get the exception below. Is any other step required?
./bin/sql-gateway.sh start-foreground -Dsql-gateway.endpoint.type=hiveserver2 -Dsql-gateway.endpoint.hiveserver2.catalog.hive-conf-dir=/usr/local/app/apache-hive-3.1.2-bin/conf


Exception:

Available factory identifiers are:
rest
at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:545) ~[flink-table-api-java-uber-1.16.0.jar:1.16.0]
at org.apache.flink.table.gateway.api.endpoint.SqlGatewayEndpointFactoryUtils.createSqlGatewayEndpoint(SqlGatewayEndpointFactoryUtils.java:65) ~[flink-table-api-java-uber-1.16.0.jar:1.16.0]
at org.apache.flink.table.gateway.SqlGateway.start(SqlGateway.java:72) [flink-sql-gateway-1.16.0.jar:1.16.0]
at org.apache.flink.table.gateway.SqlGateway.startSqlGateway(SqlGateway.java:118) [flink-sql-gateway-1.16.0.jar:1.16.0]
at org.apache.flink.table.gateway.SqlGateway.main(SqlGateway.java:98) [flink-sql-gateway-1.16.0.jar:1.16.0]
Exception in thread "main" org.apache.flink.table.gateway.api.utils.SqlGatewayException: Failed to start the endpoints.
at org.apache.flink.table.gateway.SqlGateway.start(SqlGateway.java:79)
at org.apache.flink.table.gateway.SqlGateway.startSqlGateway(SqlGateway.java:118)
at org.apache.flink.table.gateway.SqlGateway.main(SqlGateway.java:98)
Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'hiveserver2' that implements 'SqlGatewayEndpointFactory' in the classpath.
Available factory identifiers are:
rest
at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:545)
at org.apache.flink.table.gateway.api.endpoint.SqlGatewayEndpointFactoryUtils.createSqlGatewayEndpoint(SqlGatewayEndpointFactoryUtils.java:65)
at org.apache.flink.table.gateway.SqlGateway.start(SqlGateway.java:72)
... 2 more


Re: Re: Re: Re: Re: flink on yarn: questions about an abnormal power failure

Posted by guanyq <dl...@163.com>.
Yesterday I simulated the power-failure scenario.
The timestamps of the 10 HA files are staggered, 5 seconds apart.
The chk-xxx directories are not all corrupted either; some of them can start the job, which I also tried. The current situation: after the YARN cluster restarts from the power failure, it first loops through the 10 HA files (which record the checkpoint metadata) trying to start the application. 1. If all the HA files are corrupted, the Flink application cannot be brought up even when the checkpoints themselves are intact.

What I want now is to guarantee that at least one usable pair of HA file and corresponding checkpoint exists on HDFS. Currently a checkpoint is taken every 5 seconds and 10 are retained, yet corruption can still prevent startup. 5 seconds * 10 = 50 seconds of history; how long a retention window would guarantee one uncorrupted HA file and checkpoint pair?














On 2023-03-14 10:16:48, "Guojun Li" <gj...@gmail.com> wrote:
>Hi
>
>Could you confirm whether the last modification times of these ha files are identical or staggered?
>
>Also, have you tried restoring from a specific chk-? Can it recover?
>
>Best,
>Guojun
>
>On Fri, Mar 10, 2023 at 11:56 AM guanyq <dl...@163.com> wrote:
>
>> The Flink HA path is /tmp/flink/ha/
>> and the Flink chk path is /tmp/flink/checkpoint
>>
>>
>> I'm not yet sure whether it's the HA files that are corrupted or all the checkpoints; I need to verify that with a simulation.
>>
>>
>>
>>
>> It does try to recover from the 10 checkpoints; the log shows:
>> 2023-03-07 18:37:43,703 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Recovering checkpoints from ZooKeeper.
>> 2023-03-07 18:37:43,730 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Found 10 checkpoints in ZooKeeper.
>> 2023-03-07 18:37:43,731 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to fetch 10 checkpoints from storage.
>> 2023-03-07 18:37:43,731 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7079.
>> 2023-03-07 18:37:43,837 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7080.
>> 2023-03-07 18:37:43,868 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7081.
>> 2023-03-07 18:37:43,896 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7082.
>> 2023-03-07 18:37:43,906 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7083.
>> 2023-03-07 18:37:43,928 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7084.
>> 2023-03-07 18:37:43,936 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7085.
>> 2023-03-07 18:37:43,947 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7086.
>>
>>
>>
>> The full log is below; I've omitted the repeated tail, which only differs in which /tmp/flink/ha/application_1678102326043_0007/completedCheckpointxxxxxx it tries to start from:
>> 2023-03-07 18:37:43,621 INFO org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl - Starting the SlotManager.
>> 2023-03-07 18:37:43,621 INFO org.apache.flink.runtime.jobmaster.JobMaster - Successfully ran initialization on master in 0 ms.
>> 2023-03-07 18:37:43,660 INFO org.apache.flink.runtime.util.ZooKeeperUtils - Initialized ZooKeeperCompletedCheckpointStore in '/checkpoints/3844b96b002601d932e66233dd46899c'.
>> 2023-03-07 18:37:43,680 INFO org.apache.flink.runtime.jobmaster.JobMaster - Using application-defined state backend: File State Backend (checkpoints: 'hdfs:/tmp/flink/checkpoint', savepoints: 'null', asynchronous: UNDEFINED, fileStateThreshold: -1)
>> 2023-03-07 18:37:43,680 INFO org.apache.flink.runtime.jobmaster.JobMaster - Configuring application-defined state backend with job/cluster config
>> 2023-03-07 18:37:43,703 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Recovering checkpoints from ZooKeeper.
>> 2023-03-07 18:37:43,730 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Found 10 checkpoints in ZooKeeper.
>> 2023-03-07 18:37:43,731 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to fetch 10 checkpoints from storage.
>> 2023-03-07 18:37:43,731 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7079.
>> 2023-03-07 18:37:43,837 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7080.
>> 2023-03-07 18:37:43,868 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7081.
>> 2023-03-07 18:37:43,896 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7082.
>> 2023-03-07 18:37:43,906 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7083.
>> 2023-03-07 18:37:43,928 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7084.
>> 2023-03-07 18:37:43,936 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7085.
>> 2023-03-07 18:37:43,947 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7086.
>> 2023-03-07 18:37:43,979 WARN org.apache.hadoop.hdfs.BlockReaderFactory - I/O error constructing remote block reader.
>> java.io.IOException: Got error, status message opReadBlock
>> BP-1003103929-192.168.200.11-1668473836936:blk_1301252639_227512278
>> received exception
>> org.apache.hadoop.hdfs.server.datanode.CorruptMetaHeaderException:
>> The meta file length 0 is less than the expected length 7, for
>> OP_READ_BLOCK, self=/192.168.200.23:45534, remote=/192.168.200.21:9866,
>> for file
>> /tmp/flink/ha/application_1678102326043_0007/completedCheckpoint58755403e33a,
>> for pool BP-1003103929-192.168.200.11-1668473836936 block
>> 1301252639_227512278
>> at
>> org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil.checkBlockOpStatus(DataTransferProtoUtil.java:142)
>> at
>> org.apache.hadoop.hdfs.RemoteBlockReader2.checkSuccess(RemoteBlockReader2.java:456)
>> at
>> org.apache.hadoop.hdfs.RemoteBlockReader2.newBlockReader(RemoteBlockReader2.java:424)
>> at
>> org.apache.hadoop.hdfs.BlockReaderFactory.getRemoteBlockReader(BlockReaderFactory.java:818)
>> at
>> org.apache.hadoop.hdfs.BlockReaderFactory.getRemoteBlockReaderFromTcp(BlockReaderFactory.java:697)
>> at
>> org.apache.hadoop.hdfs.BlockReaderFactory.build(BlockReaderFactory.java:355)
>> at
>> org.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(DFSInputStream.java:665)
>> at
>> org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:874)
>> at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:926)
>> at java.io.DataInputStream.read(DataInputStream.java:149)
>> at
>> org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read(HadoopDataInputStream.java:94)
>> at
>> java.io.ObjectInputStream$PeekInputStream.read(ObjectInputStream.java:2663)
>> at
>> java.io.ObjectInputStream$PeekInputStream.readFully(ObjectInputStream.java:2679)
>> at
>> java.io.ObjectInputStream$BlockDataInputStream.readShort(ObjectInputStream.java:3156)
>> at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:862)
>> at java.io.ObjectInputStream.<init>(ObjectInputStream.java:358)
>> at
>> org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.<init>(InstantiationUtil.java:69)
>> at
>> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:572)
>> at
>> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:555)
>> at
>> org.apache.flink.runtime.state.RetrievableStreamStateHandle.retrieveState(RetrievableStreamStateHandle.java:58)
>> at
>> org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore.retrieveCompletedCheckpoint(ZooKeeperCompletedCheckpointStore.java:339)
>> at
>> org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore.recover(ZooKeeperCompletedCheckpointStore.java:175)
>> at
>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedState(CheckpointCoordinator.java:1070)
>> at
>> org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:234)
>> at
>> org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:216)
>> at
>> org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:120)
>> at
>> org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:105)
>> at
>> org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:278)
>> at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:266)
>> at
>> org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:98)
>> at
>> org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:40)
>> at
>> org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>(JobManagerRunnerImpl.java:146)
>> at
>> org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:84)
>> at
>> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(Dispatcher.java:381)
>> at
>> org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
>> at
>> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
>> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>> at
>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
>> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> at
>> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> at
>> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> On 2023-03-10 11:00:58, "Weihua Hu" <hu...@gmail.com> wrote:
>> >Hi
>> >
>> >Generally a YARN cluster power failure by itself should not affect already-completed historical checkpoints (the last checkpoint may have hit an HDFS write error).
>> >
>> >Do you have more detailed JobManager logs? First confirm how many completedCheckpoint files Flink retrieved during recovery,
>> >and which cp it ultimately tried to restore from.
>> >
>> >You can also try, as Yanfei suggested, restoring with a historical cp specified as the savepoint.
>> >
>> >
>> >Best,
>> >Weihua
>> >
>> >
>> >On Fri, Mar 10, 2023 at 10:38 AM guanyq <dl...@163.com> wrote:
>> >
>> >> Incremental checkpoints are not enabled.
>> >> I saw the corruption in the startup log: it tried to start from the 10 checkpoints, but every attempt failed with the block corruption below.
>> >> The error log:
>> >>
>> >> java.io.IOException: Got error, status message opReadBlock
>> >> BP-1003103929-192.168.200.11-1668473836936:blk_1301252639_227512278
>> >> received exception
>> >> org.apache.hadoop.hdfs.server.datanode.CorruptMetaHeaderException:
>> >> The meta file length 0 is less than the expected length 7, for
>> >> OP_READ_BLOCK, self=/ip:45534, remote=/ip:9866,
>> >> for file
>> >>
>> /tmp/flink/ha/application_1678102326043_0007/completedCheckpoint58755403e33a,
>> >> for pool BP-1003103929-192.168.200.11-1668473836936 block
>> >> 1301252639_227512278
>> >> at
>> >>
>> org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil.checkBlockOpStatus(DataTransferProtoUtil.java:142)
>> >> at
>> >>
>> org.apache.hadoop.hdfs.RemoteBlockReader2.checkSuccess(RemoteBlockReader2.java:456)
>> >> at
>> >>
>> org.apache.hadoop.hdfs.RemoteBlockReader2.newBlockReader(RemoteBlockReader2.java:424)
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> On 2023-03-10 10:26:11, "Yanfei Lei" <fr...@gmail.com> wrote:
>> >Hi, you can look under the configured checkpoint dir for historical chk-x folders; if you can find one, you can try manually specifying
>> that chk to restart [1].
>> >
>> >> The job keeps 10 checkpoints taken 5 seconds apart; with a sudden power failure, why would all of the checkpoints be corrupted?
>> >
>> >Is incremental checkpointing enabled for the job? If it is, a corrupted file in an earlier checkpoint affects all later checkpoints built incrementally on it.
>> >
>> >> About how checkpoints get flushed to disk: this should relate to HDFS writes; the Flink checkpoint reports success, yet HDFS never persists it.
>> >Did you observe that there are no files under the checkpoint dir?
>> >
>> >[1]
>> >https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/ops/state/savepoints/#resuming-from-savepoints
>> >
>> >guanyq <dl...@163.com> wrote on Fri, Mar 10, 2023 at 08:58:
>> >>
>> >> I'm also thinking of using savepoints to handle the abnormal power failure.
>> >> But I still have a question:
>> >> The job keeps 10 checkpoints taken 5 seconds apart; with a sudden power failure, why would all of the checkpoints be corrupted?
>> >> It makes me wonder whether none of the 10 checkpoints were actually flushed to disk.
>> >> I'd like to ask about
>> >> how checkpoints get flushed to disk: this should relate to HDFS writes; the Flink checkpoint reports success, yet HDFS never persists it.
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >> On 2023-03-10 08:47:11, "Shammon FY" <zj...@gmail.com> wrote:
>> >> >Hi
>> >> >
>> >> >I think when a Flink job fails to recover, the job itself can hardly tell that the failure was caused by something like corrupted checkpoint file blocks. If the job has a savepoint, you can try restoring from that savepoint.
>> >> >
>> >> >Best,
>> >> >Shammon
>> >> >
>> >> >On Thu, Mar 9, 2023 at 10:06 PM guanyq <dl...@163.com> wrote:
>> >> >
>> >> >> Premises:
>> >> >> 1. Flink is configured for high availability
>> >> >> 2. Flink retains 10 checkpoints
>> >> >> 3. the YARN cluster is configured for job recovery
>> >> >> Question:
>> >> >> After the YARN cluster restarts from a power failure, when recovering the Flink job, if the most recent checkpoint has blocks corrupted by the outage, will recovery be attempted from another checkpoint?
>> >> >> >>
>> >> >> >>
>> >> >> >>
>> >> >> >>
>> >> >
>> >> >
>> >> >
>> >> >--
>> >> >Best,
>> >> >Yanfei
>> >>
>>

Re: Re: Re: Re: flink on yarn: questions about an abnormal power failure

Posted by Guojun Li <gj...@gmail.com>.
Hi

Could you confirm whether the last modification times of these ha files are identical or staggered?

Also, have you tried restoring from a specific chk-? Can it recover?

Best,
Guojun

On Fri, Mar 10, 2023 at 11:56 AM guanyq <dl...@163.com> wrote:

> The Flink HA path is /tmp/flink/ha/
> and the Flink chk path is /tmp/flink/checkpoint
>
>
> I'm not yet sure whether it's the HA files that are corrupted or all the checkpoints; I need to verify that with a simulation.
>
>
>
>
> It does try to recover from the 10 checkpoints; the log shows:
> 2023-03-07 18:37:43,703 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Recovering checkpoints from ZooKeeper.
> 2023-03-07 18:37:43,730 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Found 10 checkpoints in ZooKeeper.
> 2023-03-07 18:37:43,731 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to fetch 10 checkpoints from storage.
> 2023-03-07 18:37:43,731 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7079.
> 2023-03-07 18:37:43,837 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7080.
> 2023-03-07 18:37:43,868 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7081.
> 2023-03-07 18:37:43,896 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7082.
> 2023-03-07 18:37:43,906 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7083.
> 2023-03-07 18:37:43,928 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7084.
> 2023-03-07 18:37:43,936 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7085.
> 2023-03-07 18:37:43,947 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7086.
>
>
>
> The full log is below; I've omitted the repeated tail, which only differs in which /tmp/flink/ha/application_1678102326043_0007/completedCheckpointxxxxxx it tries to start from:
> 2023-03-07 18:37:43,621 INFO org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl - Starting the SlotManager.
> 2023-03-07 18:37:43,621 INFO org.apache.flink.runtime.jobmaster.JobMaster - Successfully ran initialization on master in 0 ms.
> 2023-03-07 18:37:43,660 INFO org.apache.flink.runtime.util.ZooKeeperUtils - Initialized ZooKeeperCompletedCheckpointStore in '/checkpoints/3844b96b002601d932e66233dd46899c'.
> 2023-03-07 18:37:43,680 INFO org.apache.flink.runtime.jobmaster.JobMaster - Using application-defined state backend: File State Backend (checkpoints: 'hdfs:/tmp/flink/checkpoint', savepoints: 'null', asynchronous: UNDEFINED, fileStateThreshold: -1)
> 2023-03-07 18:37:43,680 INFO org.apache.flink.runtime.jobmaster.JobMaster - Configuring application-defined state backend with job/cluster config
> 2023-03-07 18:37:43,703 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Recovering checkpoints from ZooKeeper.
> 2023-03-07 18:37:43,730 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Found 10 checkpoints in ZooKeeper.
> 2023-03-07 18:37:43,731 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to fetch 10 checkpoints from storage.
> 2023-03-07 18:37:43,731 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7079.
> 2023-03-07 18:37:43,837 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7080.
> 2023-03-07 18:37:43,868 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7081.
> 2023-03-07 18:37:43,896 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7082.
> 2023-03-07 18:37:43,906 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7083.
> 2023-03-07 18:37:43,928 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7084.
> 2023-03-07 18:37:43,936 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7085.
> 2023-03-07 18:37:43,947 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7086.
> 2023-03-07 18:37:43,979 WARN org.apache.hadoop.hdfs.BlockReaderFactory - I/O error constructing remote block reader.
> java.io.IOException: Got error, status message opReadBlock
> BP-1003103929-192.168.200.11-1668473836936:blk_1301252639_227512278
> received exception
> org.apache.hadoop.hdfs.server.datanode.CorruptMetaHeaderException:
> The meta file length 0 is less than the expected length 7, for
> OP_READ_BLOCK, self=/192.168.200.23:45534, remote=/192.168.200.21:9866,
> for file
> /tmp/flink/ha/application_1678102326043_0007/completedCheckpoint58755403e33a,
> for pool BP-1003103929-192.168.200.11-1668473836936 block
> 1301252639_227512278
> at
> org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil.checkBlockOpStatus(DataTransferProtoUtil.java:142)
> at
> org.apache.hadoop.hdfs.RemoteBlockReader2.checkSuccess(RemoteBlockReader2.java:456)
> at
> org.apache.hadoop.hdfs.RemoteBlockReader2.newBlockReader(RemoteBlockReader2.java:424)
> at
> org.apache.hadoop.hdfs.BlockReaderFactory.getRemoteBlockReader(BlockReaderFactory.java:818)
> at
> org.apache.hadoop.hdfs.BlockReaderFactory.getRemoteBlockReaderFromTcp(BlockReaderFactory.java:697)
> at
> org.apache.hadoop.hdfs.BlockReaderFactory.build(BlockReaderFactory.java:355)
> at
> org.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(DFSInputStream.java:665)
> at
> org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:874)
> at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:926)
> at java.io.DataInputStream.read(DataInputStream.java:149)
> at
> org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read(HadoopDataInputStream.java:94)
> at
> java.io.ObjectInputStream$PeekInputStream.read(ObjectInputStream.java:2663)
> at
> java.io.ObjectInputStream$PeekInputStream.readFully(ObjectInputStream.java:2679)
> at
> java.io.ObjectInputStream$BlockDataInputStream.readShort(ObjectInputStream.java:3156)
> at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:862)
> at java.io.ObjectInputStream.<init>(ObjectInputStream.java:358)
> at
> org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.<init>(InstantiationUtil.java:69)
> at
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:572)
> at
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:555)
> at
> org.apache.flink.runtime.state.RetrievableStreamStateHandle.retrieveState(RetrievableStreamStateHandle.java:58)
> at
> org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore.retrieveCompletedCheckpoint(ZooKeeperCompletedCheckpointStore.java:339)
> at
> org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore.recover(ZooKeeperCompletedCheckpointStore.java:175)
> at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedState(CheckpointCoordinator.java:1070)
> at
> org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:234)
> at
> org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:216)
> at
> org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:120)
> at
> org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:105)
> at
> org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:278)
> at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:266)
> at
> org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:98)
> at
> org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:40)
> at
> org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>(JobManagerRunnerImpl.java:146)
> at
> org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:84)
> at
> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(Dispatcher.java:381)
> at
> org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
> at
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
> at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> On 2023-03-10 11:00:58, "Weihua Hu" <hu...@gmail.com> wrote:
> >Hi
> >
> >Generally a YARN cluster power failure by itself should not affect already-completed historical checkpoints (the last checkpoint may have hit an HDFS write error).
> >
> >Do you have more detailed JobManager logs? First confirm how many completedCheckpoint files Flink retrieved during recovery,
> >and which cp it ultimately tried to restore from.
> >
> >You can also try, as Yanfei suggested, restoring with a historical cp specified as the savepoint.
> >
> >
> >Best,
> >Weihua
> >
> >
> >On Fri, Mar 10, 2023 at 10:38 AM guanyq <dl...@163.com> wrote:
> >
> >> Incremental checkpoints are not enabled.
> >> I saw the corruption in the startup log: it tried to start from the 10 checkpoints, but every attempt failed with the block corruption below.
> >> The error log:
> >>
> >> java.io.IOException: Got error, status message opReadBlock
> >> BP-1003103929-192.168.200.11-1668473836936:blk_1301252639_227512278
> >> received exception
> >> org.apache.hadoop.hdfs.server.datanode.CorruptMetaHeaderException:
> >> The meta file length 0 is less than the expected length 7, for
> >> OP_READ_BLOCK, self=/ip:45534, remote=/ip:9866,
> >> for file
> >>
> /tmp/flink/ha/application_1678102326043_0007/completedCheckpoint58755403e33a,
> >> for pool BP-1003103929-192.168.200.11-1668473836936 block
> >> 1301252639_227512278
> >> at
> >>
> org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil.checkBlockOpStatus(DataTransferProtoUtil.java:142)
> >> at
> >>
> org.apache.hadoop.hdfs.RemoteBlockReader2.checkSuccess(RemoteBlockReader2.java:456)
> >> at
> >>
> org.apache.hadoop.hdfs.RemoteBlockReader2.newBlockReader(RemoteBlockReader2.java:424)
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >> On 2023-03-10 10:26:11, "Yanfei Lei" <fr...@gmail.com> wrote:
> >> >Hi, you can look under the configured checkpoint dir for historical chk-x folders; if you can find one, you can try manually specifying
> >> that chk to restart [1].
> >> >
> >> >> The job keeps 10 checkpoints taken 5 seconds apart; with a sudden power failure, why would all of the checkpoints be corrupted?
> >> >
> >> >Is incremental checkpointing enabled for the job? If it is, a corrupted file in an earlier checkpoint affects all later checkpoints built incrementally on it.
> >> >
> >> >> About how checkpoints get flushed to disk: this should relate to HDFS writes; the Flink checkpoint reports success, yet HDFS never persists it.
> >> >Did you observe that there are no files under the checkpoint dir?
> >> >
> >> >[1]
> >> >https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/ops/state/savepoints/#resuming-from-savepoints
> >> >
> >> >guanyq <dl...@163.com> wrote on Fri, Mar 10, 2023 at 08:58:
> >> >>
> >> >> I'm also thinking of using savepoints to handle the abnormal power failure.
> >> >> But I still have a question:
> >> >> The job keeps 10 checkpoints taken 5 seconds apart; with a sudden power failure, why would all of the checkpoints be corrupted?
> >> >> It makes me wonder whether none of the 10 checkpoints were actually flushed to disk.
> >> >> I'd like to ask about
> >> >> how checkpoints get flushed to disk: this should relate to HDFS writes; the Flink checkpoint reports success, yet HDFS never persists it.
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >> On 2023-03-10 08:47:11, "Shammon FY" <zj...@gmail.com> wrote:
> >> >> >Hi
> >> >> >
> >> >> >I think when a Flink job fails to recover, the job itself can hardly tell that the failure was caused by something like corrupted checkpoint file blocks. If the job has a savepoint, you can try restoring from that savepoint.
> >> >> >
> >> >> >Best,
> >> >> >Shammon
> >> >> >
> >> >> >On Thu, Mar 9, 2023 at 10:06 PM guanyq <dl...@163.com> wrote:
> >> >> >
> >> >> >> Premises:
> >> >> >> 1. Flink is configured for high availability
> >> >> >> 2. Flink retains 10 checkpoints
> >> >> >> 3. the YARN cluster is configured for job recovery
> >> >> >> Question:
> >> >> >> After the YARN cluster restarts from a power failure, when recovering the Flink job, if the most recent checkpoint has blocks corrupted by the outage, will recovery be attempted from another checkpoint?
> >> >> >>
> >> >> >>
> >> >> >>
> >> >> >>
> >> >
> >> >
> >> >
> >> >--
> >> >Best,
> >> >Yanfei
> >>
>

Re: Re: Re: Re: flink on yarn: questions about an abnormal power failure

Posted by guanyq <dl...@163.com>.
The Flink HA path is /tmp/flink/ha/
and the Flink chk path is /tmp/flink/checkpoint


I'm not yet sure whether it's the HA files that are corrupted or all the checkpoints; I need to verify that with a simulation.




It does try to recover from the 10 checkpoints; the log shows:
2023-03-07 18:37:43,703 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Recovering checkpoints from ZooKeeper.
2023-03-07 18:37:43,730 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Found 10 checkpoints in ZooKeeper.
2023-03-07 18:37:43,731 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to fetch 10 checkpoints from storage.
2023-03-07 18:37:43,731 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7079.
2023-03-07 18:37:43,837 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7080.
2023-03-07 18:37:43,868 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7081.
2023-03-07 18:37:43,896 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7082.
2023-03-07 18:37:43,906 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7083.
2023-03-07 18:37:43,928 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7084.
2023-03-07 18:37:43,936 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7085.
2023-03-07 18:37:43,947 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7086.


The full log is below; I've omitted the repeated tail, which only differs in which /tmp/flink/ha/application_1678102326043_0007/completedCheckpointxxxxxx it tries to start from:
2023-03-07 18:37:43,621 INFO org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl - Starting the SlotManager.
2023-03-07 18:37:43,621 INFO org.apache.flink.runtime.jobmaster.JobMaster - Successfully ran initialization on master in 0 ms.
2023-03-07 18:37:43,660 INFO org.apache.flink.runtime.util.ZooKeeperUtils - Initialized ZooKeeperCompletedCheckpointStore in '/checkpoints/3844b96b002601d932e66233dd46899c'.
2023-03-07 18:37:43,680 INFO org.apache.flink.runtime.jobmaster.JobMaster - Using application-defined state backend: File State Backend (checkpoints: 'hdfs:/tmp/flink/checkpoint', savepoints: 'null', asynchronous: UNDEFINED, fileStateThreshold: -1)
2023-03-07 18:37:43,680 INFO org.apache.flink.runtime.jobmaster.JobMaster - Configuring application-defined state backend with job/cluster config
2023-03-07 18:37:43,703 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Recovering checkpoints from ZooKeeper.
2023-03-07 18:37:43,730 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Found 10 checkpoints in ZooKeeper.
2023-03-07 18:37:43,731 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to fetch 10 checkpoints from storage.
2023-03-07 18:37:43,731 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7079.
2023-03-07 18:37:43,837 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7080.
2023-03-07 18:37:43,868 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7081.
2023-03-07 18:37:43,896 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7082.
2023-03-07 18:37:43,906 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7083.
2023-03-07 18:37:43,928 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7084.
2023-03-07 18:37:43,936 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7085.
2023-03-07 18:37:43,947 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to retrieve checkpoint 7086.
2023-03-07 18:37:43,979 WARN org.apache.hadoop.hdfs.BlockReaderFactory - I/O error constructing remote block reader.
java.io.IOException: Got error, status message opReadBlock BP-1003103929-192.168.200.11-1668473836936:blk_1301252639_227512278 received exception
org.apache.hadoop.hdfs.server.datanode.CorruptMetaHeaderException:
The meta file length 0 is less than the expected length 7, for OP_READ_BLOCK, self=/192.168.200.23:45534, remote=/192.168.200.21:9866,
for file /tmp/flink/ha/application_1678102326043_0007/completedCheckpoint58755403e33a, for pool BP-1003103929-192.168.200.11-1668473836936 block 1301252639_227512278
at org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil.checkBlockOpStatus(DataTransferProtoUtil.java:142)
at org.apache.hadoop.hdfs.RemoteBlockReader2.checkSuccess(RemoteBlockReader2.java:456)
at org.apache.hadoop.hdfs.RemoteBlockReader2.newBlockReader(RemoteBlockReader2.java:424)
at org.apache.hadoop.hdfs.BlockReaderFactory.getRemoteBlockReader(BlockReaderFactory.java:818)
at org.apache.hadoop.hdfs.BlockReaderFactory.getRemoteBlockReaderFromTcp(BlockReaderFactory.java:697)
at org.apache.hadoop.hdfs.BlockReaderFactory.build(BlockReaderFactory.java:355)
at org.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(DFSInputStream.java:665)
at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:874)
at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:926)
at java.io.DataInputStream.read(DataInputStream.java:149)
at org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read(HadoopDataInputStream.java:94)
at java.io.ObjectInputStream$PeekInputStream.read(ObjectInputStream.java:2663)
at java.io.ObjectInputStream$PeekInputStream.readFully(ObjectInputStream.java:2679)
at java.io.ObjectInputStream$BlockDataInputStream.readShort(ObjectInputStream.java:3156)
at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:862)
at java.io.ObjectInputStream.<init>(ObjectInputStream.java:358)
at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.<init>(InstantiationUtil.java:69)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:572)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:555)
at org.apache.flink.runtime.state.RetrievableStreamStateHandle.retrieveState(RetrievableStreamStateHandle.java:58)
at org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore.retrieveCompletedCheckpoint(ZooKeeperCompletedCheckpointStore.java:339)
at org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore.recover(ZooKeeperCompletedCheckpointStore.java:175)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedState(CheckpointCoordinator.java:1070)
at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:234)
at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:216)
at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:120)
at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:105)
at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:278)
at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:266)
at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:98)
at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:40)
at org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>(JobManagerRunnerImpl.java:146)
at org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:84)
at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(Dispatcher.java:381)
at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

On 2023-03-10 11:00:58, "Weihua Hu" <hu...@gmail.com> wrote:
>Hi
>
>Generally, a power outage of the YARN cluster alone should not affect already completed historical checkpoints (the last checkpoint may have hit an HDFS write error).
>
>Do you have more detailed JobManager logs? You could first confirm how many completedCheckpoints Flink retrieved during recovery,
>and which cp it ultimately tried to restore from.
>
>You can also try, as Yanfei suggested, specifying a historical cp as the savepoint to restore from.
>
>
>Best,
>Weihua
>
>
>On Fri, Mar 10, 2023 at 10:38 AM guanyq <dl...@163.com> wrote:
>
>> Incremental checkpoints are not enabled.
>> The file corruption was seen in the startup logs: startup tried to recover from the 10 chks, but every attempt failed because of the corrupted block below.
>> The error log is:
>>
>> java.io.IOException: Got error, status message opReadBlock
>> BP-1003103929-192.168.200.11-1668473836936:blk_1301252639_227512278
>> received exception
>> org.apache.hadoop.hdfs.server.datanode.CorruptMetaHeaderException:
>> The meta file length 0 is less than the expected length 7, for
>> OP_READ_BLOCK, self=/ip:45534, remote=/ip:9866,
>> for file
>> /tmp/flink/ha/application_1678102326043_0007/completedCheckpoint58755403e33a,
>> for pool BP-1003103929-192.168.200.11-1668473836936 block
>> 1301252639_227512278
>> at
>> org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil.checkBlockOpStatus(DataTransferProtoUtil.java:142)
>> at
>> org.apache.hadoop.hdfs.RemoteBlockReader2.checkSuccess(RemoteBlockReader2.java:456)
>> at
>> org.apache.hadoop.hdfs.RemoteBlockReader2.newBlockReader(RemoteBlockReader2.java:424)
>>
>> On 2023-03-10 10:26:11, "Yanfei Lei" <fr...@gmail.com> wrote:
>> >Hi, you can look under the checkpoint dir configured for Flink for historical chk-x folders; if you can find one, you can try
>> manually specifying that chk to restart from [1].
>> >
>> >> The Flink job keeps 10 checkpoints, taken 5 seconds apart; after a sudden power outage, why were all of the checkpoints corrupted?
>>
>> >Is incremental checkpointing enabled for the job? With incremental checkpoints, if a file of an earlier checkpoint is corrupted, the later checkpoints built incrementally on it are affected.
>> >
>> >> About the mechanism of flushing checkpoints to disk: it should be related to HDFS writes; the Flink job reported the checkpoint as successful, yet HDFS never persisted it.
>> >Did you observe that there are no files under the checkpoint dir?
>> >
>> >[1]
>> https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/ops/state/savepoints/#resuming-from-savepoints
>> >
>> >guanyq <dl...@163.com> wrote on Fri, Mar 10, 2023 at 08:58:
>> >>
>> >> I'm also thinking of using savepoints to handle the power-outage problem.
>> >> But I still have one question:
>> >> the Flink job keeps 10 checkpoints, taken 5 seconds apart; after a sudden power outage, why were all of the checkpoints corrupted?
>> >> It seems odd; could it be that none of the 10 checkpoints were actually flushed to disk?
>> >> I'd like to ask:
>> >> about the mechanism of flushing checkpoints to disk: it should be related to HDFS writes; the Flink job reported the checkpoints as successful, yet HDFS never persisted them.
>> >>
>> >> On 2023-03-10 08:47:11, "Shammon FY" <zj...@gmail.com> wrote:
>> >> >Hi
>> >> >
>> >> >I think when a Flink job fails to recover, the job itself can hardly determine that the failure
>> >was caused by something like corrupted checkpoint file blocks. If the job has ever taken a savepoint, you can try restoring the job from that savepoint.
>> >> >
>> >> >Best,
>> >> >Shammon
>> >> >
>> >> >On Thu, Mar 9, 2023 at 10:06 PM guanyq <dl...@163.com> wrote:
>> >> >
>> >> >> Preconditions:
>> >> >> 1. Flink is configured with high availability
>> >> >> 2. Flink is configured to retain 10 checkpoints
>> >> >> 3. The YARN cluster is configured with job recovery
>> >> >> Question:
>> >> >> After the YARN cluster restarts from a power outage, when recovering the Flink job, if the most recent checkpoint has corrupted blocks due to the outage, will Flink try to start from another checkpoint?
>> >> >>
>> >> >>
>> >> >>
>> >> >>
>> >
>> >
>> >
>> >--
>> >Best,
>> >Yanfei
>>

Re:Re: Re: flink on yarn: question about YARN retrying to restart the flink job

Posted by guanyq <dl...@163.com>.


Understood, thanks a lot.

On 2023-03-13 16:57:18, "Weihua Hu" <hu...@gmail.com> wrote:
>The image cannot be seen; you can upload it to an image host and paste the link on the mailing list.
>
>How often YARN relaunches the AM is also governed by "yarn.application-attempt-failures-validity-interval"[1]:
>the application only exits once the configured number of attempt failures is reached within that interval.
>
>[1]
>https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#yarn-application-attempt-failures-validity-interval
>
>Best,
>Weihua
>
>
>On Mon, Mar 13, 2023 at 4:27 PM guanyq <dl...@163.com> wrote:
>
>> The image is in the attachment.
>> But in practice it exceeded 10 attempts...
>>
>>
>>
>>
>>
>>
>> On 2023-03-13 15:39:39, "Weihua Hu" <hu...@gmail.com> wrote:
>> >Hi,
>> >
>> >The image cannot be seen.
>> >
>> >With this configuration, YARN should only launch the JobManager 10 times.
>> >
>> >Best,
>> >Weihua
>> >
>> >
>> >On Mon, Mar 13, 2023 at 3:32 PM guanyq <dl...@163.com> wrote:
>> >
>> >> Flink 1.10, with the following Flink configuration:
>> >> yarn.application-attempts = 10  (the number of times YARN tries to start the flink job is 10)
>> >> Normally I understand that YARN will try 10 times to start the flink job and fail if it cannot come up, but on the YARN application page I saw 11 attempts, as shown in the picture below.
>> >> For appattempt_1678102326043_0006_000409
>> >> <http://192.168.63.12:8088/cluster/appattempt/appattempt_1678102326043_0006_000409>
>> >> doesn't each sequence number represent one attempt?
>> >>
>>
>>

Re: Re: flink on yarn: question about YARN retrying to restart the flink job

Posted by Weihua Hu <hu...@gmail.com>.
The image cannot be seen; you can upload it to an image host and paste the link on the mailing list.

How often YARN relaunches the AM is also governed by "yarn.application-attempt-failures-validity-interval"[1]:
the application only exits once the configured number of attempt failures is reached within that interval.

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#yarn-application-attempt-failures-validity-interval

Best,
Weihua


On Mon, Mar 13, 2023 at 4:27 PM guanyq <dl...@163.com> wrote:

> The image is in the attachment.
> But in practice it exceeded 10 attempts...
>
>
>
>
>
>
> On 2023-03-13 15:39:39, "Weihua Hu" <hu...@gmail.com> wrote:
> >Hi,
> >
> >The image cannot be seen.
> >
> >With this configuration, YARN should only launch the JobManager 10 times.
> >
> >Best,
> >Weihua
> >
> >
> >On Mon, Mar 13, 2023 at 3:32 PM guanyq <dl...@163.com> wrote:
> >
> >> Flink 1.10, with the following Flink configuration:
> >> yarn.application-attempts = 10  (the number of times YARN tries to start the flink job is 10)
> >> Normally I understand that YARN will try 10 times to start the flink job and fail if it cannot come up, but on the YARN application page I saw 11 attempts, as shown in the picture below.
> >> For appattempt_1678102326043_0006_000409
> >> <http://192.168.63.12:8088/cluster/appattempt/appattempt_1678102326043_0006_000409>
> >> doesn't each sequence number represent one attempt?
> >>
>
>

Re:Re: flink on yarn: question about YARN retrying to restart the flink job

Posted by guanyq <dl...@163.com>.
The image is in the attachment.
But in practice it exceeded 10 attempts...

On 2023-03-13 15:39:39, "Weihua Hu" <hu...@gmail.com> wrote:
>Hi,
>
>The image cannot be seen.
>
>With this configuration, YARN should only launch the JobManager 10 times.
>
>Best,
>Weihua
>
>
>On Mon, Mar 13, 2023 at 3:32 PM guanyq <dl...@163.com> wrote:
>
>> Flink 1.10, with the following Flink configuration:
>> yarn.application-attempts = 10  (the number of times YARN tries to start the flink job is 10)
>> Normally I understand that YARN will try 10 times to start the flink job and fail if it cannot come up, but on the YARN application page I saw 11 attempts, as shown in the picture below.
>> For appattempt_1678102326043_0006_000409
>> <http://192.168.63.12:8088/cluster/appattempt/appattempt_1678102326043_0006_000409>
>> doesn't each sequence number represent one attempt?
>>

Re: flink on yarn: question about YARN retrying to restart the flink job

Posted by Weihua Hu <hu...@gmail.com>.
Hi,

The image cannot be seen.

With this configuration, YARN should only launch the JobManager 10 times.

Best,
Weihua


On Mon, Mar 13, 2023 at 3:32 PM guanyq <dl...@163.com> wrote:

> Flink 1.10, with the following Flink configuration:
> yarn.application-attempts = 10  (the number of times YARN tries to start the flink job is 10)
> Normally I understand that YARN will try 10 times to start the flink job and fail if it cannot come up, but on the YARN application page I saw 11 attempts, as shown in the picture below.
> For appattempt_1678102326043_0006_000409
> <http://192.168.63.12:8088/cluster/appattempt/appattempt_1678102326043_0006_000409>
> doesn't each sequence number represent one attempt?
>

flink on yarn: question about YARN retrying to restart the flink job

Posted by guanyq <dl...@163.com>.
Flink 1.10, with the following Flink configuration:
yarn.application-attempts = 10  (the number of times YARN tries to start the flink job is 10)
Normally I understand that YARN will try 10 times to start the flink job and fail if it cannot come up, but on the YARN application page I saw 11 attempts, as shown in the picture below.
Doesn't each sequence number of appattempt_1678102326043_0006_000409 represent one attempt?

Re: Re: Re: flink on yarn: question about an unexpected power outage

Posted by Weihua Hu <hu...@gmail.com>.
Hi

Generally, a power outage of the YARN cluster alone should not affect already completed historical checkpoints (the last checkpoint may have hit an HDFS write error).

Do you have more detailed JobManager logs? You could first confirm how many completedCheckpoints Flink retrieved during recovery,
and which cp it ultimately tried to restore from.

You can also try, as Yanfei suggested, specifying a historical cp as the savepoint to restore from.


Best,
Weihua


On Fri, Mar 10, 2023 at 10:38 AM guanyq <dl...@163.com> wrote:

> Incremental checkpoints are not enabled.
> The file corruption was seen in the startup logs: startup tried to recover from the 10 chks, but every attempt failed because of the corrupted block below.
> The error log is:
>
> java.io.IOException: Got error, status message opReadBlock
> BP-1003103929-192.168.200.11-1668473836936:blk_1301252639_227512278
> received exception
> org.apache.hadoop.hdfs.server.datanode.CorruptMetaHeaderException:
> The meta file length 0 is less than the expected length 7, for
> OP_READ_BLOCK, self=/ip:45534, remote=/ip:9866,
> for file
> /tmp/flink/ha/application_1678102326043_0007/completedCheckpoint58755403e33a,
> for pool BP-1003103929-192.168.200.11-1668473836936 block
> 1301252639_227512278
> at
> org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil.checkBlockOpStatus(DataTransferProtoUtil.java:142)
> at
> org.apache.hadoop.hdfs.RemoteBlockReader2.checkSuccess(RemoteBlockReader2.java:456)
> at
> org.apache.hadoop.hdfs.RemoteBlockReader2.newBlockReader(RemoteBlockReader2.java:424)
>
> On 2023-03-10 10:26:11, "Yanfei Lei" <fr...@gmail.com> wrote:
> >Hi, you can look under the checkpoint dir configured for Flink for historical chk-x folders; if you can find one, you can try
> manually specifying that chk to restart from [1].
> >
> >> The Flink job keeps 10 checkpoints, taken 5 seconds apart; after a sudden power outage, why were all of the checkpoints corrupted?
>
> >Is incremental checkpointing enabled for the job? With incremental checkpoints, if a file of an earlier checkpoint is corrupted, the later checkpoints built incrementally on it are affected.
> >
> >> About the mechanism of flushing checkpoints to disk: it should be related to HDFS writes; the Flink job reported the checkpoint as successful, yet HDFS never persisted it.
> >Did you observe that there are no files under the checkpoint dir?
> >
> >[1]
> https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/ops/state/savepoints/#resuming-from-savepoints
> >
> >guanyq <dl...@163.com> wrote on Fri, Mar 10, 2023 at 08:58:
> >>
> >> I'm also thinking of using savepoints to handle the power-outage problem.
> >> But I still have one question:
> >> the Flink job keeps 10 checkpoints, taken 5 seconds apart; after a sudden power outage, why were all of the checkpoints corrupted?
> >> It seems odd; could it be that none of the 10 checkpoints were actually flushed to disk?
> >> I'd like to ask:
> >> about the mechanism of flushing checkpoints to disk: it should be related to HDFS writes; the Flink job reported the checkpoints as successful, yet HDFS never persisted them.
> >>
> >> On 2023-03-10 08:47:11, "Shammon FY" <zj...@gmail.com> wrote:
> >> >Hi
> >> >
> >> >I think when a Flink job fails to recover, the job itself can hardly determine that the failure
> >was caused by something like corrupted checkpoint file blocks. If the job has ever taken a savepoint, you can try restoring the job from that savepoint.
> >> >
> >> >Best,
> >> >Shammon
> >> >
> >> >On Thu, Mar 9, 2023 at 10:06 PM guanyq <dl...@163.com> wrote:
> >> >
> >> >> Preconditions:
> >> >> 1. Flink is configured with high availability
> >> >> 2. Flink is configured to retain 10 checkpoints
> >> >> 3. The YARN cluster is configured with job recovery
> >> >> Question:
> >> >> After the YARN cluster restarts from a power outage, when recovering the Flink job, if the most recent checkpoint has corrupted blocks due to the outage, will Flink try to start from another checkpoint?
> >> >>
> >> >>
> >> >>
> >> >>
> >
> >
> >
> >--
> >Best,
> >Yanfei
>

Re:Re: Re: flink on yarn: question about an unexpected power outage

Posted by guanyq <dl...@163.com>.
Incremental checkpoints are not enabled.
The file corruption was seen in the startup logs: startup tried to recover from the 10 chks, but every attempt failed because of the corrupted block below.
The error log is:

java.io.IOException: Got error, status message opReadBlock BP-1003103929-192.168.200.11-1668473836936:blk_1301252639_227512278 received exception
org.apache.hadoop.hdfs.server.datanode.CorruptMetaHeaderException:
The meta file length 0 is less than the expected length 7, for OP_READ_BLOCK, self=/ip:45534, remote=/ip:9866,
for file /tmp/flink/ha/application_1678102326043_0007/completedCheckpoint58755403e33a, for pool BP-1003103929-192.168.200.11-1668473836936 block 1301252639_227512278
at org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil.checkBlockOpStatus(DataTransferProtoUtil.java:142)
at org.apache.hadoop.hdfs.RemoteBlockReader2.checkSuccess(RemoteBlockReader2.java:456)
at org.apache.hadoop.hdfs.RemoteBlockReader2.newBlockReader(RemoteBlockReader2.java:424)

On 2023-03-10 10:26:11, "Yanfei Lei" <fr...@gmail.com> wrote:
>Hi, you can look under the checkpoint dir configured for Flink for historical chk-x folders; if you can find one, you can try manually specifying that chk to restart from [1].
>
>> The Flink job keeps 10 checkpoints, taken 5 seconds apart; after a sudden power outage, why were all of the checkpoints corrupted?
>Is incremental checkpointing enabled for the job? With incremental checkpoints, if a file of an earlier checkpoint is corrupted, the later checkpoints built incrementally on it are affected.
>
>> About the mechanism of flushing checkpoints to disk: it should be related to HDFS writes; the Flink job reported the checkpoint as successful, yet HDFS never persisted it.
>Did you observe that there are no files under the checkpoint dir?
>
>[1] https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/ops/state/savepoints/#resuming-from-savepoints
>
>guanyq <dl...@163.com> wrote on Fri, Mar 10, 2023 at 08:58:
>>
>> I'm also thinking of using savepoints to handle the power-outage problem.
>> But I still have one question:
>> the Flink job keeps 10 checkpoints, taken 5 seconds apart; after a sudden power outage, why were all of the checkpoints corrupted?
>> It seems odd; could it be that none of the 10 checkpoints were actually flushed to disk?
>> I'd like to ask:
>> about the mechanism of flushing checkpoints to disk: it should be related to HDFS writes; the Flink job reported the checkpoints as successful, yet HDFS never persisted them.
>>
>> On 2023-03-10 08:47:11, "Shammon FY" <zj...@gmail.com> wrote:
>> >Hi
>> >
>> >I think when a Flink job fails to recover, the job itself can hardly determine that the failure
>> >was caused by something like corrupted checkpoint file blocks. If the job has ever taken a savepoint, you can try restoring the job from that savepoint.
>> >
>> >Best,
>> >Shammon
>> >
>> >On Thu, Mar 9, 2023 at 10:06 PM guanyq <dl...@163.com> wrote:
>> >
>> >> Preconditions:
>> >> 1. Flink is configured with high availability
>> >> 2. Flink is configured to retain 10 checkpoints
>> >> 3. The YARN cluster is configured with job recovery
>> >> Question:
>> >> After the YARN cluster restarts from a power outage, when recovering the Flink job, if the most recent checkpoint has corrupted blocks due to the outage, will Flink try to start from another checkpoint?
>> >>
>> >>
>> >>
>> >>
>
>
>
>-- 
>Best,
>Yanfei

Re: Re: flink on yarn: question about an unexpected power outage

Posted by Yanfei Lei <fr...@gmail.com>.
Hi, you can look under the checkpoint dir configured for Flink for historical chk-x folders; if you can find one, you can try manually specifying that chk to restart from [1].

> The Flink job keeps 10 checkpoints, taken 5 seconds apart; after a sudden power outage, why were all of the checkpoints corrupted?
Is incremental checkpointing enabled for the job? With incremental checkpoints, if a file of an earlier checkpoint is corrupted, the later checkpoints built incrementally on it are affected.

> About the mechanism of flushing checkpoints to disk: it should be related to HDFS writes; the Flink job reported the checkpoint as successful, yet HDFS never persisted it.
Did you observe that there are no files under the checkpoint dir?

[1] https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/ops/state/savepoints/#resuming-from-savepoints
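
For example, a minimal sketch of pointing the restore at an intact historical chk-x (assuming Flink 1.12+ and that your deployment honors the "execution.savepoint.path" option; the HDFS path and <job-id> below are hypothetical placeholders, adapt them to your checkpoint dir):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RestoreFromChk {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // point the restore at an intact chk-x directory found under the checkpoint dir;
        // the CLI equivalent is: flink run -s hdfs:///flink/checkpoints/<job-id>/chk-7084 ...
        conf.setString("execution.savepoint.path",
                "hdfs:///flink/checkpoints/<job-id>/chk-7084");
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);
        // ... rebuild the same job graph as before, then env.execute(...)
    }
}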

guanyq <dl...@163.com> wrote on Fri, Mar 10, 2023 at 08:58:
>
> I'm also thinking of using savepoints to handle the power-outage problem.
> But I still have one question:
> the Flink job keeps 10 checkpoints, taken 5 seconds apart; after a sudden power outage, why were all of the checkpoints corrupted?
> It seems odd; could it be that none of the 10 checkpoints were actually flushed to disk?
> I'd like to ask:
> about the mechanism of flushing checkpoints to disk: it should be related to HDFS writes; the Flink job reported the checkpoints as successful, yet HDFS never persisted them.
>
> On 2023-03-10 08:47:11, "Shammon FY" <zj...@gmail.com> wrote:
> >Hi
> >
> >I think when a Flink job fails to recover, the job itself can hardly determine that the failure
> >was caused by something like corrupted checkpoint file blocks. If the job has ever taken a savepoint, you can try restoring the job from that savepoint.
> >
> >Best,
> >Shammon
> >
> >On Thu, Mar 9, 2023 at 10:06 PM guanyq <dl...@163.com> wrote:
> >
> >> Preconditions:
> >> 1. Flink is configured with high availability
> >> 2. Flink is configured to retain 10 checkpoints
> >> 3. The YARN cluster is configured with job recovery
> >> Question:
> >> After the YARN cluster restarts from a power outage, when recovering the Flink job, if the most recent checkpoint has corrupted blocks due to the outage, will Flink try to start from another checkpoint?
> >>
> >>
> >>
> >>



-- 
Best,
Yanfei

Re:Re: flink on yarn: question about an unexpected power outage

Posted by guanyq <dl...@163.com>.
I'm also thinking of using savepoints to handle the power-outage problem.
But I still have one question:
the Flink job keeps 10 checkpoints, taken 5 seconds apart; after a sudden power outage, why were all of the checkpoints corrupted?
It seems odd; could it be that none of the 10 checkpoints were actually flushed to disk?
I'd like to ask:
about the mechanism of flushing checkpoints to disk: it should be related to HDFS writes; the Flink job reported the checkpoints as successful, yet HDFS never persisted them.

On 2023-03-10 08:47:11, "Shammon FY" <zj...@gmail.com> wrote:
>Hi
>
>I think when a Flink job fails to recover, the job itself can hardly determine that the failure
>was caused by something like corrupted checkpoint file blocks. If the job has ever taken a savepoint, you can try restoring the job from that savepoint.
>
>Best,
>Shammon
>
>On Thu, Mar 9, 2023 at 10:06 PM guanyq <dl...@163.com> wrote:
>
>> Preconditions:
>> 1. Flink is configured with high availability
>> 2. Flink is configured to retain 10 checkpoints
>> 3. The YARN cluster is configured with job recovery
>> Question:
>> After the YARN cluster restarts from a power outage, when recovering the Flink job, if the most recent checkpoint has corrupted blocks due to the outage, will Flink try to start from another checkpoint?
>>
>>
>>
>>

Re: flink on yarn: question about an unexpected power outage

Posted by Shammon FY <zj...@gmail.com>.
Hi

I think when a Flink job fails to recover, the job itself can hardly determine that the failure
was caused by something like corrupted checkpoint file blocks. If the job has ever taken a savepoint, you can try restoring the job from that savepoint.

Best,
Shammon

On Thu, Mar 9, 2023 at 10:06 PM guanyq <dl...@163.com> wrote:

> Preconditions:
> 1. Flink is configured with high availability
> 2. Flink is configured to retain 10 checkpoints
> 3. The YARN cluster is configured with job recovery
> Question:
> After the YARN cluster restarts from a power outage, when recovering the Flink job, if the most recent checkpoint has corrupted blocks due to the outage, will Flink try to start from another checkpoint?
>
>
>
>

flink on yarn: question about an unexpected power outage

Posted by guanyq <dl...@163.com>.
Preconditions:
1. Flink is configured with high availability
2. Flink is configured to retain 10 checkpoints
3. The YARN cluster is configured with job recovery
Question:
After the YARN cluster restarts from a power outage, when recovering the Flink job, if the most recent checkpoint has corrupted blocks due to the outage, will Flink try to start from another checkpoint?
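
For reference, a minimal sketch of the setup described above (hedged: these are the standard checkpointing APIs plus the "state.checkpoints.num-retained" config key; adjust to your cluster and Flink version):

import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointSetupSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5000); // one checkpoint every 5 seconds
        // retain completed checkpoints on HDFS even after cancellation,
        // so that they remain available for manual recovery
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // "state.checkpoints.num-retained: 10" in flink-conf.yaml keeps the
        // last 10 completed checkpoints around
        // ... job graph ...
        env.execute("checkpointed job");
    }
}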




Re: flink 1.14

Posted by yidan zhao <hi...@gmail.com>.
airflow~

RS <ti...@163.com> wrote on Mon, Jun 27, 2022 at 09:11:
>
> Hi,
> Here we schedule through DolphinScheduler, which can also configure dependencies between jobs.
> Other schedulers should have similar functionality.
>
>
> Thanks~
>
>
>
>
>
> On 2022-04-29 16:03:15, "guanyq" <dl...@163.com> wrote:
> >A question for the experts here:
> >when running Flink SQL as batch jobs, what is typically used in production for scheduled execution?
> >And if there are dependencies between jobs, what is used in production for notification?
> >
> >
> >What I mainly want to do is migrate Hive SQL to Flink SQL.

Re:flink 1.14

Posted by RS <ti...@163.com>.
Hi,
Here we schedule through DolphinScheduler, which can also configure dependencies between jobs.
Other schedulers should have similar functionality.


Thanks~





On 2022-04-29 16:03:15, "guanyq" <dl...@163.com> wrote:
>A question for the experts here:
>when running Flink SQL as batch jobs, what is typically used in production for scheduled execution?
>And if there are dependencies between jobs, what is used in production for notification?
>
>
>What I mainly want to do is migrate Hive SQL to Flink SQL.

flink 1.14

Posted by guanyq <dl...@163.com>.
A question for the experts here:
when running Flink SQL as batch jobs, what is typically used in production for scheduled execution?
And if there are dependencies between jobs, what is used in production for notification?


What I mainly want to do is migrate Hive SQL to Flink SQL.

Re: How to run a conditional query without a full table scan

Posted by Shengkai Fang <fs...@gmail.com>.
Hi,

You can implement the FilterPushDown[1] interface for HBaseDynamicTableSource. That way, the filters you add are pushed down
to the source, so irrelevant data is filtered out at read time.

[1]
https://github.com/apache/flink/blob/a09cc4704433cb76b936a51b422d812e1ae57945/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsFilterPushDown.java#L63
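
As a rough sketch (this is not the actual HBase connector code, just the shape of the interface), a custom source could accept pushed-down filters like this:

import java.util.ArrayList;
import java.util.List;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
import org.apache.flink.table.expressions.ResolvedExpression;

public class FilterableHBaseSource implements ScanTableSource, SupportsFilterPushDown {

    private final List<ResolvedExpression> pushedFilters = new ArrayList<>();

    @Override
    public Result applyFilters(List<ResolvedExpression> filters) {
        // keep the predicates that can be translated into HBase filters
        // (e.g. a row-key prefix into a PrefixFilter) and hand the rest
        // back to the planner to apply after the scan
        List<ResolvedExpression> accepted = new ArrayList<>();
        List<ResolvedExpression> remaining = new ArrayList<>();
        for (ResolvedExpression filter : filters) {
            if (canTranslateToHBaseFilter(filter)) {
                accepted.add(filter);
            } else {
                remaining.add(filter);
            }
        }
        pushedFilters.addAll(accepted);
        return Result.of(accepted, remaining);
    }

    private boolean canTranslateToHBaseFilter(ResolvedExpression filter) {
        return false; // placeholder: inspect the expression tree here
    }

    @Override
    public ChangelogMode getChangelogMode() {
        return ChangelogMode.insertOnly();
    }

    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) {
        // build the HBase Scan from pushedFilters before creating the reader
        throw new UnsupportedOperationException("sketch only");
    }

    @Override
    public DynamicTableSource copy() {
        return new FilterableHBaseSource();
    }

    @Override
    public String asSummaryString() {
        return "FilterableHBaseSource(sketch)";
    }
}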

Best,
Shengkai

汪赟 <wa...@icloud.com.invalid> wrote on Thu, Apr 14, 2022 at 09:30:

> When querying an HBase table with flink table, how can I add a filter such as PrefixFilter, instead of directly using the filter operator, which scans the whole table before filtering and is very unfriendly?
> With flink streaming reads a filter can be added, but the returned data is a stream and the converted table also grows incrementally, whereas I want to fetch the needed data and then run statistics on it.
> How can the condition be applied when the table queries the source table, instead of filtering after a full scan?
>
>
>

How to run a conditional query without a full table scan

Posted by 汪赟 <wa...@icloud.com.INVALID>.
When querying an HBase table with flink table, how can I add a filter such as PrefixFilter, instead of directly using the filter operator, which scans the whole table before filtering and is very unfriendly?
With flink streaming reads a filter can be added, but the returned data is a stream and the converted table also grows incrementally, whereas I want to fetch the needed data and then run statistics on it.
How can the condition be applied when the table queries the source table, instead of filtering after a full scan?



Re: flink 1.15

Posted by Zhanghao Chen <zh...@outlook.com>.
No, it isn't. MVP is short for Minimum Viable Product: a version that implements only the core functionality and is then refined further based on feedback from early users.

Best,
Zhanghao Chen
________________________________
From: guanyq <dl...@163.com>
Sent: Saturday, April 2, 2022 14:56
To: user-zh@flink.apache.org <us...@flink.apache.org>
Subject: flink 1.15

I watched the FFA talk (on unified stream and batch processing): Flink 1.15 introduces an MVP version of dynamic table storage for stream-batch unification.


Is the MVP version a paid version?

flink 1.15

Posted by guanyq <dl...@163.com>.
I watched the FFA talk (on unified stream and batch processing): Flink 1.15 introduces an MVP version of dynamic table storage for stream-batch unification.


Is the MVP version a paid version?

flink executeAsync()

Posted by guanyq <dl...@163.com>.
What are the typical use cases for flink executeAsync()?
For an unbounded stream, executeAsync() and execute() behave the same, right?
I mainly want to ask about the scenarios / differences.
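
For what it's worth, a small sketch of the difference (standard APIs; the job itself is illustrative): execute() blocks until the job finishes, while executeAsync() returns a JobClient right after submission, so for an unbounded stream the job runs the same either way and the difference is only on the submitting side:

import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ExecuteAsyncSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements(1, 2, 3).print();

        // returns as soon as the job is submitted; the caller keeps running,
        // e.g. to submit several jobs from one main() or to monitor this one
        JobClient client = env.executeAsync("async job");

        System.out.println("status: " + client.getJobStatus().get());
        // client.cancel(); // the handle can also control the running job
    }
}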

flink1.14.0 temporal join hive

Posted by guanyq <dl...@163.com>.
Could someone share a demo of a temporal join against Hive
for reference?
I keep running into problems with it locally.
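
In the meantime, a sketch of the documented processing-time temporal join pattern against a Hive dimension table (not a verified demo: the table and column names are made up, and it assumes a registered HiveCatalog and a proc_time attribute on the probe side):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class HiveTemporalJoinSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // join each order against the latest version of the Hive dim table
        tEnv.executeSql(
                "SELECT o.order_id, d.product_name "
                        + "FROM orders AS o "
                        + "JOIN dim_products "
                        + "  /*+ OPTIONS('streaming-source.enable'='false', "
                        + "              'lookup.join.cache.ttl'='10 min') */ "
                        + "  FOR SYSTEM_TIME AS OF o.proc_time AS d "
                        + "ON o.product_id = d.product_id");
    }
}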


Flink 1.12.0: how to commit offsets to Kafka from Flink Streaming

Posted by guanyq <dl...@163.com>.
Kafka version is 0.11.
Currently, when checking the consumer group's lag, it reports that the consumer group does not exist.
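
One common cause, sketched below (the topic, group id and broker address are hypothetical): with checkpointing enabled, the legacy FlinkKafkaConsumer only commits offsets back to Kafka when a checkpoint completes, and it needs a group.id; otherwise the group never becomes visible to the broker's group tooling:

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class KafkaOffsetCommitSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5000); // offsets are committed on completed checkpoints

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker:9092");
        props.setProperty("group.id", "my-flink-group");

        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props);
        // commit the checkpointed offsets back to Kafka so that external tools
        // can see the group's lag; Flink itself restores from its own state
        consumer.setCommitOffsetsOnCheckpoints(true);

        env.addSource(consumer).print();
        env.execute("kafka offset commit sketch");
    }
}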

Flink 1.12.0: Distinct Aggregation

Posted by guanyq <dl...@163.com>.
The code is attached; it follows the demo on the official website.
I don't know where the problem is; could someone please take a look?


root
 |-- orderId: STRING
 |-- userId: INT
 |-- money: INT
 |-- createTime: BIGINT
 |-- pt: TIMESTAMP(3) *PROCTIME*

17:17:11,935 INFO  org.apache.flink.api.java.typeutils.TypeExtractor            [] - class org.apache.flink.types.Row is missing a default constructor so it cannot be used as a POJO type and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.

Exception in thread "main" java.lang.RuntimeException: Unknown call expression: count(orderId)
at org.apache.flink.table.planner.expressions.converter.ExpressionConverter.visit(ExpressionConverter.java:102)
at org.apache.flink.table.planner.expressions.converter.ExpressionConverter.visit(ExpressionConverter.java:72)
at org.apache.flink.table.expressions.CallExpression.accept(CallExpression.java:126)
at org.apache.flink.table.planner.expressions.converter.ExpressionConverter$1.toRexNode(ExpressionConverter.java:226)
at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
at org.apache.flink.table.planner.expressions.converter.OverConvertRule.convert(OverConvertRule.java:81)
at org.apache.flink.table.planner.expressions.converter.ExpressionConverter.visit(ExpressionConverter.java:97)
at org.apache.flink.table.planner.expressions.converter.ExpressionConverter.visit(ExpressionConverter.java:72)
at org.apache.flink.table.expressions.CallExpression.accept(CallExpression.java:126)
at org.apache.flink.table.planner.expressions.converter.ExpressionConverter$1.toRexNode(ExpressionConverter.java:226)
at org.apache.flink.table.planner.expressions.converter.CustomizedConvertRule.convertAs(CustomizedConvertRule.java:251)
at org.apache.flink.table.planner.expressions.converter.CustomizedConvertRule.lambda$convert$0(CustomizedConvertRule.java:93)
at java.util.Optional.map(Optional.java:215)
at org.apache.flink.table.planner.expressions.converter.CustomizedConvertRule.convert(CustomizedConvertRule.java:93)
at org.apache.flink.table.planner.expressions.converter.ExpressionConverter.visit(ExpressionConverter.java:97)
at org.apache.flink.table.planner.expressions.converter.ExpressionConverter.visit(ExpressionConverter.java:72)
at org.apache.flink.table.expressions.CallExpression.accept(CallExpression.java:126)
at org.apache.flink.table.planner.plan.QueryOperationConverter.convertExprToRexNode(QueryOperationConverter.java:734)
at org.apache.flink.table.planner.plan.QueryOperationConverter.access$800(QueryOperationConverter.java:129)
at org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.lambda$convertToRexNodes$6(QueryOperationConverter.java:540)
at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
at org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.convertToRexNodes(QueryOperationConverter.java:541)
at org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:153)
at org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.visit(QueryOperationConverter.java:149)
at org.apache.flink.table.operations.ProjectQueryOperation.accept(ProjectQueryOperation.java:75)
at org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:146)
at org.apache.flink.table.planner.plan.QueryOperationConverter.defaultMethod(QueryOperationConverter.java:128)
at org.apache.flink.table.operations.utils.QueryOperationDefaultVisitor.visit(QueryOperationDefaultVisitor.java:47)
at org.apache.flink.table.operations.ProjectQueryOperation.accept(ProjectQueryOperation.java:75)
at org.apache.flink.table.planner.calcite.FlinkRelBuilder.queryOperation(FlinkRelBuilder.scala:186)
at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:250)
at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:164)
at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:164)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:164)
at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:331)
at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toRetractStream(StreamTableEnvironmentImpl.java:307)
at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toRetractStream(StreamTableEnvironmentImpl.java:298)
at com.guanyq.study.TableAPIAndSQL.TableAPI.Aggregations.DistinctAggregation3.main(DistinctAggregation3.java:68)
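
Without seeing the attachment it is hard to say exactly where it breaks, but for comparison, here is the documented Table API shape for a distinct aggregation over an over-window, built from expression DSL calls such as $("orderId").count().distinct() rather than a plain "count(...)" call (a sketch against the schema printed above):

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.UNBOUNDED_RANGE;

import org.apache.flink.table.api.Over;
import org.apache.flink.table.api.Table;

public class DistinctAggSketch {
    // distinct count of orderId per userId over an unbounded over-window
    public static Table distinctCount(Table orders) {
        return orders
                .window(Over.partitionBy($("userId"))
                        .orderBy($("pt"))
                        .preceding(UNBOUNDED_RANGE)
                        .as("w"))
                .select($("userId"),
                        $("orderId").count().distinct().over($("w")).as("cnt"));
    }
}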