Posted to issues@flink.apache.org by "chenchuangchuang (Jira)" <ji...@apache.org> on 2020/05/12 14:32:00 UTC
[jira] [Created] (FLINK-17638) FlinkKafkaConsumerBase restore from empty state will be set consum from earliest forced
chenchuangchuang created FLINK-17638:
----------------------------------------
Summary: FlinkKafkaConsumerBase restore from empty state will be set consum from earliest forced
Key: FLINK-17638
URL: https://issues.apache.org/jira/browse/FLINK-17638
Project: Flink
Issue Type: Bug
Components: Connectors / Kafka
Affects Versions: 1.10.0, 1.9.3, 1.9.0
Environment: Flink 1.9.0
kafka 1.1.0
jdk 1.8
Reporter: chenchuangchuang
My goal and data are as follows:
# I need to count the number of posts each user has created in the last 30 days.
# The full, up-to-date data set is in MySQL.
# I can get the incremental MySQL binlog from Kafka 1.1.1 (it retains only the last 7 days of binlog); the topic name is "binlog_post_topic".
# So I have to combine the MySQL data with the binlog data.
I do it this way:
# First, I load a snapshot of the MySQL data into a Kafka topic ("init-post-topic") in order of create_time, and consume "init-post-topic" as a Flink DataStream using SlidingEventTimeWindows.
# Second, after the job has processed all the data in "init-post-topic", I take a savepoint of the job; call it save-point-a.
# Third, I modify my code:
## the data source is now the Kafka topic "binlog_post_topic";
## the other operators are unchanged;
## "binlog_post_topic" is set to start consuming from a specific timestamp (the time the MySQL snapshot was created).
# Fourth, I restart the job from save-point-a.
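Step three relies on starting "binlog_post_topic" at the snapshot time. As far as I understand, Flink's Kafka consumer (for example via setStartFromTimestamp) resolves such a timestamp to one offset per partition using Kafka's offset-for-timestamp lookup, which returns the earliest offset whose record timestamp is greater than or equal to the target. The plain-Java model below (not Flink or Kafka code) sketches that expected behaviour, assuming records are ordered by offset with non-decreasing timestamps; the Rec class and the -1 "no such record" return value are hypothetical conventions of this sketch.

```java
import java.util.Arrays;
import java.util.List;

/**
 * Plain-Java model (not Flink or Kafka code) of resolving a "start from timestamp"
 * setting for one partition: pick the earliest offset whose timestamp is >= the target.
 * Assumes records are ordered by offset and their timestamps never decrease.
 */
class TimestampStartOffset {

    /** Hypothetical stand-in for a Kafka record; only offset and timestamp matter here. */
    static final class Rec {
        final long offset;
        final long timestampMs;

        Rec(long offset, long timestampMs) {
            this.offset = offset;
            this.timestampMs = timestampMs;
        }
    }

    /** Returns the start offset, or -1 (a convention of this sketch) if no record is new enough. */
    static long startOffsetFor(List<Rec> partition, long startTimestampMs) {
        // Lower-bound binary search on the non-decreasing timestamps.
        int lo = 0;
        int hi = partition.size();
        while (lo < hi) {
            int mid = (lo + hi) >>> 1;
            if (partition.get(mid).timestampMs < startTimestampMs) {
                lo = mid + 1;
            } else {
                hi = mid;
            }
        }
        return lo < partition.size() ? partition.get(lo).offset : -1L;
    }

    public static void main(String[] args) {
        // Binlog offsets 100..104; the MySQL snapshot was taken at t = 3000 ms.
        List<Rec> binlog = Arrays.asList(
                new Rec(100, 1000), new Rec(101, 2000), new Rec(102, 3000),
                new Rec(103, 4000), new Rec(104, 5000));
        System.out.println(startOffsetFor(binlog, 3000)); // prints 102
    }
}
```

This is what the reporter expects to happen after restoring from save-point-a; the log below shows that the consumer instead starts from the earliest offset.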
However, the Kafka consumer for "binlog_post_topic" does not start from the timestamp I set; it starts from the earliest offset. The TaskManager log shows:
{code:java}
2020-05-11 17:20:47,228 INFO org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - Consumer subtask 0 restored state: {}.
...
2020-05-12 20:14:52,641 INFO org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - Consumer subtask 0 will start reading 1 partitions with offsets in restored state: {KafkaTopicPartition{topic='binlog-kk_social-post', partition=0}=-915623761775}
2020-05-11 17:20:47,414 INFO org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - Consumer subtask 0 creating fetcher with offsets {KafkaTopicPartition{topic='binlog-kk_social-post', partition=0}=-915623761775}.
{code}
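The negative offset in the log is not a real Kafka offset but a sentinel value. Assuming the constants defined in Flink's KafkaTopicPartitionStateSentinel (OFFSET_NOT_SET = -915623761776, EARLIEST_OFFSET = -915623761775, LATEST_OFFSET = -915623761774, GROUP_OFFSET = -915623761773), a small lookup shows which start position the restored state encodes. SentinelDecoder is a hypothetical helper, not part of Flink.

```java
/** Hypothetical helper: maps offsets printed by FlinkKafkaConsumerBase to readable names. */
class SentinelDecoder {
    // Values assumed to match Flink's KafkaTopicPartitionStateSentinel in 1.9/1.10.
    static final long OFFSET_NOT_SET = -915623761776L;
    static final long EARLIEST_OFFSET = -915623761775L;
    static final long LATEST_OFFSET = -915623761774L;
    static final long GROUP_OFFSET = -915623761773L;

    static String describe(long offset) {
        if (offset == OFFSET_NOT_SET) return "OFFSET_NOT_SET";
        if (offset == EARLIEST_OFFSET) return "EARLIEST_OFFSET";
        if (offset == LATEST_OFFSET) return "LATEST_OFFSET";
        if (offset == GROUP_OFFSET) return "GROUP_OFFSET";
        // Non-negative values are ordinary Kafka offsets.
        return offset >= 0 ? "concrete offset " + offset : "unknown sentinel " + offset;
    }

    public static void main(String[] args) {
        // Offset taken from the "creating fetcher with offsets" log line above.
        System.out.println(describe(-915623761775L)); // prints EARLIEST_OFFSET
    }
}
```

Under that assumption, the fetcher was created with every partition of the topic set to "start from the earliest offset", which matches the observed behaviour.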
I suspected FlinkKafkaConsumerBase, and found the following code in FlinkKafkaConsumerBase.initializeState():
{code:java}
if (context.isRestored() && !restoredFromOldState) {
    restoredState = new TreeMap<>(new KafkaTopicPartition.Comparator());
    // ....
}
{code}
This means that whenever a job is restored from a savepoint, restoredState is not null; at minimum it is an empty TreeMap.
And in FlinkKafkaConsumerBase.open():
{code:java}
if (restoredState != null) {
    for (KafkaTopicPartition partition : allPartitions) {
        if (!restoredState.containsKey(partition)) {
            restoredState.put(partition, KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET);
        }
    }
    // ...
}
{code}
This is where the consumer's start offsets are initialized. When a job is restored from a savepoint, restoredState is at least an empty TreeMap, so every partition that is not in it (here, all of them) is set to start from KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET, and the configured start timestamp is ignored.
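The interaction between the two excerpts can be reproduced in isolation. The sketch below is a simplified plain-Java model, not the actual Flink source: partitions are plain strings instead of KafkaTopicPartition, a null result stands for "use the configured startup mode", and the EARLIEST_OFFSET value is assumed to match KafkaTopicPartitionStateSentinel.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Simplified model of the restore path quoted above; not the actual Flink source. */
class OriginalRestoreLogic {
    // Assumed value of KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET.
    static final long EARLIEST_OFFSET = -915623761775L;

    /** Models initializeState(): a restored job always ends up with a non-null map. */
    static TreeMap<String, Long> initializeState(boolean isRestored, Map<String, Long> savepointOffsets) {
        if (!isRestored) {
            return null; // fresh start: no restored state at all
        }
        // Mirrors "restoredState = new TreeMap<>(...)", then copies whatever offsets were saved.
        TreeMap<String, Long> restored = new TreeMap<>();
        restored.putAll(savepointOffsets);
        return restored;
    }

    /** Models open(): a null result means "use the configured startup mode". */
    static TreeMap<String, Long> startOffsets(TreeMap<String, Long> restoredState, List<String> allPartitions) {
        if (restoredState != null) {
            for (String partition : allPartitions) {
                if (!restoredState.containsKey(partition)) {
                    // Partitions unknown to the restored state are read from the beginning.
                    restoredState.put(partition, EARLIEST_OFFSET);
                }
            }
        }
        return restoredState;
    }

    public static void main(String[] args) {
        List<String> partitions = Arrays.asList("binlog_post_topic-0");
        // Restored from save-point-a with no offsets for this source, as in the log: {}.
        TreeMap<String, Long> restored = initializeState(true, new TreeMap<String, Long>());
        System.out.println(startOffsets(restored, partitions)); // prints {binlog_post_topic-0=-915623761775}
        // Fresh start: state stays null, so the timestamp setting would be used.
        System.out.println(startOffsets(initializeState(false, new TreeMap<String, Long>()), partitions)); // prints null
    }
}
```

The only input that differs between the two calls is whether the job was restored; the (empty) content of the savepoint is the same, yet the outcome flips from "use the timestamp" to "start from earliest".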
I changed the condition to:
{code:java}
if (restoredState != null && !restoredState.isEmpty()) {
    // ....
}
{code}
and this works for me.
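The effect of the one-line change can be compared side by side. The model below is plain Java (not Flink code) that applies the original and the patched condition to the same restored state; "TIMESTAMP" is a hypothetical label standing in for whatever startup mode the job configured.

```java
import java.util.Arrays;
import java.util.List;
import java.util.TreeMap;

/** Plain-Java comparison of the original and patched open() condition; not Flink code. */
class PatchedRestoreLogic {

    /** Returns each partition's start position; "TIMESTAMP" is a hypothetical startup-mode label. */
    static TreeMap<String, String> startPositions(TreeMap<String, Long> restoredState,
                                                  List<String> allPartitions,
                                                  boolean applyPatch) {
        boolean useRestoredState = applyPatch
                ? restoredState != null && !restoredState.isEmpty() // patched condition
                : restoredState != null;                            // original condition
        TreeMap<String, String> positions = new TreeMap<>();
        for (String partition : allPartitions) {
            if (!useRestoredState) {
                positions.put(partition, "TIMESTAMP");
            } else if (restoredState.containsKey(partition)) {
                positions.put(partition, "restored offset " + restoredState.get(partition));
            } else {
                positions.put(partition, "EARLIEST_OFFSET");
            }
        }
        return positions;
    }

    public static void main(String[] args) {
        List<String> partitions = Arrays.asList("binlog_post_topic-0");
        TreeMap<String, Long> emptyRestored = new TreeMap<>();
        System.out.println("original: " + startPositions(emptyRestored, partitions, false)); // original: {binlog_post_topic-0=EARLIEST_OFFSET}
        System.out.println("patched:  " + startPositions(emptyRestored, partitions, true));  // patched:  {binlog_post_topic-0=TIMESTAMP}
    }
}
```

One trade-off worth weighing: in the original logic, a partition missing from the restored state is presumably treated as newly discovered and read from the earliest offset so that no data is skipped. The patch keeps that for a non-empty restored state, but an empty restored state now behaves like a fresh start, so a job whose saved state was legitimately empty may start newly added partitions from the configured startup mode instead of from the beginning.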
--
This message was sent by Atlassian Jira
(v8.3.4#803005)