Posted to commits@samza.apache.org by "Yan Fang (JIRA)" <ji...@apache.org> on 2015/06/04 18:49:38 UTC

[jira] [Commented] (SAMZA-699) CoordinatorStreamMessages lose orders in CoordinatorStreamSystemConsumer

    [ https://issues.apache.org/jira/browse/SAMZA-699?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14573112#comment-14573112 ] 

Yan Fang commented on SAMZA-699:
--------------------------------

Thanks, [~a.pejakovic]. I added you to the contributor list. The patch looks good to me.

[~naveenatceg], does this change make sense to you? 

> CoordinatorStreamMessages lose orders in CoordinatorStreamSystemConsumer
> ------------------------------------------------------------------------
>
>                 Key: SAMZA-699
>                 URL: https://issues.apache.org/jira/browse/SAMZA-699
>             Project: Samza
>          Issue Type: Bug
>            Reporter: Yan Fang
>         Attachments: SAMZA-TASK-699.0.patch
>
>
> Currently we use a new HashSet<CoordinatorStreamMessage>() for both bootstrappedStreamSet and bootstrappedStream, but the iteration order of a HashSet is unpredictable. This can produce the wrong checkpoint, for example:
> {code}
> 2015-06-02 16:18:39 CheckpointManager [DEBUG] Adding checkpoint Partition 1 for taskName Partition 0
> 2015-06-02 16:18:39 CheckpointManager [INFO] previous checkpoint offsets Partition 0 Checkpoint [offsets={}]
> 2015-06-02 16:18:39 CheckpointManager [DEBUG] Adding checkpoint Partition 0 for taskName Partition 0
> 2015-06-02 16:18:39 CheckpointManager [INFO] previous checkpoint offsets Partition 0 Checkpoint [offsets={SystemStreamPartition [kafka, test-global-stream, 0]=52, SystemStreamPartition [kafka, test-input-stream, 0]=330}]
> 2015-06-02 16:18:39 CheckpointManager [DEBUG] Adding checkpoint Partition 0 for taskName Partition 0
> 2015-06-02 16:18:39 CheckpointManager [INFO] previous checkpoint offsets Partition 2 Checkpoint [offsets={}]
> 2015-06-02 16:18:39 CheckpointManager [DEBUG] Adding checkpoint Partition 2 for taskName Partition 0
> 2015-06-02 16:18:39 CheckpointManager [INFO] previous checkpoint offsets Partition 2 Checkpoint [offsets={}]
> {code}
> The "previous checkpoint offsets" lines come from logging I added; they are not emitted by Samza itself.
> Because the messages are replayed out of order, a checkpoint may be overwritten by an older one.
> So I think we should use a LinkedHashSet, which preserves insertion order.
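The effect of the set type can be illustrated with a small standalone sketch. It is not the attached patch and does not use Samza classes: CheckpointMsg, replay and readAll are hypothetical stand-ins, and a checkpoint is reduced to a (task name, offset) pair that is replayed with last-write-wins semantics. Because the stand-in class keeps the identity-based hashCode, HashSet iteration order can vary from run to run, while LinkedHashSet always iterates in insertion order, so (assuming messages are added in stream order) the newest offset is applied last.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class BootstrapOrderDemo {
    // Hypothetical stand-in for a coordinator stream checkpoint message,
    // reduced to (taskName, offset). Not Samza's CoordinatorStreamMessage.
    public static final class CheckpointMsg {
        final String taskName;
        final long offset;

        public CheckpointMsg(String taskName, long offset) {
            this.taskName = taskName;
            this.offset = offset;
        }
    }

    // Replays messages in the set's iteration order; a later message for the
    // same task overwrites an earlier one (last write wins).
    public static Map<String, Long> replay(Set<CheckpointMsg> bootstrapped) {
        Map<String, Long> latest = new LinkedHashMap<>();
        for (CheckpointMsg m : bootstrapped) {
            latest.put(m.taskName, m.offset);
        }
        return latest;
    }

    // Adds the messages in stream order, then replays whatever order the set yields.
    public static Map<String, Long> readAll(Set<CheckpointMsg> target, List<CheckpointMsg> stream) {
        target.addAll(stream);
        return replay(target);
    }

    public static void main(String[] args) {
        List<CheckpointMsg> stream = Arrays.asList(
            new CheckpointMsg("Partition 0", 10L),
            new CheckpointMsg("Partition 0", 52L), // newer checkpoint, written later
            new CheckpointMsg("Partition 2", 7L));

        // HashSet: iteration order is unspecified, so the older offset (10) can win.
        System.out.println("HashSet:       " + readAll(new HashSet<>(), stream));
        // LinkedHashSet: insertion order is kept, so the newest offset wins.
        // prints "LinkedHashSet: {Partition 0=52, Partition 2=7}"
        System.out.println("LinkedHashSet: " + readAll(new LinkedHashSet<>(), stream));
    }
}
```

The same last-write-wins replay gives a stable result only when the container's iteration order matches the order in which messages were read, which is the guarantee LinkedHashSet adds over HashSet.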



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)