Posted to commits@samza.apache.org by "Lakshmi Manasa Gaduputi (JIRA)" <ji...@apache.org> on 2019/03/20 06:01:00 UTC

[jira] [Updated] (SAMZA-2136) Improve run.id generation for batch in standalone

     [ https://issues.apache.org/jira/browse/SAMZA-2136?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Lakshmi Manasa Gaduputi updated SAMZA-2136:
-------------------------------------------
    Description: 
Improve the current read-write-lock based implementation [link PR|https://github.com/apache/samza/pull/938] to generate run.id for batch jobs in standalone mode.

A simpler approach proposed by [~boryas] and [~bharathkk] is as follows: 

run.id is stored in an ephemeral node in ZK.
 * STEP 1: Every processor checks whether the run.id node exists and gets the version/stat/status of the node.
 * STEP 2: If the node does not exist, generate a run.id and write it to ZK (create an ephemeral node).
 * STEP 3: If the write succeeds, continue. If the write fails, throw away the generated run.id and go back to STEP 1.
 * STEP 4: If the node exists, get its run.id and check whether it matches the local run.id (if one exists). If the run.id on the node does not match the local run.id, kill the processor.
 * STEP 5: If the node exists and the local run.id either does not exist or matches the run.id on the node, set the local run.id to the run.id on the node and set a watch on the node.
 * STEP 6: If the watch is triggered (because the run.id node's value changed or the node was deleted), go to STEP 1.
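
The steps above can be sketched roughly as follows. This is only an illustration under stated assumptions, not Samza code: FakeCoordinator is a hypothetical in-memory stand-in for the ZK run.id node, and resolve_run_id, NodeExistsError and ProcessorKilled are names invented for the example. The watch from STEP 5/6 is left out; a caller would simply invoke resolve_run_id again when a watch fires.

```python
import uuid


class NodeExistsError(Exception):
    """Raised when create() loses the race to another processor."""


class ProcessorKilled(Exception):
    """Signals STEP 4: the local run.id disagrees with the node."""


class FakeCoordinator:
    """Hypothetical in-memory stand-in for the run.id node (not a ZK client)."""

    def __init__(self):
        self.run_id = None  # None means the node does not exist

    def read(self):
        return self.run_id

    def create(self, value):
        if self.run_id is not None:
            raise NodeExistsError()
        self.run_id = value

    def delete(self):
        self.run_id = None


def resolve_run_id(store, local_run_id=None, max_attempts=10):
    """Run STEPs 1-5 once and return the run.id this processor should use."""
    for _ in range(max_attempts):
        node_value = store.read()                  # STEP 1: does the node exist?
        if node_value is None:
            candidate = str(uuid.uuid4())          # STEP 2: generate and write
            try:
                store.create(candidate)
            except NodeExistsError:
                continue                           # STEP 3: write failed, retry
            return candidate                       # STEP 3: write succeeded
        if local_run_id is not None and local_run_id != node_value:
            # STEP 4: mismatch between local and stored run.id
            raise ProcessorKilled(
                f"local run.id {local_run_id} != node run.id {node_value}")
        return node_value                          # STEP 5: adopt the node's run.id
    raise RuntimeError("could not resolve run.id")
```

In this sketch the first processor to call resolve_run_id creates the node, and every later caller adopts the same value until the node is deleted.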

This approach has two concerns that need to be addressed:
 # Unclean shutdown (e.g. kill -9): the ephemeral node is not cleaned up, so a job restarted too soon would end up reusing the old run's id.
 # Session disconnect/reconnect: if the run.id has changed by the time a processor reconnects, the reconnecting processor would have to be killed.

Note that the above concerns exist in the read-write-lock implementation too.

Alternate approaches:
 # Metadata store: Proposed by [~jmaes]: Once the metadata store implementation is complete, a command can be added to it to provide a run.id to the batch job.
 # Leader + BarrierState: Proposed by [~spvenkat]: BarrierState holds (run.id, active processorIds, ephemeral processorIds). run.id will be generated based on the previous barrier state and the current set of active processorIds/ephemeralIds. Each time a processor joins, the leader creates a new barrier and updates the state once all of them have joined the barrier.
 # Leader + persistent run.id node: Proposed by [~spvenkat]: A persistent node holds the state consisting of (run.id, active processorIds, ephemeral processorIds). run.id will be generated based on the previous run.id state and the current set of active processorIds/ephemeralIds.
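
As a rough illustration of the "Leader + persistent run.id node" idea, the sketch below shows one possible way a leader could derive the next state from the previous one. The rule used here (keep the run.id while at least one processor from the previous state is still active, otherwise start a new run) is an assumption made for the example; the proposal does not spell out the rule. RunState and next_run_state are hypothetical names, and the sketch tracks only the active processorIds.

```python
import uuid
from dataclasses import dataclass


@dataclass(frozen=True)
class RunState:
    """Hypothetical contents of the persistent node (ephemeral ids omitted)."""
    run_id: str
    active_processor_ids: frozenset


def next_run_state(previous, current_active_ids):
    """Assumed rule: reuse run.id if any previously active processor remains."""
    current = frozenset(current_active_ids)
    if previous is not None and previous.active_processor_ids & current:
        # Some processor survived from the previous state: same run.
        return RunState(previous.run_id, current)
    # No overlap (or no previous state): treat this as a new run.
    return RunState(str(uuid.uuid4()), current)
```

Under this assumed rule, a rolling membership change keeps the run.id, while a restart with an entirely new set of processors gets a fresh one.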

 

  was:
Improve the current read-write-lock based implementation [link PR|https://github.com/apache/samza/pull/938] to generate run.id for batch jobs in standalone mode.

A simpler approach proposed by [~boryas] and [~bharathkk] is as follows: 

run.id is stored in an ephemeral node in ZK.
 * STEP 1: Every processor checks whether the run.id node exists and gets the version/stat/status of the node.
 * STEP 2: If the node does not exist, generate a run.id and write it to ZK (create an ephemeral node).
 * STEP 3: If the write succeeds, continue. If the write fails, throw away the generated run.id and go back to STEP 1.
 * STEP 4: If the node exists, get its run.id and check whether it matches the local run.id (if one exists). If the run.id on the node does not match the local run.id, kill the processor.
 * STEP 5: If the node exists and the local run.id either does not exist or matches the run.id on the node, set the local run.id to the run.id on the node and set a watch on the node.
 * STEP 6: If the watch is triggered (because the run.id node's value changed or the node was deleted), go to STEP 1.

Additionally, once the metadata store has been implemented, a command can be added to it to provide a run.id to the batch job.

 


> Improve run.id generation for batch in standalone
> -------------------------------------------------
>
>                 Key: SAMZA-2136
>                 URL: https://issues.apache.org/jira/browse/SAMZA-2136
>             Project: Samza
>          Issue Type: Improvement
>            Reporter: Lakshmi Manasa Gaduputi
>            Priority: Major
>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)