Posted to issues@camel.apache.org by "Pierre-Yves Bigourdan (Jira)" <ji...@apache.org> on 2021/09/15 09:56:00 UTC

[jira] [Commented] (CAMEL-16594) DynamoDB stream updates are missed when there are more than one active shards

    [ https://issues.apache.org/jira/browse/CAMEL-16594?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17415420#comment-17415420 ] 

Pierre-Yves Bigourdan commented on CAMEL-16594:
-----------------------------------------------

> The current Camel ddbstream implementation seems to incorrectly apply the concept of {{ShardIteratorType}} to the list of shards forming a DynamoDB stream rather than each shard individually.

 

I've been thinking about this quite a bit lately. My first approach is to keep supporting all the existing Camel URI parameters to preserve backwards compatibility, so I'm trying to extend the concept of {{ShardIteratorType}}, which in theory only applies to a single shard, to a whole tree of shards. The four possible values of {{ShardIteratorType}} are:
h5. LATEST

The starting shards would be the leaves of the tree, i.e. the shards that have a {{StartingSequenceNumber}} but no {{EndingSequenceNumber}} (shards 3, 4, 5 and 6).

The behaviour of {{LATEST}} applied to a whole tree sounds reasonable in my opinion.
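A minimal sketch of this {{LATEST}} interpretation follows. It assumes a hypothetical {{Shard}} record whose fields mirror the {{ShardId}}, {{ParentShardId}}, {{StartingSequenceNumber}} and {{EndingSequenceNumber}} values discussed here. This is not Camel's actual {{ShardList}} class; the class and method names are illustrative only.

```java
import java.util.List;

/** Hypothetical, flattened view of one shard; EndingSequenceNumber is null while the shard is open. */
record Shard(String shardId, String parentShardId,
             String startingSequenceNumber, String endingSequenceNumber) {}

final class LatestShards {
    private LatestShards() {}

    /** Leaves of the tree: shards with a StartingSequenceNumber but no EndingSequenceNumber. */
    static List<Shard> leaves(List<Shard> shards) {
        return shards.stream()
                .filter(s -> s.startingSequenceNumber() != null)
                .filter(s -> s.endingSequenceNumber() == null)
                .toList();
    }
}
```

Each returned shard would then be read with {{ShardIteratorType}} {{LATEST}}, rather than reading only one of them.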
h5. TRIM_HORIZON

The starting shards would be the roots of the tree, i.e. the shards that have no {{ParentShardId}}. I'm not entirely sure what would happen if DynamoDB dropped Shard 0 (or whether that is even possible). A slightly more general and safer definition would therefore be: shards that have no {{ParentShardId}}, or whose {{ParentShardId}} can no longer be found in the tree. In the example tree from the issue description, if DynamoDB dropped Shard 0, Shards 1 and 2 would become the roots of two smaller subtrees.

The behaviour of {{TRIM_HORIZON}} applied to a whole tree sounds reasonable in my opinion.
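The safer root definition can be sketched as follows. It uses the same hypothetical {{Shard}} record (not a Camel or AWS SDK type). A shard counts as a root when its parent is either absent or missing from the current listing.

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

/** Hypothetical, flattened view of one shard; EndingSequenceNumber is null while the shard is open. */
record Shard(String shardId, String parentShardId,
             String startingSequenceNumber, String endingSequenceNumber) {}

final class TrimHorizonShards {
    private TrimHorizonShards() {}

    /** Roots: no ParentShardId, or a ParentShardId that no longer appears among the listed shards. */
    static List<Shard> roots(List<Shard> shards) {
        Set<String> knownIds = shards.stream()
                .map(Shard::shardId)
                .collect(Collectors.toSet());
        return shards.stream()
                .filter(s -> s.parentShardId() == null || !knownIds.contains(s.parentShardId()))
                .toList();
    }
}
```

If Shard 0 were missing from the listing, this would return Shards 1 and 2, matching the two-subtree case described above.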
h5. AT_SEQUENCE_NUMBER

This is where things get more troublesome. Consider the attached JSON and take sequence number 105800000000033207000000 as an example. This sequence number definitely doesn't belong to shards 0, 1 or 2.

If you apply the same logic as the current implementation ([https://github.com/apache/camel/blob/6119fdc379db343030bd25b191ab88bbec34d6b6/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/ShardList.java#L106]), you would end up with shard 5, because it has the largest {{StartingSequenceNumber}}. However, the sequence number we've picked, 105800000000033207000000, is smaller than shard 5's {{StartingSequenceNumber}}, 105800000000033207048658. The current logic therefore picks the wrong shard, and will probably cause AWS to return a 400 error.
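These comparisons have to be numeric. The values have 24 digits, more than a Java {{long}} can hold (19 digits), so {{Long.parseLong}} would throw. Plain string comparison is only safe when both strings have the same length. The sketch below uses {{BigInteger}}; the class and method names are hypothetical.

```java
import java.math.BigInteger;

final class SequenceNumbers {
    private SequenceNumbers() {}

    /** Numeric comparison of two sequence numbers given as decimal strings. */
    static int compare(String a, String b) {
        return new BigInteger(a).compareTo(new BigInteger(b));
    }

    /** True when the target lies before the shard's StartingSequenceNumber, so the shard cannot contain it. */
    static boolean isBeforeShardStart(String target, String startingSequenceNumber) {
        return compare(target, startingSequenceNumber) < 0;
    }
}
```

With the values quoted above:
* {{isBeforeShardStart("105800000000033207000000", "105800000000033207048658")}} is {{true}}, so shard 5 is ruled out.
* The same check against shard 6's start, 105800000000025199618049, is {{false}}.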

If we refine the logic to look for shards whose {{StartingSequenceNumber}} is smaller than the one we've picked, shards 3, 4 and 6 are all candidates. That narrows things down a little. However, if we tried to iterate over all three shards with that target sequence number, AWS would still likely return two 400 errors.

The most likely answer is shard 6. Indeed, 105800000000033207000000 is greater than 105800000000025199618049, the {{StartingSequenceNumber}} of shard 6. Of the {{StartingSequenceNumbers}} of shards 3, 4 and 6, shard 6's is also numerically the closest to the target.

We've come up with the following algorithm:
 * iterate through all non-leaf shards in the tree and try to find a shard whose {{StartingSequenceNumber}} is smaller than, and whose {{EndingSequenceNumber}} is greater than, the target sequence number.
 * if none is found, iterate through the leaf shards and pick the one whose {{StartingSequenceNumber}} is smaller than the target sequence number and numerically closest to it.
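The two steps above can be sketched as follows, again with a hypothetical {{Shard}} record rather than Camel's own classes. The sketch makes two assumptions of its own. First, bounds are treated as inclusive, since an {{AT_SEQUENCE_NUMBER}} target could equal a shard's first or last sequence number. Second, "closest" means the largest {{StartingSequenceNumber}} that does not exceed the target. Like the algorithm it implements, this is a heuristic and does not resolve the caveats listed below.

```java
import java.math.BigInteger;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

/** Hypothetical, flattened view of one shard; EndingSequenceNumber is null while the shard is open. */
record Shard(String shardId, String parentShardId,
             String startingSequenceNumber, String endingSequenceNumber) {}

final class SequenceNumberShardFinder {
    private SequenceNumberShardFinder() {}

    static Optional<Shard> find(List<Shard> shards, String targetSequenceNumber) {
        BigInteger target = new BigInteger(targetSequenceNumber);

        // Step 1: a closed (non-leaf) shard whose [start, end] range contains the target.
        Optional<Shard> closed = shards.stream()
                .filter(s -> s.endingSequenceNumber() != null)
                .filter(s -> start(s).compareTo(target) <= 0
                        && new BigInteger(s.endingSequenceNumber()).compareTo(target) >= 0)
                .findFirst();
        if (closed.isPresent()) {
            return closed;
        }

        // Step 2: among open (leaf) shards starting at or before the target, the closest start wins.
        return shards.stream()
                .filter(s -> s.endingSequenceNumber() == null)
                .filter(s -> start(s).compareTo(target) <= 0)
                .max(Comparator.comparing(SequenceNumberShardFinder::start));
    }

    private static BigInteger start(Shard s) {
        return new BigInteger(s.startingSequenceNumber());
    }
}
```

With the two {{StartingSequenceNumber}} values quoted above, step 2 filters out shard 5 and keeps shard 6. Whether shard 6 finally wins depends on the values for shards 3 and 4 in the attached JSON.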

However, I don't think we can say for sure that it will be shard 6:
 * what if the sequence numbers within shard 3 had suddenly jumped and reached 105800000000033207000000? The [documentation of Kinesis|https://docs.aws.amazon.com/streams/latest/dev/key-concepts.html#terminology], which I believe underpins DynamoDB streams, states that "sequence numbers for the same partition key generally increase over time". That is very vague. Even comparing sequence numbers within a given shard is an approximation, let alone across multiple shards.
 * what if the same sequence number is present in multiple shards? The Kinesis documentation states that "each data record has a sequence number that is unique per partition-key within its shard", so uniqueness across multiple shards is not guaranteed.
 * even if we did manage to select the correct shard(s), users would never get future updates from the other three leaf shards. Is that really what they'd want?

I feel that anything we come up with in this area would be a potentially confusing heuristic, rather than a rigorous algorithm. Using {{AT_SEQUENCE_NUMBER}} really only makes sense if you provide the specific shard identifier with it (which the AWS CLI forces you to do).
h5. AFTER_SEQUENCE_NUMBER

Pretty much the same as {{AT_SEQUENCE_NUMBER}}.

 
----
 

Taking a step back, if I had the opportunity to redesign things from scratch, I would only allow Camel users to define two modes of iteration through a DynamoDB stream, {{FROM_LATEST}} and {{FROM_START}}, intentionally stepping away from the {{ShardIteratorType}} wording. You'd simply pick either the roots or the leaves of the tree, and users could not select arbitrary sequence numbers that might belong to any shard.
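One possible shape for that redesign is sketched below. It is a hypothetical enum, not an existing Camel option, and it uses the same hypothetical {{Shard}} record as the earlier examples. Each mode bundles the rule for choosing starting shards with the per-shard {{ShardIteratorType}} it would request. Iterator types are kept as plain strings so the sketch does not depend on the AWS SDK.

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

/** Hypothetical, flattened view of one shard; EndingSequenceNumber is null while the shard is open. */
record Shard(String shardId, String parentShardId,
             String startingSequenceNumber, String endingSequenceNumber) {}

/** Hypothetical iteration modes; not part of Camel's current ddbstream options. */
enum StreamIteratorMode {
    FROM_LATEST("LATEST") {
        @Override
        List<Shard> startingShards(List<Shard> shards) {
            // Leaves: open shards, which are where new records appear.
            return shards.stream().filter(s -> s.endingSequenceNumber() == null).toList();
        }
    },
    FROM_START("TRIM_HORIZON") {
        @Override
        List<Shard> startingShards(List<Shard> shards) {
            // Roots: no parent, or a parent that is no longer listed.
            Set<String> ids = shards.stream().map(Shard::shardId).collect(Collectors.toSet());
            return shards.stream()
                    .filter(s -> s.parentShardId() == null || !ids.contains(s.parentShardId()))
                    .toList();
        }
    };

    private final String shardIteratorType;

    StreamIteratorMode(String shardIteratorType) {
        this.shardIteratorType = shardIteratorType;
    }

    /** ShardIteratorType to request for each starting shard. */
    String shardIteratorType() {
        return shardIteratorType;
    }

    abstract List<Shard> startingShards(List<Shard> shards);
}
```

A consumer starting with {{FROM_START}} would still need to move on to a root's child shards once the root is exhausted. Users never supply a sequence number, so the shard-guessing problem above does not arise.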

> DynamoDB stream updates are missed when there are more than one active shards
> -----------------------------------------------------------------------------
>
>                 Key: CAMEL-16594
>                 URL: https://issues.apache.org/jira/browse/CAMEL-16594
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-aws
>            Reporter: Pierre-Yves Bigourdan
>            Assignee: Andrea Cosentino
>            Priority: Major
>         Attachments: shards.json
>
>
> The current Camel ddbstream implementation seems to incorrectly apply the concept of {{ShardIteratorType}} to the list of shards forming a DynamoDB stream rather than each shard individually.
> According to the [AWS documentation|https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_GetShardIterator.html#DDB-streams_GetShardIterator-request-ShardIteratorType]:
> {noformat}
> ShardIteratorType determines how the shard iterator is used to start reading stream records from the shard.
> {noformat}
> For example, for a given shard, when {{ShardIteratorType}} is equal to {{LATEST}}, the AWS SDK will read the most recent data in that particular shard. However, when {{ShardIteratorType}} is equal to {{LATEST}}, Camel additionally uses {{ShardIteratorType}} to determine which shard it considers amongst all the available ones in the stream: https://github.com/apache/camel/blob/6119fdc379db343030bd25b191ab88bbec34d6b6/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddbstream/ShardIteratorHandler.java#L132
> If my understanding is correct, shards in DynamoDB are modelled as a tree, with the child leaf nodes being the shards that are still active, i.e. the ones where new stream data will appear. These child shards will have a {{StartingSequenceNumber}}, but no {{EndingSequenceNumber}}.
> The most common case is to have a single shard, or a single branch of parent and child nodes:
> {noformat}
> Shard0
>    |
> Shard1
> {noformat}
> In the above case, new data will be added to {{Shard1}}, and the Camel implementation, which only looks at the last shard when {{ShardIteratorType}} is equal to {{LATEST}}, will be correct.
> However, the tree can also look like this (see related example in the attached JSON output from the AWS CLI, where the shard number matches the index in the JSON list):
> {noformat}
>              Shard0
>             /      \
>      Shard1          Shard2
>     /      \        /      \ 
> Shard3   Shard4  Shard5   Shard6
> {noformat}
> In this case, Camel will only consider Shard6, even though new data may be added to any of Shard3, Shard4, Shard5 or Shard6. This leads to updates being missed.
> As far as I can tell, DynamoDB splits a stream into multiple shards depending on the number of table partitions. That number grows either for a table with huge amounts of data, or when an existing table with provisioned capacity is migrated to on-demand provisioning.
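The difference between "last shard only" and "all active shards" can be made concrete with the seven-shard tree from the diagram. This sketch uses a hypothetical two-field {{Shard}} record and treats a leaf structurally, as a shard that no other listed shard names as its parent. The issue itself identifies active shards by a missing {{EndingSequenceNumber}}; the structural view is an alternative, not Camel's logic.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

/** Hypothetical minimal shard: just its id and its parent's id (null for a root). */
record Shard(String shardId, String parentShardId) {}

final class ShardTree {
    private ShardTree() {}

    /** Maps each parent shard id to the ids of its direct children. */
    static Map<String, List<String>> children(List<Shard> shards) {
        return shards.stream()
                .filter(s -> s.parentShardId() != null)
                .collect(Collectors.groupingBy(Shard::parentShardId,
                        Collectors.mapping(Shard::shardId, Collectors.toList())));
    }

    /** Leaves: shards with no children in the listing, kept in listing order. */
    static List<String> leafIds(List<Shard> shards) {
        Set<String> parents = children(shards).keySet();
        return shards.stream()
                .map(Shard::shardId)
                .filter(id -> !parents.contains(id))
                .toList();
    }

    /** The "only look at the last shard" behaviour, for comparison. */
    static String lastShardId(List<Shard> shards) {
        return shards.get(shards.size() - 1).shardId();
    }
}
```

For the listing Shard0 to Shard6 with the parents shown in the diagram:
* {{leafIds}} returns {{[Shard3, Shard4, Shard5, Shard6]}}.
* {{lastShardId}} returns only {{Shard6}}, so updates written to the other three leaves would be missed.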



--
This message was sent by Atlassian Jira
(v8.3.4#803005)