Posted to commits@samza.apache.org by "Shanthoosh Venkataraman (JIRA)" <ji...@apache.org> on 2018/12/07 19:21:00 UTC

[jira] [Created] (SAMZA-2033) Inconsistency in the inmemory offsets managed by the TaskSideInputStorageManager.

Shanthoosh Venkataraman created SAMZA-2033:
----------------------------------------------

             Summary: Inconsistency in the inmemory offsets managed by the TaskSideInputStorageManager.
                 Key: SAMZA-2033
                 URL: https://issues.apache.org/jira/browse/SAMZA-2033
             Project: Samza
          Issue Type: Bug
            Reporter: Shanthoosh Venkataraman


TaskSideInputStorageManager is used to restore the side-input streams of a Samza job. Users can process messages from a side-input stream by plugging in a UDF.

Samza's contract is to provide the UDF with each event from the side-input streams.

TaskSideInputStorageManager maintains the offsets of the processed side-input messages in memory and periodically flushes them to disk.

TaskSideInputStorageManager uses AsyncRunLoop to fetch messages from the side-input streams and, depending on the `task.max.concurrency` configuration, can receive multiple messages for the same side-input stream partition concurrently.

Currently, the offset update logic in TaskSideInputStorageManager simply overwrites the entry in the in-memory offsets map with the offset of the most recently completed message.

This can corrupt the offsets when events complete out of order, as in the following scenario:

Consider a side-input stream with messages [0, 1, 2] in partition 0, with `task.max.concurrency` set to 3.

 
{code:java}

Time 0: TaskSideInputStorageManager receives message 0 and hands it over to the UDF.
Time 1: TaskSideInputStorageManager receives message 1 and hands it over to the UDF.
Time 2: TaskSideInputStorageManager receives message 2 and hands it over to the UDF.
Time 3: The UDF completes processing of message 2, and TaskSideInputStorageManager records the last processed offset for partition 0 as 2.
Time 4: The UDF completes processing of message 1, and TaskSideInputStorageManager records the last processed offset for partition 0 as 1.
Time 5: The UDF completes processing of message 0, and TaskSideInputStorageManager records the last processed offset for partition 0 as 0.
Time 6: TaskSideInputStorageManager flushes the offset map to disk with offset 0 for partition 0, although messages 0 through 2 have all been processed.

{code}
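
The timeline above can be reproduced with a minimal, self-contained sketch. Note that this is not the actual Samza code: the class SideInputOffsetDemo and its methods onProcessingComplete, offsetToFlush and replay are hypothetical names that only model the "overwrite with the last completed offset" behaviour described in this issue.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the offset bookkeeping described in this issue;
// it is NOT the real TaskSideInputStorageManager.
public class SideInputOffsetDemo {
    // partition id -> last recorded offset (the last write wins)
    private final Map<Integer, Long> lastProcessedOffsets = new HashMap<>();

    // Called whenever the UDF finishes processing a message.
    void onProcessingComplete(int partition, long offset) {
        lastProcessedOffsets.put(partition, offset);
    }

    // The value a periodic flush would write to disk for this partition.
    Long offsetToFlush(int partition) {
        return lastProcessedOffsets.get(partition);
    }

    // Replays completions for partition 0 in the given order and returns
    // the offset that would be flushed afterwards.
    public static long replay(long... completionOrder) {
        SideInputOffsetDemo manager = new SideInputOffsetDemo();
        for (long offset : completionOrder) {
            manager.onProcessingComplete(0, offset);
        }
        return manager.offsetToFlush(0);
    }

    public static void main(String[] args) {
        // In-order completion: the recorded offset matches the highest processed message.
        System.out.println("in-order completion flushes offset " + replay(0, 1, 2));
        // Out-of-order completion (Time 3 to Time 5 above): the recorded offset regresses.
        System.out.println("out-of-order completion flushes offset " + replay(2, 1, 0));
    }
}
```

With the completion order 2, 1, 0 the sketch ends up with offset 0 for partition 0, even though all three messages were processed, which matches the state flushed at Time 6.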



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)