Posted to user@flink.apache.org by Zachary Manno <za...@capitalone.com> on 2021/04/28 14:28:10 UTC

Flink Resuming From Checkpoint With "-s" FAILURE

Hello,
I am running Flink version 1.10 on Amazon EMR. We use RocksDB/S3 for state.
I have "Persist Checkpoints Externally" enabled. Periodically I must tear
down the current infrastructure and bring it back up. To do this, I
terminate the EMR, bring up a fresh EMR cluster, and then I resume the
Flink job from the latest checkpoint path in S3 using the "-s" method here:

https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/state/savepoints.html#resuming-from-savepoints
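
For reference, a minimal sketch of the resume step described above. It assumes the retained checkpoints follow Flink's usual layout of <checkpoint-dir>/<job-id>/chk-<n>/_metadata and treats the presence of _metadata as a sign that the checkpoint finished. The bucket name, job ID, and jar name are hypothetical, and the key listing would come from whatever S3 client is in use.

```python
import re

# Matches ".../chk-<n>/_metadata", the marker file of a finished checkpoint.
_CHK_METADATA = re.compile(r"^(?P<dir>.*/chk-(?P<n>\d+))/_metadata$")


def latest_checkpoint(keys):
    """Return the chk-<n> directory with the highest n that has a _metadata file.

    `keys` is an iterable of object keys under the job's checkpoint directory.
    Returns None when no finished checkpoint is found.
    """
    best = None
    for key in keys:
        m = _CHK_METADATA.match(key)
        if m and (best is None or int(m.group("n")) > best[0]):
            best = (int(m.group("n")), m.group("dir"))
    return None if best is None else best[1]


def resume_command(bucket, checkpoint_dir, jar):
    """Build the argument list for `flink run -s <path> <jar>`."""
    return ["flink", "run", "-s", f"s3://{bucket}/{checkpoint_dir}", jar]


# Hypothetical listing: chk-42 has no _metadata yet, so chk-41 is chosen.
keys = [
    "checkpoints/abc123/chk-40/_metadata",
    "checkpoints/abc123/chk-41/_metadata",
    "checkpoints/abc123/chk-42/some-state-file",
]
path = latest_checkpoint(keys)
print(resume_command("my-bucket", path, "my-job.jar"))
```

Skipping directories without _metadata is a deliberate choice here: a cluster that is terminated mid-checkpoint may leave a partially written chk-<n> directory behind.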

I last did this operation on April 19. Then, on April 27 I deployed a new
version of the code only, using savepointing. This caused a production
incident because it turned out that on April 19th one of the Kafka
partition offsets was somehow not committed correctly while resuming
from checkpoint. When the new code was deployed on the 27th a backfill of
Kafka messages came in from the 19th to the 27th which caused the issue.

I am attaching screenshots of Datadog metrics for the Kafka consumer
metrics Flink provides. One graph is:
".KafkaConsumer.topic.{topic_name}.partition.{partition_number}.committedOffsets"

And the next is:
"KafkaConsumer.topic.{topic_name}.partition.{partition_number}.currentOffsets"


The light blue line is partition 3 and that is the one that caused the
issue. Does anyone have any insight into what could have happened, and what
I can do to prevent this in the future? Unfortunately, since the EMR was
terminated I cannot provide the full logs. I am able to search for keywords
or get sections since we have external Splunk logging but cannot get full
files.
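
One possible safeguard, sketched under assumptions: compare the two metrics above per partition and flag any partition whose committed offset trails its current offset by more than a threshold. The sample values, the threshold, and the function name are hypothetical; in practice the readings would come from the Datadog metrics named above.

```python
def stalled_partitions(committed, current, max_gap):
    """Return partitions whose committed offset trails the current offset by more than max_gap.

    committed / current: dicts mapping partition number -> latest metric value.
    A partition that has no committed value at all is flagged as well.
    """
    flagged = {}
    for partition, cur in sorted(current.items()):
        com = committed.get(partition)
        if com is None or cur - com > max_gap:
            flagged[partition] = (com, cur)
    return flagged


# Hypothetical readings for a 6-partition topic; partition 3 is far behind.
committed = {0: 1000, 1: 980, 2: 1010, 3: 200, 4: 995, 5: 1002}
current = {0: 1005, 1: 990, 2: 1012, 3: 1001, 4: 999, 5: 1003}
print(stalled_partitions(committed, current, max_gap=100))
```

A sensible threshold depends on the checkpoint interval and throughput, since offsets committed on checkpoint will normally trail the current offset by some amount.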

Thanks for any help!!

[image: Committed_Offsets.png]
[image: Current_Offsets.png]

______________________________________________________________________



The information contained in this e-mail is confidential and/or proprietary to Capital One and/or its affiliates and may only be used solely in performance of work or services for Capital One. The information transmitted herewith is intended only for use by the individual or entity to which it is addressed. If the reader of this message is not the intended recipient, you are hereby notified that any review, retransmission, dissemination, distribution, copying or other use of, or taking of any action in reliance upon this information is strictly prohibited. If you have received this communication in error, please contact the sender and delete the material from your computer.




Re: [External Sender] Re: Flink Resuming From Checkpoint With "-s" FAILURE

Posted by Zachary Manno <za...@capitalone.com>.
Hi Becket, do you have any thoughts on this issue? Please let me know if
you'd like any more details.

Thank you!

- Zach

On Fri, Apr 30, 2021 at 3:14 AM Till Rohrmann <tr...@apache.org> wrote:

> Off the top of my head, I cannot think of anything concrete to look for
> in the logs. I am pulling in Becket, who has a bit more experience
> with Kafka and Flink's connector. Maybe he has an idea what could cause the
> problem.
>
> One idea you could try out is whether this problem also occurs with a
> newer Flink version. Not sure whether this is possible for you.
>
> Cheers,
> Till
>
> On Thu, Apr 29, 2021 at 11:29 PM Zachary Manno <
> zachary.manno@capitalone.com> wrote:
>
>> Also forgot to say, the yellow line steeply dropped off at 12 (was closer
>> to 11) because we deployed the job again at that time to try to stop the
>> bleeding. At 11/12 we took another savepoint, cancelled the job, and
>> brought it back up from that savepoint. It is all good from that point as
>> well.
>>
>> Thanks,
>> Zach
>>
>> On Thu, Apr 29, 2021 at 3:07 PM Zachary Manno <
>> zachary.manno@capitalone.com> wrote:
>>
>>> The graph monitors this metric in the flink dashboard:
>>>
>>> [image: Screen Shot 2021-04-29 at 2.58.51 PM.png]
>>> So it is just the count of Kafka messages consumed, per instance. The
>>> sum of all 5 instances equals the "Records sent" metric on the Flink UI for
>>> that Kafka consumer. When a new job is deployed everything starts off as 0
>>> again so that is why the sharp drops, then it increases as more messages
>>> are consumed. If no messages are being consumed it will flatline and hold
>>> its last value. What I've never seen before is that sharp rise in messages
>>> consumed, for only one instance, right when the job was deployed. I know
>>> we're working in the dark here without logs, but do you have any keywords I
>>> could search for? Or can you think of any similar issues with resuming a job
>>> manually from a checkpoint? Or anything configuration-wise we should look at?
>>>
>>> Going forward, before we tear down instances we are going to manually
>>> take a savepoint first. There is probably a lot of randomness in just
>>> terminating the EMR while the job is running and relying on the last
>>> checkpoint to resume. However, I am still uneasy about whether this could
>>> happen with savepointing as well.
>>>
>>> Thank you for your responses, I really appreciate it!
>>>
>>> On Thu, Apr 29, 2021 at 12:57 PM Till Rohrmann <tr...@apache.org>
>>> wrote:
>>>
>>>> Not sure whether I can interpret the graph properly without seeing the
>>>> logs of Flink. If the graph shows the "numRecordsOut", then it looks as if
>>>> the yellow KafkaConsumer gets restarted a bit later. I assume that VIP
>>>> transactions is a strictly monotonically increasing value, right? Then it is
>>>> odd that you have a short spike upwards, then downwards and then it keeps
>>>> on the same level until it drops to 0. It would be good to understand what
>>>> is happening there.
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Thu, Apr 29, 2021 at 4:42 PM Zachary Manno <
>>>> zachary.manno@capitalone.com> wrote:
>>>>
>>>>> Hi Till, thanks for the response.
>>>>>
>>>>> We have checkpointing enabled and no other offset committing configs
>>>>> so after reading this
>>>>>
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/kafka.html#kafka-consumers-offset-committing-behaviour-configuration
>>>>> we should be committing the offsets on checkpoint, however in the
>>>>> consumer config logs I see
>>>>> "*enable.auto.commit = true*". So step 1 should be turning that off
>>>>> then?
>>>>>
>>>>> For your summary, it is all correct except it wasn't every message
>>>>> from the 19th to the 27th, it was only a portion of messages. Also from the
>>>>> 19th to the 27th the application was behaving as expected. I attached a
>>>>> graph of messages consumed from the Kafka source operator "numRecordsOut".
>>>>> For this topic there are 6 partitions, for our job we have parallelism of
>>>>> 5, and 5 EC2 instances. All instances look fine except the yellow one which
>>>>> shoots up unexpectedly, and this is where the extra messages came from.
>>>>>
>>>>> [image: Screen Shot 2021-04-29 at 10.25.09 AM.png]
>>>>>
>>>>> Attaching another graph from March 31 - April 14 which shows a few
>>>>> different deployments, no weird spike like the April 27 graph.
>>>>>
>>>>> [image: Screen Shot 2021-04-29 at 10.40.30 AM.png]

-- 

*Zach Manno*

*Data Engineer*

*Small Business Bank*





Re: [External Sender] Re: Flink Resuming From Checkpoint With "-s" FAILURE

Posted by Till Rohrmann <tr...@apache.org>.
Off the top of my head, I cannot think of anything concrete to look for
in the logs. I am pulling in Becket, who has a bit more experience
with Kafka and Flink's connector. Maybe he has an idea what could cause the
problem.

One idea you could try out is whether this problem also occurs with a newer
Flink version. Not sure whether this is possible for you.

Cheers,
Till


Re: Flink Resuming From Checkpoint With "-s" FAILURE

Posted by Till Rohrmann <tr...@apache.org>.
Hi Zachary,

How did you configure the Kafka connector to commit the offsets
(periodically, or on checkpoint)? One explanation for the graphs you showed
is that you enabled periodic committing of the offsets. If this automatic
commit happens between two checkpoints and you later fall back to the
earlier checkpoint, you may see the committed offset drop at the next
periodic commit. Note that Flink does not rely on the offset committed to
the Kafka broker for fault tolerance. It stores the actual offset in its
internal state.
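
The scenario described above can be illustrated with a toy model. This is not Flink or Kafka client code: the class and method names are hypothetical, and it only mimics a source that keeps its read position in checkpointed state while a separate periodic commit reports that position to the broker.

```python
class ToySource:
    """Toy model of a source whose position lives in checkpoints, not on the broker."""

    def __init__(self):
        self.position = 0          # next offset to read (the job's own state)
        self.broker_committed = 0  # last offset reported to the broker
        self.checkpoints = {}      # checkpoint id -> position at that time

    def consume(self, n):
        self.position += n

    def checkpoint(self, cid):
        self.checkpoints[cid] = self.position

    def periodic_commit(self):
        # Timer-driven commit, independent of checkpoints.
        self.broker_committed = self.position

    def restore(self, cid):
        # Restoring uses the checkpointed position and ignores broker_committed.
        self.position = self.checkpoints[cid]


src = ToySource()
history = []

src.consume(100)
src.checkpoint(1)        # checkpoint 1 taken at offset 100
src.consume(50)
src.periodic_commit()    # broker now shows 150
history.append(src.broker_committed)

src.restore(1)           # fall back to checkpoint 1
src.consume(10)
src.periodic_commit()    # broker now shows 110
history.append(src.broker_committed)

print(history)  # [150, 110]
```

In this model the committed value moves backwards even though nothing was lost, because the restored position comes from the checkpoint rather than from the broker.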

In order to better understand the scenario, let me try to summarize it.
Periodically you restart your infrastructure and then resume the Flink job
from the latest checkpoint. You did this on the 19th of April. Then on the
27th of April you created a savepoint from the job that you had restarted on
the 19th and that had been running fine since then. You then submitted a new
job resuming from this savepoint, and all of a sudden this new job started to
consume data from Kafka starting from the 19th of April. Is this correct?
If it happened as described, then the Flink job seems not to have made
much progress since you restarted it. Without the logs it is really
hard to tell what could be the cause.

Cheers,
Till
