Posted to user@flink.apache.org by Sharon Xie <sh...@gmail.com> on 2022/03/18 05:28:50 UTC

Kafka source with multiple partitions loses data during savepoint recovery

Hi,

I'm seeing odd behavior with the Kafka source: some records are dropped
during recovery from a savepoint.

My test setup is: Kafka source topic -> pass-through Flink job -> Kafka
sink topic.
The source and sink topics have 10 partitions each.

Test Steps
* Start the Flink job, send 5 records (first batch) to the source topic,
and read the sink. I see all 5 records without issue.
* Stop the job with a savepoint
* Send another 10 records (second batch) to the source topic
* Start the job with the savepoint

Expected: reading from the beginning of the sink topic, I should see all 15
records from the first and second batches.
Actual: Some random records from the second batch are missing.

My guess is that the savepoint only contains offsets for partitions that
received records from the first batch. The other partitions had no state
and by default read from the `latest` offset during recovery. So records
from the second batch that fell into the previously empty partitions are
never processed.
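
To make the guess concrete, here is a toy Python model of it. It is only a
sketch of the hypothesis, not Flink or Kafka client code: plain dicts stand
in for partitions and savepoint state, and the names `take_savepoint`,
`restore_and_read` and `LATEST` are made up for illustration.

```python
# Toy model only: dicts stand in for Kafka partitions and savepoint state.
LATEST = -1  # hypothetical marker: "resolve to the end of the partition when reading starts"


def take_savepoint(log, consumed):
    """Record the next offset to read per partition.

    `consumed` maps partition -> number of records already read. Partitions
    that were never read from get the LATEST marker instead of a real offset.
    """
    state = {}
    for partition in log:
        n = consumed.get(partition, 0)
        state[partition] = n if n > 0 else LATEST
    return state


def restore_and_read(log, state):
    """Read each partition from its restored position."""
    out = []
    for partition, records in log.items():
        start = state[partition]
        if start == LATEST:
            # The marker resolves to the end of the partition at restore time,
            # so anything written while the job was stopped is skipped.
            start = len(records)
        out.extend(records[start:])
    return out


# Two partitions: p0 received a record in the first batch, p1 stayed empty.
log = {"p0": ["a1"], "p1": []}
savepoint = take_savepoint(log, consumed={"p0": 1})

# The second batch arrives while the job is stopped.
log["p0"].append("a2")
log["p1"].append("b1")

print(restore_and_read(log, savepoint))  # ['a2']
```

In this toy run the record written to the previously empty partition (`b1`)
is never read, which is the symptom described above.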

However, based on the source code
<https://github.com/apache/flink/blob/release-1.14/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java#L579-L582>,
I'd expect the partitions that received no records from the first batch to
be initialized with `earliest-offset`. That is not the behavior I saw.

I'm using Flink 1.14.3. Is there anything I missed? If not, what is the
reason for this behavior, or is it a bug?



Thanks,
Sharon

Re: Kafka source with multiple partitions loses data during savepoint recovery

Posted by Sharon Xie <sh...@gmail.com>.
Thanks Qingsheng,

I'm using Flink SQL. I'll need to dig a bit deeper to see which Flink
consumer class is actually used.

I didn't find the log you linked. However, I found the following log from
the task manager. Note that in this test I sent 10 records to the source
topic, which has 10 partitions:

{"@timestamp":"2022-03-18T03:54:24.577Z","@version":"1","message":"Adding
split(s) to reader: [[Partition: stream-00000000-67362410-8,
StartingOffset: -1, StoppingOffset: -9223372036854775808], [Partition:
stream-00000000-67362410-9, StartingOffset: 1, StoppingOffset:
-9223372036854775808], [Partition: stream-00000000-67362410-6,
StartingOffset: 3, StoppingOffset: -9223372036854775808], [Partition:
stream-00000000-67362410-7, StartingOffset: 3, StoppingOffset:
-9223372036854775808], [Partition: stream-00000000-67362410-4,
StartingOffset: 2, StoppingOffset: -9223372036854775808], [Partition:
stream-00000000-67362410-5, StartingOffset: 2, StoppingOffset:
-9223372036854775808], [Partition: stream-00000000-67362410-2,
StartingOffset: 3, StoppingOffset: -9223372036854775808], [Partition:
stream-00000000-67362410-3, StartingOffset: 2, StoppingOffset:
-9223372036854775808], [Partition: stream-00000000-67362410-0,
StartingOffset: 1, StoppingOffset: -9223372036854775808], [Partition:
stream-00000000-67362410-1, StartingOffset: 3, StoppingOffset:
-9223372036854775808]]","logger_name":"org.apache.flink.connector.base.source.reader.SourceReaderBase","thread_name":"Source:
KafkaSource-default_catalog.default_database.decodable-e2e-test--counter_in--fb5d7159
(1/1)#0","level":"INFO","level_value":20000}

{"@timestamp":"2022-03-18T03:54:25.073Z","@version":"1","message":"[Consumer
clientId=KafkaSource--5609054382257236677-0, groupId=null] Seeking to
LATEST offset of partition
stream-00000000-67362410-8","logger_name":"org.apache.kafka.clients.consumer.internals.SubscriptionState","thread_name":"Source
Data Fetcher for Source:
KafkaSource-default_catalog.default_database.decodable-e2e-test--counter_in--fb5d7159
(1/1)#0","level":"INFO","level_value":20000}
{"@timestamp":"2022-03-18T03:54:25.075Z","@version":"1","message":"[Consumer
clientId=KafkaSource--5609054382257236677-0, groupId=null] Seeking to
offset 3 for partition
stream-00000000-67362410-7","logger_name":"org.apache.kafka.clients.consumer.KafkaConsumer","thread_name":"Source
Data Fetcher for Source:
KafkaSource-default_catalog.default_database.decodable-e2e-test--counter_in--fb5d7159
(1/1)#0","level":"INFO","level_value":20000}
{"@timestamp":"2022-03-18T03:54:25.078Z","@version":"1","message":"[Consumer
clientId=KafkaSource--5609054382257236677-0, groupId=null] Seeking to
offset 1 for partition
stream-00000000-67362410-9","logger_name":"org.apache.kafka.clients.consumer.KafkaConsumer","thread_name":"Source
Data Fetcher for Source:
KafkaSource-default_catalog.default_database.decodable-e2e-test--counter_in--fb5d7159
(1/1)#0","level":"INFO","level_value":20000}
{"@timestamp":"2022-03-18T03:54:25.078Z","@version":"1","message":"[Consumer
clientId=KafkaSource--5609054382257236677-0, groupId=null] Seeking to
offset 1 for partition
stream-00000000-67362410-0","logger_name":"org.apache.kafka.clients.consumer.KafkaConsumer","thread_name":"Source
Data Fetcher for Source:
KafkaSource-default_catalog.default_database.decodable-e2e-test--counter_in--fb5d7159
(1/1)#0","level":"INFO","level_value":20000}
{"@timestamp":"2022-03-18T03:54:25.078Z","@version":"1","message":"[Consumer
clientId=KafkaSource--5609054382257236677-0, groupId=null] Seeking to
offset 3 for partition
stream-00000000-67362410-2","logger_name":"org.apache.kafka.clients.consumer.KafkaConsumer","thread_name":"Source
Data Fetcher for Source:
KafkaSource-default_catalog.default_database.decodable-e2e-test--counter_in--fb5d7159
(1/1)#0","level":"INFO","level_value":20000}
{"@timestamp":"2022-03-18T03:54:25.078Z","@version":"1","message":"[Consumer
clientId=KafkaSource--5609054382257236677-0, groupId=null] Seeking to
offset 3 for partition
stream-00000000-67362410-1","logger_name":"org.apache.kafka.clients.consumer.KafkaConsumer","thread_name":"Source
Data Fetcher for Source:
KafkaSource-default_catalog.default_database.decodable-e2e-test--counter_in--fb5d7159
(1/1)#0","level":"INFO","level_value":20000}
{"@timestamp":"2022-03-18T03:54:25.079Z","@version":"1","message":"[Consumer
clientId=KafkaSource--5609054382257236677-0, groupId=null] Seeking to
offset 2 for partition
stream-00000000-67362410-4","logger_name":"org.apache.kafka.clients.consumer.KafkaConsumer","thread_name":"Source
Data Fetcher for Source:
KafkaSource-default_catalog.default_database.decodable-e2e-test--counter_in--fb5d7159
(1/1)#0","level":"INFO","level_value":20000}
{"@timestamp":"2022-03-18T03:54:25.080Z","@version":"1","message":"[Consumer
clientId=KafkaSource--5609054382257236677-0, groupId=null] Seeking to
offset 2 for partition
stream-00000000-67362410-3","logger_name":"org.apache.kafka.clients.consumer.KafkaConsumer","thread_name":"Source
Data Fetcher for Source:
KafkaSource-default_catalog.default_database.decodable-e2e-test--counter_in--fb5d7159
(1/1)#0","level":"INFO","level_value":20000}
{"@timestamp":"2022-03-18T03:54:25.082Z","@version":"1","message":"[Consumer
clientId=KafkaSource--5609054382257236677-0, groupId=null] Seeking to
offset 3 for partition
stream-00000000-67362410-6","logger_name":"org.apache.kafka.clients.consumer.KafkaConsumer","thread_name":"Source
Data Fetcher for Source:
KafkaSource-default_catalog.default_database.decodable-e2e-test--counter_in--fb5d7159
(1/1)#0","level":"INFO","level_value":20000}
{"@timestamp":"2022-03-18T03:54:25.083Z","@version":"1","message":"[Consumer
clientId=KafkaSource--5609054382257236677-0, groupId=null] Seeking to
offset 2 for partition
stream-00000000-67362410-5","logger_name":"org.apache.kafka.clients.consumer.KafkaConsumer","thread_name":"Source
Data Fetcher for Source:
KafkaSource-default_catalog.default_database.decodable-e2e-test--counter_in--fb5d7159
(1/1)#0","level":"INFO","level_value":20000}


As you can see, the `stream-00000000-67362410-8` partition had
StartingOffset -1 and started from the LATEST offset during restore, which
confirms what I described.
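
For anyone who wants to run the same check on their own task manager log,
here is a rough Python sketch that pulls the split list out of an
`Adding split(s) to reader` message and lists the partitions whose
StartingOffset is negative. The function names and the shortened sample are
illustrative; the message format is copied from the log above, and treating
-1 as "latest" rests only on the `Seeking to LATEST offset` line that
follows it in that log.

```python
import re

# Matches one "[Partition: <name>, StartingOffset: <x>, StoppingOffset: <y>]"
# entry in the format seen in the SourceReaderBase log message above.
SPLIT_RE = re.compile(
    r"\[Partition:\s*(?P<tp>\S+?),\s*"
    r"StartingOffset:\s*(?P<start>-?\d+),\s*"
    r"StoppingOffset:\s*(?P<stop>-?\d+)\]"
)


def parse_splits(message):
    """Return {partition name: starting offset} for every split in the message."""
    # The log text may be hard-wrapped (as in this mail), so collapse whitespace first.
    flat = " ".join(message.split())
    return {m.group("tp"): int(m.group("start")) for m in SPLIT_RE.finditer(flat)}


def splits_without_concrete_offset(message):
    """Partitions whose starting offset is a negative marker, not a real offset."""
    return sorted(tp for tp, start in parse_splits(message).items() if start < 0)


# Shortened sample taken from the log above (two of the ten splits).
sample = """Adding
split(s) to reader: [[Partition: stream-00000000-67362410-8,
StartingOffset: -1, StoppingOffset: -9223372036854775808], [Partition:
stream-00000000-67362410-9, StartingOffset: 1, StoppingOffset:
-9223372036854775808]]"""

print(splits_without_concrete_offset(sample))  # ['stream-00000000-67362410-8']
```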



On Fri, Mar 18, 2022 at 2:13 AM Qingsheng Ren <re...@gmail.com> wrote:

> Hi Sharon,
>
> Could you check the log after starting the job with savepoint? If you have
> INFO log enabled you will get an entry “Consumer subtask {} will start
> reading {} partitions with offsets in restored state: {}” [1] in the log,
> which shows the starting offset of partitions. This might be helpful to
> reveal the problem.
>
> BTW FlinkKafkaConsumer has been marked as deprecated since 1.14. Please
> consider switching to the new KafkaSource if you are developing new
> applications.
>
> [1]
> https://github.com/apache/flink/blob/a2df2665b6ff411a2aeb9b204fd9d46a2af0ecfa/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java#L614-L618
>
> Best regards,
>
> Qingsheng

Re: Kafka source with multiple partitions loses data during savepoint recovery

Posted by Qingsheng Ren <re...@gmail.com>.
Hi Sharon, 

Could you check the log after starting the job from the savepoint? If you have INFO logging enabled, you will get an entry “Consumer subtask {} will start reading {} partitions with offsets in restored state: {}” [1] in the log, which shows the starting offsets of the partitions. This might help reveal the problem.
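
One way to look for that entry, sketched in Python under the assumption that the task manager writes one JSON object per line with `message` and `logger_name` fields (the layout seen elsewhere in this thread). The helper name `find_restore_entries` and the sample lines are hypothetical; only the message template comes from the text above.

```python
import json

# Fixed part of the INFO entry quoted above; the {} placeholders are filled in at runtime.
NEEDLE = "partitions with offsets in restored state"


def find_restore_entries(lines):
    """Return (logger_name, message) for JSON log lines whose message contains NEEDLE."""
    hits = []
    for line in lines:
        try:
            entry = json.loads(line)
        except json.JSONDecodeError:
            continue  # skip lines that are not JSON
        if not isinstance(entry, dict):
            continue  # skip JSON values that are not log objects
        message = entry.get("message", "")
        if NEEDLE in message:
            hits.append((entry.get("logger_name", ""), message))
    return hits


sample_lines = [
    # Hypothetical entry: the message is the unfilled template, not real output.
    json.dumps({"logger_name": "hypothetical.Logger",
                "message": "Consumer subtask {} will start reading {} partitions "
                           "with offsets in restored state: {}"}),
    json.dumps({"logger_name": "other.Logger", "message": "unrelated entry"}),
    "not json at all",
]

print(len(find_restore_entries(sample_lines)))  # 1
```

If the search comes back empty, that may simply mean the job is not running the consumer class that emits this entry.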

BTW FlinkKafkaConsumer has been marked as deprecated since 1.14. Please consider switching to the new KafkaSource if you are developing new applications. 

[1] https://github.com/apache/flink/blob/a2df2665b6ff411a2aeb9b204fd9d46a2af0ecfa/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java#L614-L618

Best regards, 

Qingsheng
