Posted to commits@samza.apache.org by "Yan Fang (JIRA)" <ji...@apache.org> on 2015/06/03 02:11:50 UTC
[jira] [Created] (SAMZA-699) CoordinatorStreamMessage loses orders in CoordinatorStreamSystemConsumer
Yan Fang created SAMZA-699:
------------------------------
Summary: CoordinatorStreamMessage loses orders in CoordinatorStreamSystemConsumer
Key: SAMZA-699
URL: https://issues.apache.org/jira/browse/SAMZA-699
Project: Samza
Issue Type: Bug
Reporter: Yan Fang
Currently we use HashSet<CoordinatorStreamMessage> for both bootstrappedStreamSet and bootstrappedStream. However, a HashSet does not guarantee iteration order, so the messages may come back in a different order from the one in which they were added. This can produce a wrong checkpoint. For example:
{code}
2015-06-02 16:18:39 CheckpointManager [DEBUG] Adding checkpoint Partition 1 for taskName Partition 0
2015-06-02 16:18:39 CheckpointManager [INFO] previous checkpoint offsets Partition 0 Checkpoint [offsets={}]
2015-06-02 16:18:39 CheckpointManager [DEBUG] Adding checkpoint Partition 0 for taskName Partition 0
2015-06-02 16:18:39 CheckpointManager [INFO] previous checkpoint offsets Partition 0 Checkpoint [offsets={SystemStreamPartition [kafka, test-global-stream, 0]=52, SystemStreamPartition [kafka, test-input-stream, 0]=330}]
2015-06-02 16:18:39 CheckpointManager [DEBUG] Adding checkpoint Partition 0 for taskName Partition 0
2015-06-02 16:18:39 CheckpointManager [INFO] previous checkpoint offsets Partition 2 Checkpoint [offsets={}]
2015-06-02 16:18:39 CheckpointManager [DEBUG] Adding checkpoint Partition 2 for taskName Partition 0
2015-06-02 16:18:39 CheckpointManager [INFO] previous checkpoint offsets Partition 2 Checkpoint [offsets={}]
{code}
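The "previous checkpoint offsets" lines are log statements I added for debugging; they do not come from the system itself.
Because the order is not preserved, a checkpoint may be overwritten by an older checkpoint.
So I think we should use LinkedHashSet, which iterates in insertion order, to preserve the order of the messages.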
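
To illustrate the proposal, here is a minimal sketch and not the actual Samza code. The Message class, its fields, the replay helper, and the offsets 100 and 200 are all hypothetical stand-ins for CoordinatorStreamMessage and the checkpoint handling. It shows that when messages are replayed from a LinkedHashSet, the message added last for a task is the one that survives:

```java
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

public class BootstrapOrderSketch {
    // Hypothetical stand-in for CoordinatorStreamMessage:
    // a task name plus the offset recorded in a checkpoint.
    static final class Message {
        final String taskName;
        final long offset;

        Message(String taskName, long offset) {
            this.taskName = taskName;
            this.offset = offset;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Message)) {
                return false;
            }
            Message other = (Message) o;
            return taskName.equals(other.taskName) && offset == other.offset;
        }

        @Override
        public int hashCode() {
            return Objects.hash(taskName, offset);
        }
    }

    // Replays messages in iteration order. A later message for the same
    // task overwrites an earlier one, so iteration order decides the result.
    static Map<String, Long> replay(Iterable<Message> messages) {
        Map<String, Long> latest = new HashMap<>();
        for (Message m : messages) {
            latest.put(m.taskName, m.offset);
        }
        return latest;
    }

    public static void main(String[] args) {
        // LinkedHashSet iterates in insertion order, so the newer
        // checkpoint (offset 200) is applied last and wins.
        Set<Message> bootstrapped = new LinkedHashSet<>();
        bootstrapped.add(new Message("Partition 0", 100L)); // older checkpoint
        bootstrapped.add(new Message("Partition 0", 200L)); // newer checkpoint
        System.out.println(replay(bootstrapped).get("Partition 0"));
    }
}
```

With a plain HashSet in place of the LinkedHashSet, the iteration order depends on the hash codes of the elements, so the older message could be applied last and overwrite the newer checkpoint.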
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)