You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "chenchuangchuang (Jira)" <ji...@apache.org> on 2020/05/12 14:32:00 UTC

[jira] [Created] (FLINK-17638) FlinkKafkaConsumerBase restore from empty state will be set consum from earliest forced

chenchuangchuang created FLINK-17638:
----------------------------------------

             Summary: FlinkKafkaConsumerBase restore from empty state will be set consum from earliest forced
                 Key: FLINK-17638
                 URL: https://issues.apache.org/jira/browse/FLINK-17638
             Project: Flink
          Issue Type: Bug
          Components: Connectors / Kafka
    Affects Versions: 1.10.0, 1.9.3, 1.9.0
         Environment: Flink 1.9.0

kafka 1.1.0

jdk 1.8
            Reporter: chenchuangchuang


my work target and data  is like this    :
 # i need count the number of post per user create last 30 days in my system
 # the total and realtime data is in MYSQL
 # i can get increment MYSQL binlog  from  kafka-1.1.1 ( it just  store the last 7 days binlog), the topic name is "binlog_post_topic"
 # so , i have to combine the MYSQL data and the binlog data

 

i do it in this way:
 # first , i carry a snapshot of MYSQL data to kafka  topic in order of create_time ( topic name is "init-post-topic"), and consume from kafka topic    "init-post-topic" as flink data-stream with the SlidingEventTimeWindows
 # second, after the task do all the data in the topic "init-post-topic" , i create a save point for the task , call the save point  save-point-a
 # third, i modify my code ,
 ## the data source is "binlog_post_topic"  topic of kafka ,
 ## other operotor will not change,
 ## and the "binlog_post_topic"  is setted consuming  from  special timestamp (when the snapshot of MYSQL create )
 # forth, i restart my task from save-point-a

but i find the kafka consumer for the "binlog_post_topic" do not consume data from the timestamp i setted, but from the earlist,  i find the log in the task manager 
{code:java}
//代码占位符
2020-05-11 17:20:47,228 INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - Consumer subtask 0 restored state: {}.

...
2020-05-12 20:14:52,641 INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - Consumer subtask 0 will start reading 1 partitions with offsets in restored state: {KafkaTopicPartition{topic='binlog-kk_social-post', partition=0}=-915623761775}
2020-05-11 17:20:47,414 INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  - Consumer subtask 0 creating fetcher with offsets {KafkaTopicPartition{topic='binlog-kk_social-post', partition=0}=-915623761775}.


{code}

i guess this may be caused by the FlinkKafkaConsumerBase
then i find code like this 

in the method FlinkKafkaConsumerBase.initializeState()
{code:java}
//代码占位符
if (context.isRestored() && !restoredFromOldState) {
   restoredState = new TreeMap<>(new KafkaTopicPartition.Comparator());
....{code}
this code mean that  if a task is restart from the save point ,that restoredState will not be null, at least be an empty TreeMap;

and in FlinkKafkaConsumerBase.open()
{code:java}
//代码占位符
if (restoredState != null) {
   for (KafkaTopicPartition partition : allPartitions) {
      if (!restoredState.containsKey(partition)) {
         restoredState.put(partition, KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET);
      }
   }
{code}
in this place will init the consumer , if a task is restart from a save-point , restoredState at least  is an empty TreeMap, then in this code , the consumer will be setted consume from 

KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET

i change this code like this 
{code:java}
//代码占位符
if (restoredState != null && !restoredState.isEmpty()) {
....

{code}
 

and this work well for me .

 

 

 

 

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)