Posted to commits@samza.apache.org by "Cameron Lee (Jira)" <ji...@apache.org> on 2019/10/17 17:10:00 UTC

[jira] [Assigned] (SAMZA-2300) Incomplete propagation of end-of-stream messages for intermediate stream operators

     [ https://issues.apache.org/jira/browse/SAMZA-2300?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Cameron Lee reassigned SAMZA-2300:
----------------------------------

    Assignee:     (was: Cameron Lee)

> Incomplete propagation of end-of-stream messages for intermediate stream operators
> ----------------------------------------------------------------------------------
>
>                 Key: SAMZA-2300
>                 URL: https://issues.apache.org/jira/browse/SAMZA-2300
>             Project: Samza
>          Issue Type: Bug
>            Reporter: Cameron Lee
>            Priority: Major
>
> Summary:
> If an intermediate operator (e.g. partitionBy) corresponds to multiple partitions, and the intermediate system returns IncomingMessageEnvelope.END_OF_STREAM_OFFSET as the offset of end-of-stream messages, then those end-of-stream messages may not be propagated to the other partitions.
> More context:
> End-of-stream propagation currently works by aggregating all end-of-stream messages in a single partition and then broadcasting once that partition has received the expected number of end-of-stream messages. This means a single partition needs to wait for multiple end-of-stream messages. However, when SystemConsumers finds an end-of-stream message, it marks the stream so that it is no longer polled for more messages. As a result, the aggregating partition may stop after the first end-of-stream message, never reach the expected count, and never broadcast to the other partitions.
> This issue was found while trying to migrate some tests to use the in-memory system. The in-memory system explicitly sets the offset to IncomingMessageEnvelope.END_OF_STREAM_OFFSET. Existing tests which use the in-memory system only have a single partition, so those work. The reason we probably haven't seen this bug in real use cases is that the Kafka system does not set the offset to IncomingMessageEnvelope.END_OF_STREAM_OFFSET.
> It is possible that this issue could have been avoided if IncomingMessageEnvelope did not have two different signals for an end-of-stream message (i.e. END_OF_STREAM_OFFSET for "offset" field and EndOfStreamMessage for "message" field).
> Fix:
> To fix this, we can change IncomingMessageEnvelope.isEndOfStream to check the message type instead of the offset. We also need to check whether the EndOfStreamMessage taskName is non-null. A non-null taskName means that the end-of-stream message came from inside Samza, so we don't want to stop polling in those cases.
> Also, we should hide the END_OF_STREAM_OFFSET since it should not be needed as an external API.
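
The check proposed under "Fix" could look roughly like the sketch below. This is only an illustration of the idea, not the actual Samza change: EndOfStreamSketch, EndOfStreamMarker, Envelope and shouldStopPolling are hypothetical stand-ins, and the "END_OF_STREAM" offset string is a placeholder value. The assumption, taken from the description above, is that a null taskName means the end-of-stream signal came from the input system itself, while a non-null taskName means it was sent by a task inside Samza.

```java
// Hypothetical stand-ins for illustration only; these are NOT Samza's classes.
final class EndOfStreamSketch {

  /** Stand-in for an end-of-stream control message. */
  static final class EndOfStreamMarker {
    // Assumption: null means the marker came from the input system itself;
    // non-null means it was sent by a task inside the job.
    private final String taskName;

    EndOfStreamMarker(String taskName) {
      this.taskName = taskName;
    }

    String getTaskName() {
      return taskName;
    }
  }

  /** Stand-in for an incoming envelope with separate "offset" and "message" fields. */
  static final class Envelope {
    private final String offset;
    private final Object message;

    Envelope(String offset, Object message) {
      this.offset = offset;
      this.message = message;
    }

    String getOffset() {
      return offset;
    }

    Object getMessage() {
      return message;
    }
  }

  /**
   * Decides whether the consumer should stop polling the stream.
   * Looks only at the message type and the task name, never at the offset.
   */
  static boolean shouldStopPolling(Envelope envelope) {
    Object message = envelope.getMessage();
    if (!(message instanceof EndOfStreamMarker)) {
      return false; // ordinary data message
    }
    // A marker sent by a task must keep flowing so the aggregating
    // partition can count it; only a system-level marker stops polling.
    return ((EndOfStreamMarker) message).getTaskName() == null;
  }

  public static void main(String[] args) {
    Envelope fromTask = new Envelope("END_OF_STREAM", new EndOfStreamMarker("task-0"));
    Envelope fromSystem = new Envelope("END_OF_STREAM", new EndOfStreamMarker(null));
    Envelope data = new Envelope("42", "payload");

    System.out.println(shouldStopPolling(fromTask));   // false
    System.out.println(shouldStopPolling(fromSystem)); // true
    System.out.println(shouldStopPolling(data));       // false
  }
}
```

With a check of this shape, the aggregating partition keeps polling while end-of-stream messages from the other tasks arrive, so it can reach the expected count and broadcast.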



--
This message was sent by Atlassian Jira
(v8.3.4#803005)