Posted to user@flink.apache.org by Philip Luppens <ph...@gmail.com> on 2018/01/22 16:08:04 UTC

Flink Kinesis Consumer re-reading merged shards upon restart

Hi everyone,

For the past few weeks, we’ve been struggling with Kinesis ingestion using the
Flink Kinesis connector, but the seemingly complete lack of similar reports
makes us wonder if perhaps we misconfigured or misused the connector.

We’re using the connector to subscribe to streams varying from 1 to 100
shards, and we use the kinesis-scaling-utils to dynamically scale the
Kinesis streams up and down during peak times. What we’ve noticed is
that, whenever closed shards were present, any Flink job restart from a
check- or savepoint would result in those shards being re-read from the
trim horizon (the oldest record still within the retention period),
duplicating our events.

We started checking the checkpoint state and found that the shards were
stored correctly with the proper sequence numbers (including the closed
shards), but that upon restart, the older closed shards would be read
from the trim horizon again, as if their restored state were ignored.

In the end, we believe we found the problem: in the
FlinkKinesisConsumer’s run() method, we try to match each shard returned
by the KinesisDataFetcher against the shard metadata from the restored
state, but we do this via a containsKey() call, which means we rely on
StreamShardMetadata’s equals() method. However, equals() compares all
properties, including the endingSequenceNumber, which may have changed
between the restored checkpoint and our shard discovery (for instance,
because the shard was closed by a merge in the meantime). The equality
check therefore fails, containsKey() returns false, and the shard is
re-read from the trim horizon even though it was present in the restored
state.
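
To illustrate the effect outside of Flink, here is a minimal,
self-contained sketch of the failure mode. The ShardMetadata class below
is a made-up stand-in for the connector’s StreamShardMetadata (the real
class carries more fields), and the sequence numbers are fabricated:

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Simplified stand-in for StreamShardMetadata, for illustration only.
final class ShardMetadata {
    final String streamName;
    final String shardId;
    final String endingSequenceNumber; // null while the shard is still open

    ShardMetadata(String streamName, String shardId, String endingSequenceNumber) {
        this.streamName = streamName;
        this.shardId = shardId;
        this.endingSequenceNumber = endingSequenceNumber;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof ShardMetadata)) {
            return false;
        }
        ShardMetadata other = (ShardMetadata) o;
        // equals() covers *all* properties, including endingSequenceNumber.
        return Objects.equals(streamName, other.streamName)
                && Objects.equals(shardId, other.shardId)
                && Objects.equals(endingSequenceNumber, other.endingSequenceNumber);
    }

    @Override
    public int hashCode() {
        return Objects.hash(streamName, shardId, endingSequenceNumber);
    }
}

public class ContainsKeyPitfall {
    public static void main(String[] args) {
        // State as it was checkpointed: the shard was still open.
        Map<ShardMetadata, String> restoredSequenceNumbers = new HashMap<>();
        restoredSequenceNumbers.put(
                new ShardMetadata("my-stream", "shardId-000000000000", null),
                "49578431077728374892");

        // The same shard as rediscovered after the restart: it has since been
        // closed by a merge, so its endingSequenceNumber is now set.
        ShardMetadata rediscovered = new ShardMetadata(
                "my-stream", "shardId-000000000000", "49578431077728374999");

        // Prints "false": the restored offset is not found, so the shard would
        // be consumed from the earliest available position again.
        System.out.println(restoredSequenceNumbers.containsKey(rediscovered));
    }
}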

We’ve created a workaround where we match only on the shardId and stream
name when restoring the state of the shards we’ve already seen, and this
seems to work correctly. However, as pointed out above, the lack of
similar reports makes us worry that we’ve misunderstood something, so
we’d appreciate any feedback on whether our report makes sense before we
file a bug in the issue tracker.
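
For reference, the matching in our workaround boils down to something
like the following (again using the simplified stand-in class from the
sketch above, not the actual connector code):

public class WorkaroundSketch {
    // Look up the restored sequence number by the shard's stable identity
    // (stream name + shardId) instead of relying on equals()/containsKey(),
    // so the mutable endingSequenceNumber is ignored.
    static String findRestoredSequenceNumber(
            java.util.Map<ShardMetadata, String> restoredSequenceNumbers,
            ShardMetadata discoveredShard) {
        for (java.util.Map.Entry<ShardMetadata, String> entry
                : restoredSequenceNumbers.entrySet()) {
            ShardMetadata restored = entry.getKey();
            if (restored.streamName.equals(discoveredShard.streamName)
                    && restored.shardId.equals(discoveredShard.shardId)) {
                return entry.getValue(); // resume from the checkpointed position
            }
        }
        return null; // genuinely new shard: start from the configured initial position
    }
}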

Much appreciated,

-Phil

-- 
"We cannot change the cards we are dealt, just how we play the hand." -
Randy Pausch

Re: Flink Kinesis Consumer re-reading merged shards upon restart

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
Hi Phil,

Thanks a lot for the PR! Let’s continue the discussion there.
I think signing the ICLA is only strictly required for becoming a committer on the project, so we’re good to go for the pull request :)

Cheers,
Gordon


Re: Flink Kinesis Consumer re-reading merged shards upon restart

Posted by Philip Luppens <ph...@gmail.com>.
Hi Gordon,

I’ve created a PR [1] with my proposed code changes. Let me know if
anything is missing.

I think I signed a CLA many years ago, so that should be ok as well.

[1] https://github.com/apache/flink/pull/5337

HTH,

-Phil

-- 
"We cannot change the cards we are dealt, just how we play the hand." -
Randy Pausch

Re: Flink Kinesis Consumer re-reading merged shards upon restart

Posted by Philip Luppens <ph...@gmail.com>.
Hi Gordon,

Yeah, I’d need to confirm with our devops guys that this is the case. By
default, the Kinesis monitoring doesn’t show how many (or which) shards
were re-ingested; all I remember is seeing the iterator age shoot up
again to the retention horizon, but I have no clue whether this was
caused by one shard or more. Regardless, I do remember we were having
issues whenever there were closed shards, but I could be wrong.

I’ve created a ticket [1] to track the issue, and I’ll see if I can
provide a small patch against the 1.3 branch.

[1] https://issues.apache.org/jira/browse/FLINK-8484

HTH,

-Phil


-- 
"We cannot change the cards we are dealt, just how we play the hand." -
Randy Pausch

Re: Flink Kinesis Consumer re-reading merged shards upon restart

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
Hi Philip,

Thanks a lot for reporting this, and looking into this in detail.

Your observation sounds accurate to me. The `endingSequenceNumber` would no longer be null once a shard is closed, so on restore that would mislead the consumer into thinking it’s a new shard and make it start consuming from the earliest sequence number possible (i.e., treating it as if it were a new shard that was created while the job wasn’t running).
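
(If you want to double-check which shards on your stream are closed, something along these lines with the AWS Java SDK v1 should show it; this is just a sketch that assumes default credentials and a placeholder stream name, and it ignores DescribeStream pagination:)

import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
import com.amazonaws.services.kinesis.model.Shard;

public class ClosedShardCheck {
    public static void main(String[] args) {
        AmazonKinesis kinesis = AmazonKinesisClientBuilder.defaultClient();
        DescribeStreamRequest request =
                new DescribeStreamRequest().withStreamName("my-stream");
        for (Shard shard : kinesis.describeStream(request)
                .getStreamDescription().getShards()) {
            String endingSequenceNumber =
                    shard.getSequenceNumberRange().getEndingSequenceNumber();
            // Closed shards (e.g. the parents of a merge) have a non-null
            // ending sequence number; open shards report null here.
            System.out.println(shard.getShardId()
                    + " closed=" + (endingSequenceNumber != null));
        }
    }
}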

I think we haven’t seen other reports on this yet, because the issue you observed seems to happen only in a corner case where the Kinesis stream was rescaled while the job was down.
Could you confirm that assumption? My guess is that Flink users who use Kinesis have so far only been rescaling their Kinesis streams while the job was running.

Your workaround is also a valid fix for this bug. Could you file a JIRA for it? I’d be happy to also review a PR for the fix, if you would like to contribute it.

Cheers,
Gordon

On 22 January 2018 at 5:08:36 PM, Philip Luppens (philip.luppens@gmail.com) wrote:

Hi everyone,

For the past weeks, we’ve been struggling with Kinesis ingestion using the Flink Kinesis connector, but the seemingly complete lack of similar reports makes us wonder if perhaps we misconfigured or mis-used the connector.

We’re using the connector to subscribe to streams varying from 1 to a 100 shards, and used the kinesis-scaling-utils to dynamically scale the Kinesis stream up and down during peak times. What we’ve noticed is that, while we were having closed shards, any Flink job restart with check- or save-point would result in shards being re-read from the event horizon, duplicating our events.

We started checking the checkpoint state, and found that the shards were stored correctly with the proper sequence number (including for closed shards), but that upon restarts, the older closed shards would be read from the event horizon, as if their restored state would be ignored.

In the end, we believe that we found the problem: in the FlinkKinesisConsumer’s run() method, we’re trying to find the shard returned from the KinesisDataFetcher against the shards’ metadata from the restoration point, but we do this via a containsKey() call, which means we’ll use the StreamShardMetadata’s equals() method. However, this checks for all properties, including the endingSequenceNumber, which might have changed between the restored state’s checkpoint and our data fetch, thus failing the equality check, failing the containsKey() check, and resulting in the shard being re-read from the event horizon, even though it was present in the restored state.

We’ve created a workaround where we only check for the shardId and stream name to restore the state of the shards we’ve already seen, and this seems to work correctly. However, as pointed out above, the lack of similar reports makes us worried that we’ve misunderstood something, so we’d appreciate any feedback whether or not our report makes sense before we file a bug in the issue tracker.

Much appreciated,

-Phil

--
"We cannot change the cards we are dealt, just how we play the hand." - Randy Pausch