Posted to issues@flink.apache.org by "Wei-Che Wei (JIRA)" <ji...@apache.org> on 2017/05/24 12:33:04 UTC

[jira] [Comment Edited] (FLINK-6653) Avoid directly serializing AWS's Shard class in Kinesis consumer's checkpoints

    [ https://issues.apache.org/jira/browse/FLINK-6653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16022800#comment-16022800 ] 

Wei-Che Wei edited comment on FLINK-6653 at 5/24/17 12:32 PM:
--------------------------------------------------------------

Hi [~tzulitai],

I would like to take over this issue and here is my proposal.

Description:
# The consumer will read either of two possible states, {{KinesisStreamShardV2}} or {{KinesisStreamShard}}, and merge them into {{KinesisStreamShardV2}}.
# Convert {{KinesisStreamShardV2}} to {{KinesisStreamShardHandle}}, which the internal classes use to interact with the AWS library.
# Convert {{KinesisStreamShardHandle}} back to {{KinesisStreamShardV2}} when writing the new state.
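
As a rough illustration of the restore step in the description, here is a minimal sketch of merging the two possible restored states into one map keyed by the new model. Everything in it is hypothetical: {{LegacyShard}} and {{ShardV2}} are reduced stand-ins for {{KinesisStreamShard}} and {{KinesisStreamShardV2}}, sequence numbers are plain strings, and letting the V2 entry win on a conflict is an assumption, not something the proposal specifies.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Sketch only: every name in this class is a hypothetical stand-in. */
final class RestoreMergeSketch {

    /** Stand-in for the legacy KinesisStreamShard state (the real one wraps AWS's Shard). */
    static final class LegacyShard {
        final String streamName;
        final String shardId;

        LegacyShard(String streamName, String shardId) {
            this.streamName = streamName;
            this.shardId = shardId;
        }
    }

    /** Stand-in for KinesisStreamShardV2, reduced to two plain fields. */
    static final class ShardV2 {
        final String streamName;
        final String shardId;

        ShardV2(String streamName, String shardId) {
            this.streamName = streamName;
            this.shardId = shardId;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof ShardV2)) {
                return false;
            }
            ShardV2 other = (ShardV2) o;
            return streamName.equals(other.streamName) && shardId.equals(other.shardId);
        }

        @Override
        public int hashCode() {
            return Objects.hash(streamName, shardId);
        }
    }

    /**
     * Merges whichever states were restored into one V2-keyed map.
     * Either argument may be null when that state is absent from the checkpoint.
     */
    static Map<ShardV2, String> mergeRestoredState(
            Map<LegacyShard, String> legacyState, Map<ShardV2, String> v2State) {
        Map<ShardV2, String> merged = new HashMap<>();
        if (legacyState != null) {
            for (Map.Entry<LegacyShard, String> e : legacyState.entrySet()) {
                LegacyShard legacy = e.getKey();
                merged.put(new ShardV2(legacy.streamName, legacy.shardId), e.getValue());
            }
        }
        if (v2State != null) {
            // Assumption: if both states contain the same shard, the V2 entry wins.
            merged.putAll(v2State);
        }
        return merged;
    }
}
```

With this shape, a job restored from an old checkpoint passes only the legacy map, a job restored from a new checkpoint passes only the V2 map, and the rest of the consumer sees a single V2-keyed view in both cases.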

Proposed Changes:
# Introduce two models:
## {{KinesisStreamShardV2}}: stores the stream name and all the information in {{Shard}}, decoupling the state from the AWS library. It will be the new state.
## {{KinesisStreamShardHandle}}: the same as {{KinesisStreamShard}}, but not serializable. It is used in {{KinesisFetcher}} and {{ShardConsumer}} to distinguish it from the legacy {{KinesisStreamShard}} state, so that it can change along with any update to the AWS library.
# Add {{KinesisStreamShardHandle}} to {{KinesisStreamShardState}}.
# Make {{KinesisStreamShardV2}} and {{SequenceNumber}} POJO types.
# Add two util functions to convert between {{KinesisStreamShardV2}} and {{KinesisStreamShardHandle}}.
# Add a util function to convert {{KinesisStreamShard}} to {{KinesisStreamShardV2}}.
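
To make the two models and the conversion functions more concrete, here is a minimal sketch under stated assumptions. {{AwsShard}} is a simplified stand-in for AWS's {{Shard}} carrying only a subset of its fields, the field layout of {{KinesisStreamShardV2}} is a guess at what "all the information in {{Shard}}" could look like, and the names {{toHandle}} and {{toState}} are hypothetical.

```java
/** Sketch only: AwsShard is a simplified stand-in for AWS's Shard class. */
final class ShardModelSketch {

    /** Stand-in for the AWS Shard class (only a subset of its fields). */
    static final class AwsShard {
        String shardId;
        String parentShardId;
        String startingHashKey;
        String endingHashKey;
    }

    /** Proposed checkpoint state: flat and POJO-style (public no-arg constructor, public fields). */
    public static class KinesisStreamShardV2 {
        public String streamName;
        public String shardId;
        public String parentShardId;
        public String startingHashKey;
        public String endingHashKey;

        public KinesisStreamShardV2() {}
    }

    /** Runtime-only handle: deliberately does not implement java.io.Serializable. */
    static final class KinesisStreamShardHandle {
        final String streamName;
        final AwsShard shard;

        KinesisStreamShardHandle(String streamName, AwsShard shard) {
            this.streamName = streamName;
            this.shard = shard;
        }
    }

    /** Rebuilds the AWS-facing handle from the checkpointed state. */
    static KinesisStreamShardHandle toHandle(KinesisStreamShardV2 state) {
        AwsShard shard = new AwsShard();
        shard.shardId = state.shardId;
        shard.parentShardId = state.parentShardId;
        shard.startingHashKey = state.startingHashKey;
        shard.endingHashKey = state.endingHashKey;
        return new KinesisStreamShardHandle(state.streamName, shard);
    }

    /** Copies the handle's fields into the flat state that gets checkpointed. */
    static KinesisStreamShardV2 toState(KinesisStreamShardHandle handle) {
        KinesisStreamShardV2 state = new KinesisStreamShardV2();
        state.streamName = handle.streamName;
        state.shardId = handle.shard.shardId;
        state.parentShardId = handle.shard.parentShardId;
        state.startingHashKey = handle.shard.startingHashKey;
        state.endingHashKey = handle.shard.endingHashKey;
        return state;
    }
}
```

The point of the split is that only {{KinesisStreamShardV2}} ever reaches a checkpoint, so a change to the AWS class affects the two conversion functions rather than the serialized format.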

Test Plan:
# Update all tests to use the new and modified models.
# Add a migration test that makes sure state is restored when only the legacy state exists in the state backend.
# Add unit tests for the util functions.
# Test that the new state is serialized by the POJO serializer.


was (Author: tonywei):
Hi [~tzulitai],

I would like to take over this issue and here is my proposal.

Proposed Changes:
# Introduce two models:
## {{KinesisStreamShardV2}}: stores the stream name and all the information in {{Shard}}, decoupling the state from the AWS library. It will be the new state.
## {{KinesisStreamShardHandle}}: the same as {{KinesisStreamShard}}, but not serializable. It is used in {{KinesisFetcher}} and {{ShardConsumer}} to distinguish it from the legacy {{KinesisStreamShard}} state, so that it can change along with any update to the AWS library.
# Add {{KinesisStreamShardHandle}} to {{KinesisStreamShardState}}.
# Make {{KinesisStreamShardV2}} and {{SequenceNumber}} POJO types.
# Add two util functions to convert between {{KinesisStreamShardV2}} and {{KinesisStreamShardHandle}}.
# Add a util function to convert {{KinesisStreamShard}} to {{KinesisStreamShardV2}}.

Test Plan:
# Update all tests to use the new and modified models.
# Add a migration test that makes sure state is restored when only the legacy state exists in the state backend.
# Add unit tests for the util functions.
# Test that the new state is serialized by the POJO serializer.

> Avoid directly serializing AWS's Shard class in Kinesis consumer's checkpoints
> ------------------------------------------------------------------------------
>
>                 Key: FLINK-6653
>                 URL: https://issues.apache.org/jira/browse/FLINK-6653
>             Project: Flink
>          Issue Type: Improvement
>          Components: Kinesis Connector
>            Reporter: Tzu-Li (Gordon) Tai
>
> Currently, the Kinesis consumer directly serializes AWS's {{Shard}} instances in its checkpoints. This makes bumping AWS library versions hard, since any change to the {{Shard}} class by AWS will break checkpoint compatibility.
> We should either have custom serialization for {{KinesisStreamShard}}, or break apart the information in {{Shard}}. Ideally, it would be best to make {{KinesisStreamShard}} and {{SequenceNumber}} non-serializable, hence avoiding Java serialization in the checkpoints.


