Posted to commits@samza.apache.org by "Lakshmi Manasa Gaduputi (JIRA)" <ji...@apache.org> on 2019/03/20 06:01:00 UTC
[jira] [Updated] (SAMZA-2136) Improve run.id generation for batch in standalone
[ https://issues.apache.org/jira/browse/SAMZA-2136?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Lakshmi Manasa Gaduputi updated SAMZA-2136:
-------------------------------------------
Description:
Improve the current read-write-lock-based implementation ([PR #938|https://github.com/apache/samza/pull/938]) that generates run.id for batch jobs in standalone mode.
A simpler approach proposed by [~boryas] and [~bharathkk] is as follows:
run.id is stored in an ephemeral node in ZK.
* STEP 1: All processors check whether the run.id node exists and read the node's version/stat.
* STEP 2: If the node does not exist, generate a run.id and write it to ZK by creating the ephemeral node.
* STEP 3: If the write succeeds, continue. If the write fails (e.g. because another processor created the node first), discard the generated run.id and go back to STEP 1.
* STEP 4: If the node exists, read its run.id and compare it with the local run.id, if there is one. If the run.id on the node does not match the local run.id, kill the processor.
* STEP 5: If the node exists and either there is no local run.id or it matches the run.id on the node, set the local run.id to the node's run.id and set a watch on the node.
* STEP 6: If the watch is triggered (because the node's run.id value changed or the node was deleted), go back to STEP 1.
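The steps above can be sketched as a single-threaded simulation. Everything here is hypothetical: `FakeZk` is an in-memory stand-in for ZooKeeper rather than the ZooKeeper or Samza API, `RUN_ID_PATH` is an illustrative path, and the behaviour after a successful write in STEP 3 (re-read the node, then apply STEP 4/5) is an assumed reading of "continue".

```python
import uuid

RUN_ID_PATH = "/runid"  # hypothetical znode path, not Samza's actual layout


class NodeExistsError(Exception):
    """Mirrors ZooKeeper's NODEEXISTS error when a create() loses a race."""


class FakeZk:
    """Hypothetical single-threaded, in-memory stand-in for ZooKeeper (no sessions)."""

    def __init__(self):
        self.nodes = {}    # path -> run.id value
        self.watches = {}  # path -> list of one-shot callbacks

    def get(self, path):
        return self.nodes.get(path)  # None when the node does not exist

    def create(self, path, value):
        # Creation is atomic: only one of several racing creators succeeds.
        if path in self.nodes:
            raise NodeExistsError(path)
        self.nodes[path] = value

    def watch(self, path, callback):
        self.watches.setdefault(path, []).append(callback)

    def delete(self, path):
        # This fake only fires watches on deletion; a real data watch also fires on value changes.
        self.nodes.pop(path, None)
        for callback in self.watches.pop(path, []):  # one-shot, like ZooKeeper watches
            callback()


class Processor:
    def __init__(self, processor_id, zk):
        self.processor_id = processor_id
        self.zk = zk
        self.run_id = None  # local run.id
        self.killed = False

    def sync_run_id(self):
        while not self.killed:
            node_run_id = self.zk.get(RUN_ID_PATH)                 # STEP 1
            if node_run_id is None:
                try:
                    self.zk.create(RUN_ID_PATH, uuid.uuid4().hex)  # STEP 2
                except NodeExistsError:
                    pass  # STEP 3: write failed, discard the generated run.id
                continue  # STEP 3: re-read the node either way (assumed)
            if self.run_id is not None and self.run_id != node_run_id:
                self.killed = True                                 # STEP 4: mismatch, kill
                return
            self.run_id = node_run_id                              # STEP 5: adopt node's run.id
            self.zk.watch(RUN_ID_PATH, self.sync_run_id)           # STEP 6: re-run on trigger
            return


zk = FakeZk()
processors = [Processor(f"p{i}", zk) for i in range(3)]
for p in processors:
    p.sync_run_id()

late = Processor("p-stale", zk)
late.run_id = "run-from-an-earlier-attempt"  # hypothetical stale local run.id
late.sync_run_id()  # STEP 4 kills this processor
```

The first processor creates the node and the others adopt its value. A processor that arrives holding a different local run.id is killed.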
This approach has two concerns that need to be addressed:
# Unclean shutdown (e.g. kill -9): the ephemeral node is not cleaned up until the ZK session expires, so restarting the job too soon would reuse the old run's id.
# Session disconnect/reconnect: upon reconnect, if the run.id has changed, the reconnecting processors would have to be killed.
Note that the above concerns exist in the read-write-lock implementation too.
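The first concern can be illustrated with a small model. ZooKeeper deletes an ephemeral node when its owning session ends. A clean shutdown closes the session right away. After kill -9, the session only ends once it times out. `FakeZkSessions` and the 30-second timeout are hypothetical values chosen for illustration, not ZooKeeper or Samza defaults.

```python
import uuid

RUN_ID_PATH = "/runid"  # hypothetical znode path


class FakeZkSessions:
    """Hypothetical in-memory model of ZooKeeper sessions and ephemeral nodes."""

    def __init__(self, session_timeout_ms):
        self.session_timeout_ms = session_timeout_ms
        self.nodes = {}           # path -> (value, owning session id)
        self.last_heartbeat = {}  # live session id -> last heartbeat time (ms)

    def open_session(self, now_ms):
        session_id = uuid.uuid4().hex
        self.last_heartbeat[session_id] = now_ms
        return session_id

    def close_session(self, session_id):
        # Ending a session removes every ephemeral node it owns.
        self.last_heartbeat.pop(session_id, None)
        self.nodes = {p: n for p, n in self.nodes.items() if n[1] != session_id}

    def expire_sessions(self, now_ms):
        # A session that stops heartbeating is only expired after the timeout elapses.
        for session_id, heartbeat_ms in list(self.last_heartbeat.items()):
            if now_ms - heartbeat_ms > self.session_timeout_ms:
                self.close_session(session_id)

    def create_ephemeral(self, path, value, session_id):
        self.nodes[path] = (value, session_id)

    def get(self, path):
        node = self.nodes.get(path)
        return None if node is None else node[0]


def run_id_seen_after_restart(restart_delay_ms, clean_shutdown):
    """Return the value a restarted processor would read in STEP 1."""
    zk = FakeZkSessions(session_timeout_ms=30_000)  # hypothetical timeout
    old_session = zk.open_session(now_ms=0)
    zk.create_ephemeral(RUN_ID_PATH, "run-1", old_session)
    if clean_shutdown:
        zk.close_session(old_session)  # graceful stop closes the session
    # Otherwise (kill -9) the session is neither closed nor heartbeated any more.
    zk.expire_sessions(now_ms=restart_delay_ms)
    return zk.get(RUN_ID_PATH)


print(run_id_seen_after_restart(5_000, clean_shutdown=False))   # run-1 (stale id reused)
print(run_id_seen_after_restart(5_000, clean_shutdown=True))    # None
print(run_id_seen_after_restart(60_000, clean_shutdown=False))  # None (session expired)
```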
Alternate approaches:
# Metadata store (proposed by [~jmaes]): once the metadata store implementation is complete, a command can be added to it to provide a run.id to the batch job.
# Leader + BarrierState (proposed by [~spvenkat]): BarrierState holds (run.id, active processorIds, ephemeral processorIds). run.id will be generated based on the previous barrier state and the current set of active processorIds/ephemeralIds. Each time a processor joins, the leader creates a new barrier and updates the state once all processors have joined the barrier.
# Leader + persistent run.id node (proposed by [~spvenkat]): a persistent node holds the state, consisting of (run.id, active processorIds, ephemeral processorIds). run.id will be generated based on the previous run.id state and the current set of active processorIds/ephemeralIds.
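The leader-side update in the persistent-node approach can be sketched as a pure function from (previous state, live processor ids) to a new state. The proposal does not specify the exact generation rule. This sketch assumes that the run continues, keeping its run.id, while at least one processor from the previous active set is still live, and that a new run.id is generated otherwise. `RunIdState` and `next_state` are hypothetical names. The sketch also folds the proposal's separate ephemeral-id set into a single set of live ids, and leader election is not shown.

```python
import uuid
from dataclasses import dataclass


@dataclass(frozen=True)
class RunIdState:
    """Hypothetical contents of the persistent run.id node."""
    run_id: str
    active_processor_ids: frozenset


def next_state(previous, live_processor_ids):
    """Leader-side sketch. ASSUMED rule: keep run.id while any previously active
    processor is still live; otherwise start a new run with a fresh run.id."""
    live = frozenset(live_processor_ids)
    if previous is not None and previous.active_processor_ids & live:
        return RunIdState(previous.run_id, live)   # same run, membership updated
    return RunIdState(uuid.uuid4().hex, live)      # new run


first = next_state(None, ["p1", "p2"])                # no prior state: new run
joined = next_state(first, ["p1", "p2", "p3"])        # p3 joins the running job
restarted = next_state(joined, ["p4", "p5"])          # no overlap: treated as a new run
```

Because the state lives in a persistent node, it outlives any single session. The trade-off is that the leader's membership rule, rather than the node's existence, decides when a new run begins.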
was:
Improve the current read-write-lock based implementation [link PR|[https://github.com/apache/samza/pull/938]] to generate run.id for batch jobs in standalone mode.
A simpler approach proposed by [~boryas] and [~bharathkk] is as follows:
run.id is stored in an ephemeral node in ZK.
* STEP 1: All processors check if run.id node exists and get the version/stat/status of the node.
* STEP 2: If node does not exist, generate a run.id and write it to ZK (create an ephemeral node).
* STEP 3: If write succeeds continue. Else if write fails, throw away generated run.id and go back to STEP 1
* STEP 4: if node exists, get run.id and check if it matches local run.id (if it exists). If run.id on node does not match the local run.id kill the processor
* STEP 5: if node exists and local run.id does not exist or matches run.id on node, set local run.id to run.id on node and set a watch on the node
* STEP 6: if the watch is triggered (due to run.id node value changed or node deleted), go to STEP 1.
Additionally, once the metadata store has been implemented, a command can be added to it to provide a run.id to the batch job.
> Improve run.id generation for batch in standalone
> -------------------------------------------------
>
> Key: SAMZA-2136
> URL: https://issues.apache.org/jira/browse/SAMZA-2136
> Project: Samza
> Issue Type: Improvement
> Reporter: Lakshmi Manasa Gaduputi
> Priority: Major
>
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)