Posted to commits@samza.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/07/10 23:36:00 UTC

[jira] [Commented] (SAMZA-1730) Add state validations in StreamProcessor.

    [ https://issues.apache.org/jira/browse/SAMZA-1730?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16539370#comment-16539370 ] 

ASF GitHub Bot commented on SAMZA-1730:
---------------------------------------

Github user asfgit closed the pull request at:

    https://github.com/apache/samza/pull/535


> Add state validations in StreamProcessor.
> -----------------------------------------
>
>                 Key: SAMZA-1730
>                 URL: https://issues.apache.org/jira/browse/SAMZA-1730
>             Project: Samza
>          Issue Type: Bug
>            Reporter: Shanthoosh Venkataraman
>            Assignee: Shanthoosh Venkataraman
>            Priority: Major
>
> The existing StreamProcessor implementation does not have a state variable to represent its current state (whether it is rebalancing, shutting down, or running). Without this information, the following two problems can occur:
>  - When StreamProcessor.stop() and JobCoordinatorListener.onNewJobModel() execute concurrently, the SamzaContainer might still be running after the StreamProcessor has been stopped (due to the interleaved execution order). Here's a sample execution order:
>     1. User thread invokes `SamzaContainer.stop()`.
>     2. `StreamProcessor.stop()` stops the currently running SamzaContainer.
>     3. Before StreamProcessor stops the ZkJobCoordinator, the ZkJobCoordinator initializes and executes a new SamzaContainer (due to a change in the global processor group).
>     4. StreamProcessor stops the ZkJobCoordinator.
>  - When StreamProcessor.stop() and JobCoordinatorListener.onJobModelExpired() execute concurrently, the StreamProcessor is not stopped cleanly.
>    `paused` is a state variable held in SamzaContainer that indicates whether the container has been stopped for a new JobModel from the JobCoordinator (by default, `paused` is `false` in `SamzaContainer`). Here's a sample execution order:
>     1. User thread invokes `SamzaContainer.stop()`, which triggers `SamzaContainer.shutdown()`. At this point, the user thread is waiting for `onContainerStop(paused=false)` (the container-stopped callback with paused = false).
>     2. Before the SamzaContainer is shut down, the debounce thread invokes `onJobModelExpired`, which triggers `SamzaContainer.pause()`. `SamzaContainer.pause()` sets the SamzaContainer-local state `paused` to true.
>     3. SamzaContainer shuts down and invokes the container shutdown callback `onContainerStop(paused=true)`. Because `paused` is true in this callback, the StreamProcessor shutdown sequence is not triggered.
>     4. StreamProcessor keeps participating in processor-group coordination as if shutdown had never been triggered.
>     5. `LocalApplicationRunner.waitForFinish()` blocks indefinitely.
> To solve the above problems, the following changes were made:
>  * Add a state field to `StreamProcessor` to represent its current state. Before any StreamProcessor operation is performed, the current state is checked, and the operation proceeds only if it is valid in that state.
>  * Remove the `paused` state from SamzaContainer; it is now covered by the StreamProcessor state itself.
>  * Make `onJobModelExpired` the start state for any custom JobCoordinator.
>  * Interrupt the container thread on `StreamProcessor.stop()` if it is still running after the shutdown timeout.
>  * Add tests to verify the behavior in StreamProcessor/LocalApplicationRunner.
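The second race (stop() racing with onJobModelExpired()) can be replayed deterministically with a toy model. This is a hypothetical simplification, not Samza code: the `PausedFlagRace.Processor` class, its `CountDownLatch`, and the 100 ms bounded wait standing in for `waitForFinish` are all assumptions. It forces the debounce thread's `onJobModelExpired()` to run before the container-stopped callback, and shows that a shutdown decision based on a container-local `paused` flag skips the shutdown sequence.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical model of the pre-fix behavior: the processor's shutdown decision
// depends on a container-local `paused` flag. Not Samza code.
public class PausedFlagRace {

  static class Processor {
    // Released when the processor shutdown sequence runs (what waitForFinish waits on).
    final CountDownLatch shutdownLatch = new CountDownLatch(1);
    // Container-local flag; false by default, as described for the pre-fix SamzaContainer.
    volatile boolean paused = false;

    // Debounce thread: the JobModel expired, so the container is paused.
    void onJobModelExpired() {
      paused = true;
    }

    // Container-stopped callback: the shutdown sequence runs only when paused == false.
    void onContainerStop() {
      if (!paused) {
        shutdownLatch.countDown();
      }
      // paused == true: the processor assumes a rebalance and keeps coordinating.
    }

    boolean waitForFinish(long timeoutMs) throws InterruptedException {
      return shutdownLatch.await(timeoutMs, TimeUnit.MILLISECONDS);
    }
  }

  // Replays the sample execution order; returns true only if the processor finished.
  public static boolean replay() {
    Processor processor = new Processor();
    try {
      // Step 1: the user thread has requested stop; the container is shutting down.
      // Step 2: the debounce thread pauses the container before shutdown completes.
      Thread debounce = new Thread(processor::onJobModelExpired);
      debounce.start();
      debounce.join();
      // Step 3: the container finishes and reports onContainerStop with paused == true.
      Thread container = new Thread(processor::onContainerStop);
      container.start();
      container.join();
      // Steps 4-5: the shutdown sequence never ran; a bounded wait makes the hang visible.
      return processor.waitForFinish(100);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println("finished cleanly: " + replay());
  }
}
```

Running `main` prints `finished cleanly: false`. Without the 100 ms bound, the wait would block indefinitely, as in the last step of the sample execution order.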
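A minimal sketch of the state-validation approach, assuming a single lock guards every state transition. The `StatefulProcessor` class, the `State` names, and the `Runnable` container loop are illustrative assumptions, not Samza's actual StreamProcessor. Each callback checks the current state before acting. A late `onNewJobModel()` after `stop()` does not start a container (the first race). `onJobModelExpired()` during shutdown is ignored instead of being treated as a pause (the second race). `onNewJobModel()` is accepted only after `onJobModelExpired()`, which is one possible reading of the "start state" bullet. `stop()` interrupts a container thread that outlives the timeout.

```java
// Minimal sketch of state-validated transitions. State names, the lock, and method
// bodies are illustrative assumptions, not Samza's actual StreamProcessor.
public class StatefulProcessor {

  public enum State { NEW, STARTED, IN_REBALANCE, RUNNING, STOPPING, STOPPED }

  private final Object lock = new Object();
  private State state = State.NEW;
  private Thread containerThread; // hypothetical thread running the container loop

  public void start() {
    synchronized (lock) {
      if (state != State.NEW) {
        throw new IllegalStateException("cannot start from " + state);
      }
      state = State.STARTED;
    }
  }

  // Coordinator callback: the current JobModel is stale, so work must pause.
  public void onJobModelExpired() {
    synchronized (lock) {
      if (state == State.STARTED || state == State.RUNNING) {
        state = State.IN_REBALANCE; // a real processor would also stop its container here
      }
      // STOPPING or STOPPED: ignored, so an in-flight stop() is never mistaken for a pause.
    }
  }

  // Coordinator callback: a new JobModel is ready. Start a container only while rebalancing.
  public boolean onNewJobModel(Runnable containerLoop) {
    synchronized (lock) {
      if (state != State.IN_REBALANCE) {
        return false; // e.g. stop() already ran, so no container may start
      }
      containerThread = new Thread(containerLoop, "container");
      containerThread.start();
      state = State.RUNNING;
      return true;
    }
  }

  // Stops the processor; interrupts the container thread if it outlives the timeout.
  public void stop(long shutdownTimeoutMs) {
    Thread container;
    synchronized (lock) {
      if (state == State.STOPPING || state == State.STOPPED) {
        return;
      }
      state = State.STOPPING;
      container = containerThread;
    }
    if (container != null) {
      // A real processor would first ask the container to shut down gracefully.
      try {
        container.join(shutdownTimeoutMs);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
      if (container.isAlive()) {
        container.interrupt(); // still running after the shutdown timeout
      }
    }
    synchronized (lock) {
      state = State.STOPPED;
    }
  }

  public State getState() {
    synchronized (lock) {
      return state;
    }
  }

  public static void main(String[] args) {
    StatefulProcessor processor = new StatefulProcessor();
    processor.start();
    processor.onJobModelExpired(); // STARTED -> IN_REBALANCE
    processor.stop(100);           // IN_REBALANCE -> STOPPING -> STOPPED
    boolean started = processor.onNewJobModel(() -> { }); // late callback is rejected
    System.out.println(processor.getState() + " " + started);
  }
}
```

Running `main` prints `STOPPED false`. The lock is held only around state checks and updates, not while joining the container thread, so coordinator callbacks arriving during the shutdown wait see `STOPPING` and are rejected instead of blocking.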



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)