Posted to commits@samza.apache.org by "Shanthoosh Venkataraman (JIRA)" <ji...@apache.org> on 2018/04/12 21:06:00 UTC

[jira] [Commented] (SAMZA-1655) StreamProcessor skips all events from zookeeper server after zkClient session expiration.

    [ https://issues.apache.org/jira/browse/SAMZA-1655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16436304#comment-16436304 ] 

Shanthoosh Venkataraman commented on SAMZA-1655:
------------------------------------------------

*Fix:*
A. Route all watcher events from the ZooKeeper server to the ScheduleAfterDebounce worker queue and execute them on its worker thread. With this change, all events received before a session expiration are buffered in the ScheduleAfterDebounce worker queue, and when the session expires, the stale buffered events are deleted from the queue. When a processor's session expires on one ZooKeeper server and it reconnects to another server in the ensemble, the processor re-creates its ephemeral processor node, because the previous ephemeral node was deleted along with the expired session. This is equivalent to a new processor joining the processors group. Clearing the stale buffered ZooKeeper events from the ScheduleAfterDebounce worker queue is therefore mandatory, and discarding them should not cause corruption.
B. Remove the generationId-based check performed before handling a zkWatcherEvent, along with the related code.
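The two changes above can be sketched roughly as follows. This is a minimal, single-threaded illustration, not Samza's actual code. `WorkerQueue`, `ChildChangeWatcher`, `clearStaleEvents` and `runPending` are hypothetical names. The queue is a simplified stand-in for the ScheduleAfterDebounce worker queue and omits debouncing and timers. Watcher callbacks only enqueue work and carry no generationId (fix B). A session expiration clears whatever was buffered for the old session (fix A).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical, simplified stand-in for the ScheduleAfterDebounce worker queue (no debouncing). */
class WorkerQueue {
  private final Deque<Runnable> pending = new ArrayDeque<>();

  // Called from ZooKeeper watcher callbacks: buffer the event instead of handling it inline.
  synchronized void enqueue(Runnable event) {
    pending.addLast(event);
  }

  // Called on zkClient session expiration: everything buffered belongs to the old session.
  synchronized void clearStaleEvents() {
    pending.clear();
  }

  // Work done by the single worker thread; here it just drains the queue in FIFO order.
  void runPending() {
    while (true) {
      Runnable next;
      synchronized (this) {
        next = pending.pollFirst();
      }
      if (next == null) {
        return;
      }
      next.run();
    }
  }

  synchronized int size() {
    return pending.size();
  }
}

/** Hypothetical watcher: no cached generationId, every event is forwarded to the queue. */
class ChildChangeWatcher {
  private final WorkerQueue queue;
  private final List<String> handled;

  ChildChangeWatcher(WorkerQueue queue, List<String> handled) {
    this.queue = queue;
    this.handled = handled;
  }

  void handleChildChange(String path) {
    queue.enqueue(() -> handled.add(path));
  }
}

public class DebounceFixSketch {
  public static void main(String[] args) {
    WorkerQueue queue = new WorkerQueue();
    List<String> handled = new ArrayList<>();
    ChildChangeWatcher watcher = new ChildChangeWatcher(queue, handled);

    watcher.handleChildChange("/old-session-event"); // buffered before the expiration
    queue.clearStaleEvents();                         // session expired: drop the stale event
    // ... the processor would re-create its ephemeral node here, like a newly joined processor ...
    watcher.handleChildChange("/new-session-event"); // delivered after the reconnect
    queue.runPending();

    System.out.println(handled); // prints [/new-session-event]
  }
}
```

Because the watcher carries no generationId, events from the new session are always handled. Stale work cannot leak across the expiration, since it is discarded in one place, the queue, rather than filtered per watcher.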

> StreamProcessor skips all events from zookeeper server after zkClient session expiration.
> -----------------------------------------------------------------------------------------
>
>                 Key: SAMZA-1655
>                 URL: https://issues.apache.org/jira/browse/SAMZA-1655
>             Project: Samza
>          Issue Type: Bug
>            Reporter: Shanthoosh Venkataraman
>            Assignee: Shanthoosh Venkataraman
>            Priority: Major
>
> *Problem:* StreamProcessor skips all events from zookeeper server after zkClient session expiration.
> *Reason:*
> All the zkWatchers (dataChange and childChange handlers) in a StreamProcessor are instantiated with an in-memory generationId (zkUtils.generationId).
> Each zkWatcher caches the generationId passed through its constructor.
> The in-memory generationId is incremented whenever a zkClient session expiration occurs.
> Any event from the ZooKeeper server is dropped if the in-memory generationId is not equal to the generationId cached in the zkWatcher.
> As a result, after a zkClient session expiration the StreamProcessor drops every subsequent event, becomes a zombie, and stalls message processing.
> In the worst case, if this happens to the leader StreamProcessor, the whole processors group stalls.
> *Purpose of generationId:*
> GenerationId was originally added to skip stale events still buffered in the worker queue after a session expiration, guarding against the following scenario:
> 1. A session expiration happens to the leader stream processor of the group.
> 2. After the session reconnects, the former leader rejoins the group as a follower.
> 3. The former leader may still have zkEvents buffered in its worker queue that were delivered to it while it was the leader. If it acts on these events, it will corrupt the global state.
>  
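The failure mode described in the issue can be reproduced with a small sketch. `GenerationTracker`, `GenerationCheckingWatcher` and the placeholder paths are hypothetical and are not Samza's real classes. The sketch assumes the same watcher instance stays subscribed after the reconnect, so its cached generationId is never refreshed. Once the counter is incremented on expiration, the equality check rejects every later event.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for the in-memory generation counter kept by zkUtils. */
class GenerationTracker {
  private final AtomicInteger generationId = new AtomicInteger(0);

  int current() {
    return generationId.get();
  }

  // Incremented on every zkClient session expiration.
  void onSessionExpired() {
    generationId.incrementAndGet();
  }
}

/** Hypothetical watcher reproducing the pre-fix check described in the issue. */
class GenerationCheckingWatcher {
  private final GenerationTracker tracker;
  private final int cachedGenerationId; // captured once, in the constructor
  final List<String> handled = new ArrayList<>();
  int dropped = 0;

  GenerationCheckingWatcher(GenerationTracker tracker) {
    this.tracker = tracker;
    this.cachedGenerationId = tracker.current();
  }

  void handleDataChange(String path) {
    if (tracker.current() != cachedGenerationId) {
      dropped++; // after the first expiration, every event takes this branch
      return;
    }
    handled.add(path);
  }
}

public class GenerationIdBugSketch {
  public static void main(String[] args) {
    GenerationTracker tracker = new GenerationTracker();
    GenerationCheckingWatcher watcher = new GenerationCheckingWatcher(tracker);

    watcher.handleDataChange("/path-a"); // handled
    tracker.onSessionExpired();          // generationId becomes 1, watcher still caches 0
    watcher.handleDataChange("/path-b"); // dropped
    watcher.handleDataChange("/path-c"); // dropped

    System.out.println(watcher.handled.size() + " handled, " + watcher.dropped + " dropped");
    // prints "1 handled, 2 dropped"
  }
}
```

The check does protect against stale events from the old session. However, it also rejects every event from the new one, which is why the processor turns into a zombie.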



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)