Posted to issues@flink.apache.org by "Wei-Che Wei (JIRA)" <ji...@apache.org> on 2017/05/24 12:33:04 UTC
[jira] [Comment Edited] (FLINK-6653) Avoid directly serializing AWS's Shard class in Kinesis consumer's checkpoints
[ https://issues.apache.org/jira/browse/FLINK-6653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16022800#comment-16022800 ]
Wei-Che Wei edited comment on FLINK-6653 at 5/24/17 12:32 PM:
--------------------------------------------------------------
Hi [~tzulitai],
I would like to take over this issue and here is my proposal.
Description:
# The consumer reads two possible state formats, {{KinesisStreamShardV2}} or the legacy {{KinesisStreamShard}}, and merges them into {{KinesisStreamShardV2}}.
# Convert {{KinesisStreamShardV2}} to {{KinesisStreamShardHandle}}, which the internal classes use to interact with the AWS library.
# Convert {{KinesisStreamShardHandle}} back to {{KinesisStreamShardV2}} when writing the new state.
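The restore step in the description above could look roughly like the following. This is only a minimal sketch, not the connector's code: {{LegacyShard}}, {{ShardV2}} and {{mergeRestoredState}} are hypothetical stand-ins for {{KinesisStreamShard}}, {{KinesisStreamShardV2}} and the merge step, sequence numbers are reduced to plain strings, and letting an entry already in the new format win over a legacy entry for the same shard is an assumption.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Hypothetical stand-in for a legacy KinesisStreamShard state entry. */
final class LegacyShard {
    final String streamName;
    final String shardId;

    LegacyShard(String streamName, String shardId) {
        this.streamName = streamName;
        this.shardId = shardId;
    }
}

/** Hypothetical stand-in for KinesisStreamShardV2; usable as a map key. */
final class ShardV2 {
    final String streamName;
    final String shardId;

    ShardV2(String streamName, String shardId) {
        this.streamName = streamName;
        this.shardId = shardId;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof ShardV2)) {
            return false;
        }
        ShardV2 other = (ShardV2) o;
        return streamName.equals(other.streamName) && shardId.equals(other.shardId);
    }

    @Override
    public int hashCode() {
        return Objects.hash(streamName, shardId);
    }
}

final class RestoreSketch {
    /** Converts one legacy entry to the new format. */
    static ShardV2 fromLegacy(LegacyShard legacy) {
        return new ShardV2(legacy.streamName, legacy.shardId);
    }

    /**
     * Merges both restored states into one map keyed by the new format.
     * Assumption: if a shard appears in both, the new-format entry wins.
     */
    static Map<ShardV2, String> mergeRestoredState(
            Map<LegacyShard, String> legacyState, Map<ShardV2, String> v2State) {
        Map<ShardV2, String> merged = new HashMap<>();
        for (Map.Entry<LegacyShard, String> entry : legacyState.entrySet()) {
            merged.put(fromLegacy(entry.getKey()), entry.getValue());
        }
        merged.putAll(v2State); // new-format entries override converted legacy ones
        return merged;
    }
}
```

After the merge, the rest of the consumer only has to deal with one state type, which is what allows the legacy class to be kept purely for reading old checkpoints.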
Proposed Changes:
# Introduce two models:
## {{KinesisStreamShardV2}}: Stores the stream name and all the information in {{Shard}}, decoupling the state from the AWS library. It will be the new state.
## {{KinesisStreamShardHandle}}: The same as {{KinesisStreamShard}}, but not serializable. It is used in {{KinesisFetcher}} and {{ShardConsumer}} to distinguish it from the legacy {{KinesisStreamShard}} state, so that it can change along with any update to the AWS library.
# Add {{KinesisStreamShardHandle}} to {{KinesisStreamShardState}}.
# Make {{KinesisStreamShardV2}} and {{SequenceNumber}} POJO types.
# Add two util functions to convert between {{KinesisStreamShardV2}} and {{KinesisStreamShardHandle}}.
# Add a util function to convert {{KinesisStreamShard}} to {{KinesisStreamShardV2}}.
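To make the split between the two models concrete, here is a rough sketch of how they and the two conversion functions might fit together. All names and field layouts are assumptions for illustration: {{AwsShardStandIn}} is a simplified stand-in for the AWS {{Shard}} class (the real class carries more fields, such as hash key and sequence number ranges), {{ShardV2Pojo}} plays the role of {{KinesisStreamShardV2}}, and {{ShardHandle}} plays the role of {{KinesisStreamShardHandle}}.

```java
/** Simplified stand-in for the AWS SDK Shard class (assumption: only two fields shown). */
final class AwsShardStandIn {
    private String shardId;
    private String parentShardId;

    public String getShardId() { return shardId; }
    public void setShardId(String shardId) { this.shardId = shardId; }
    public String getParentShardId() { return parentShardId; }
    public void setParentShardId(String parentShardId) { this.parentShardId = parentShardId; }
}

/** Sketch of the new state: plain fields only, no AWS types, POJO-style accessors. */
class ShardV2Pojo {
    private String streamName;
    private String shardId;
    private String parentShardId;

    public ShardV2Pojo() {}

    public String getStreamName() { return streamName; }
    public void setStreamName(String streamName) { this.streamName = streamName; }
    public String getShardId() { return shardId; }
    public void setShardId(String shardId) { this.shardId = shardId; }
    public String getParentShardId() { return parentShardId; }
    public void setParentShardId(String parentShardId) { this.parentShardId = parentShardId; }
}

/** Sketch of the runtime handle: wraps the AWS object and is deliberately not Serializable. */
final class ShardHandle {
    private final String streamName;
    private final AwsShardStandIn shard;

    ShardHandle(String streamName, AwsShardStandIn shard) {
        this.streamName = streamName;
        this.shard = shard;
    }

    String getStreamName() { return streamName; }
    AwsShardStandIn getShard() { return shard; }
}

/** Hypothetical util functions converting between the state and the handle. */
final class ShardConverters {
    /** Copies every field out of the AWS object into the AWS-free state class. */
    static ShardV2Pojo toV2(ShardHandle handle) {
        ShardV2Pojo v2 = new ShardV2Pojo();
        v2.setStreamName(handle.getStreamName());
        v2.setShardId(handle.getShard().getShardId());
        v2.setParentShardId(handle.getShard().getParentShardId());
        return v2;
    }

    /** Rebuilds the AWS object from the restored state. */
    static ShardHandle toHandle(ShardV2Pojo v2) {
        AwsShardStandIn shard = new AwsShardStandIn();
        shard.setShardId(v2.getShardId());
        shard.setParentShardId(v2.getParentShardId());
        return new ShardHandle(v2.getStreamName(), shard);
    }
}
```

With this shape, only the converters need to change when the AWS class gains or loses fields, while the checkpointed class stays under the connector's control.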
Test Plan:
# Update all tests to use the new and modified models.
# Add a migration test that makes sure state is restored when only legacy state is in the state backend.
# Add unit tests for the util functions.
# Test that the new state is serialized by the POJO serializer.
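For the last item, the actual test would presumably ask Flink's type extraction whether the class is recognized as a POJO. As a dependency-free illustration of what that check looks for, the sketch below approximates Flink's documented POJO rules with plain reflection: a public class, a public no-argument constructor, and every non-static, non-transient field either public and non-final or exposed through a getter and setter. {{PojoRulesSketch}}, {{GoodState}} and {{BadState}} are hypothetical names, and the check is simplified (it ignores superclass fields and {{is}}-style boolean getters).

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

final class PojoRulesSketch {

    /** Follows the rules: public, no-arg constructor, getter and setter for the field. */
    public static class GoodState {
        private String streamName;

        public GoodState() {}

        public String getStreamName() { return streamName; }
        public void setStreamName(String streamName) { this.streamName = streamName; }
    }

    /** Breaks the rules: no no-arg constructor and no setter for the final field. */
    public static class BadState {
        private final String streamName;

        public BadState(String streamName) { this.streamName = streamName; }

        public String getStreamName() { return streamName; }
    }

    /** Rough approximation of the POJO rules; not Flink's own implementation. */
    static boolean looksLikePojo(Class<?> clazz) {
        if (!Modifier.isPublic(clazz.getModifiers())) {
            return false;
        }
        try {
            if (!Modifier.isPublic(clazz.getDeclaredConstructor().getModifiers())) {
                return false;
            }
        } catch (NoSuchMethodException e) {
            return false; // no no-argument constructor at all
        }
        for (Field field : clazz.getDeclaredFields()) {
            int mods = field.getModifiers();
            if (Modifier.isStatic(mods) || Modifier.isTransient(mods) || field.isSynthetic()) {
                continue; // these fields are not part of the serialized state
            }
            if (Modifier.isPublic(mods) && !Modifier.isFinal(mods)) {
                continue; // directly accessible field
            }
            if (!hasGetterAndSetter(clazz, field)) {
                return false;
            }
        }
        return true;
    }

    /** Looks for public getX() and setX(type) methods matching the field. */
    private static boolean hasGetterAndSetter(Class<?> clazz, Field field) {
        String name = field.getName();
        String suffix = Character.toUpperCase(name.charAt(0)) + name.substring(1);
        try {
            clazz.getMethod("get" + suffix);
            clazz.getMethod("set" + suffix, field.getType());
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }
}
```

A class that fails these rules is not treated as a POJO, so a test along these lines would catch a later change that silently moves the state back to a generic serializer.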
> Avoid directly serializing AWS's Shard class in Kinesis consumer's checkpoints
> ------------------------------------------------------------------------------
>
> Key: FLINK-6653
> URL: https://issues.apache.org/jira/browse/FLINK-6653
> Project: Flink
> Issue Type: Improvement
> Components: Kinesis Connector
> Reporter: Tzu-Li (Gordon) Tai
>
> Currently, the Kinesis consumer directly serializes AWS's {{Shard}} instances in its checkpoints. This makes bumping AWS library versions hard, since any change to the {{Shard}} class by AWS will break checkpoint compatibility.
> We should either have custom serialization for {{KinesisStreamShard}}, or decompose the information in {{Shard}}. Ideally, {{KinesisStreamShard}} and {{SequenceNumber}} would be made non-serializable, thereby avoiding Java serialization in the checkpoints.