Posted to commits@samza.apache.org by "Jake Maes (JIRA)" <ji...@apache.org> on 2019/02/26 00:52:03 UTC

[jira] [Updated] (SAMZA-2033) Inconsistency in the inmemory offsets managed by the TaskSideInputStorageManager.

     [ https://issues.apache.org/jira/browse/SAMZA-2033?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jake Maes updated SAMZA-2033:
-----------------------------
    Fix Version/s: 1.0.1

> Inconsistency in the inmemory offsets managed by the TaskSideInputStorageManager.
> ---------------------------------------------------------------------------------
>
>                 Key: SAMZA-2033
>                 URL: https://issues.apache.org/jira/browse/SAMZA-2033
>             Project: Samza
>          Issue Type: Bug
>            Reporter: Shanthoosh Venkataraman
>            Assignee: Bharath Kumarasubramanian
>            Priority: Major
>             Fix For: 1.0.1
>
>
> TaskSideInputStorageManager is used to restore the side-input streams of a Samza job.
> Users can process messages from the side-input stream by plugging in a UDF.
> TaskSideInputStorageManager maintains the last processed side-input message offsets in memory and flushes them to disk periodically.
> TaskSideInputStorageManager uses AsyncRunLoop to fetch messages from the side-input streams and can receive multiple messages for the same side-input stream partition concurrently, depending on the task.max.concurrency configuration.
> Currently, the offset update logic in TaskSideInputStorageManager simply overwrites the entry in the in-memory offsets map with the offset of the most recently completed message.
> This can cause offset corruption when events complete out of order, as in the following scenario:
> Consider a side-input stream with messages [0, 1, 2] in partition 0, with task.max.concurrency set to 3.
> {code:java}
> Time 0: TaskSideInputStorageManager receives message 0 and hands it over to the UDF.
> Time 1: TaskSideInputStorageManager receives message 1 and hands it over to the UDF.
> Time 2: TaskSideInputStorageManager receives message 2 and hands it over to the UDF.
> Time 3: Processing for message 2 is completed by the UDF and TaskSideInputStorageManager records the last processed offset for partition 0 as 2.
> Time 4: Processing for message 1 is completed by the UDF and TaskSideInputStorageManager records the last processed offset for partition 0 as 1.
> Time 5: Processing for message 0 is completed by the UDF and TaskSideInputStorageManager records the last processed offset for partition 0 as 0.
> Time 6: TaskSideInputStorageManager flushes the incorrect offset map to disk.{code}
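
The timeline above can be replayed with a minimal Java sketch. It is not Samza code and not necessarily the fix applied for this ticket: the class SideInputOffsetSketch, its methods, and the use of numeric offsets are hypothetical, chosen only to illustrate the race. The sketch compares an unconditional overwrite of the offsets map with one possible guard that never lets a recorded offset move backwards.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration only; none of these names come from the Samza code base.
class SideInputOffsetSketch {
    // Stand-in for the in-memory offsets map: partition id -> last recorded offset.
    // Assumption: offsets are numeric and increase within a partition.
    private final Map<Integer, Long> lastProcessedOffsets = new HashMap<>();
    private final boolean monotonic;

    SideInputOffsetSketch(boolean monotonic) {
        this.monotonic = monotonic;
    }

    // Called when the UDF finishes processing a message.
    synchronized void onProcessingComplete(int partition, long offset) {
        if (monotonic) {
            // Guarded update: keep the larger of the stored and the completed offset.
            lastProcessedOffsets.merge(partition, offset, Long::max);
        } else {
            // Naive update: the most recent completion always wins.
            lastProcessedOffsets.put(partition, offset);
        }
    }

    // The value a periodic flush would write to disk for this partition.
    synchronized Long offsetToFlush(int partition) {
        return lastProcessedOffsets.get(partition);
    }

    // Replays a completion order for partition 0 and returns the offset that would be flushed.
    static long replay(boolean monotonic, long... completionOrder) {
        SideInputOffsetSketch tracker = new SideInputOffsetSketch(monotonic);
        for (long offset : completionOrder) {
            tracker.onProcessingComplete(0, offset);
        }
        return tracker.offsetToFlush(0);
    }

    public static void main(String[] args) {
        // Completion order from Time 3 to Time 5 in the scenario: 2, then 1, then 0.
        System.out.println("naive:     " + replay(false, 2L, 1L, 0L)); // prints "naive:     0"
        System.out.println("monotonic: " + replay(true, 2L, 1L, 0L));  // prints "monotonic: 2"
    }
}
```

Note that keeping the maximum only prevents the recorded offset from regressing. If a flush happens while lower offsets are still in flight (for example, right after Time 3), the flushed value would run ahead of unfinished messages; tracking the highest contiguous completed offset would be a stricter alternative.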


