Posted to commits@samza.apache.org by "Yi Pan (Data Infrastructure) (JIRA)" <ji...@apache.org> on 2018/05/25 08:10:00 UTC

[jira] [Commented] (SAMZA-1665) Fix race condition when stopping StreamProcessor.

    [ https://issues.apache.org/jira/browse/SAMZA-1665?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16490386#comment-16490386 ] 

Yi Pan (Data Infrastructure) commented on SAMZA-1665:
-----------------------------------------------------

There is another interleaving in this race condition that makes LocalApplicationRunner.waitForFinish() wait forever:
LocalApplicationRunner main thread: 
# stop() is called; it sets the flag and calls StreamProcessor.stop() asynchronously, which only stops the container on the first invocation. The JC is expected to be stopped in a second step, triggered via the ContainerListener after the container has properly shut down.
# waitForFinish() waits on the shutdownLatch in LocalApplicationRunner, which expects the JC to be stopped and the latch to be counted down.

SamzaContainer thread:
# Starts the proper shutdown sequence.

Debounce timer thread:
# Detects a change in the number of processors and triggers JobModel expiration.
# Calls JobCoordinatorListener.onJobModelExpired(), which invokes container.pause(); this sets the pause flag and shuts down the container while the SamzaContainer thread is already in the middle of its shutdown sequence.
# Waits on jcContainerShutdownLatch.

SamzaContainer thread:
# Continues the shutdown sequence and completes it.
# Because the pause flag was set by the debounce timer thread, it invokes containerListener.onContainerStop(pause==true), which assumes that the container was shut down by the active JC in reaction to a JobModel change. Hence, the listener only counts down jcContainerShutdownLatch and returns after the shutdown sequence, leaving the JC running.

Debounce timer thread:
# Wakes up from jcContainerShutdownLatch.await() and continues as if the job model is to be updated.
# A new container is started and the JC stays alive without counting down LocalApplicationRunner.shutdownLatch (this latch is expected to be counted down by JobCoordinatorListener.onCoordinatorStop() -> processorListener.onShutdown()).
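
The interleaving above can be replayed deterministically with the small Java model below. It is a sketch under stated assumptions, not the LocalApplicationRunner or StreamProcessor code. The class, its fields, and the extra pauseFlagSet latch are hypothetical. The other two latches mirror shutdownLatch and jcContainerShutdownLatch from the comment. pauseFlagSet forces the debounce thread to set the pause flag while the container is mid-shutdown.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, heavily simplified model of the interleaving; these are not Samza classes.
class StopRaceReplay {
  final CountDownLatch shutdownLatch = new CountDownLatch(1);            // LocalApplicationRunner.shutdownLatch
  final CountDownLatch jcContainerShutdownLatch = new CountDownLatch(1);
  final CountDownLatch pauseFlagSet = new CountDownLatch(1);             // hypothetical: forces the racy ordering
  volatile boolean pauseFlag = false;
  volatile boolean jcRunning = true;
  final AtomicInteger containersStarted = new AtomicInteger(1);          // the original container

  // Models containerListener.onContainerStop(pause): decides whether the JC is stopped.
  void onContainerStop(boolean pause) {
    if (pause) {
      jcContainerShutdownLatch.countDown(); // assumes the JC paused the container for a JobModel change
    } else {
      jcRunning = false;                    // second step of stop(): stop the JC...
      shutdownLatch.countDown();            // ...whose onCoordinatorStop() releases waitForFinish()
    }
  }

  // Replays stop(); if debounceFires, the debounce thread pauses the container mid-shutdown.
  // Returns what waitForFinish() would observe within timeoutMs.
  boolean replay(boolean debounceFires, long timeoutMs) {
    Thread container = new Thread(() -> {
      // The shutdown sequence started by stop() is in progress here.
      if (debounceFires) {
        awaitQuietly(pauseFlagSet);         // the debounce thread sets the pause flag mid-shutdown
      }
      onContainerStop(pauseFlag);           // shutdown completes and reports the current flag
    });
    Thread debounce = new Thread(() -> {
      if (!debounceFires) {
        return;
      }
      pauseFlag = true;                     // onJobModelExpired() -> container.pause()
      pauseFlagSet.countDown();
      awaitQuietly(jcContainerShutdownLatch);
      containersStarted.incrementAndGet();  // continues as if the JobModel changed
    });
    container.start();
    debounce.start();
    try {
      container.join();
      debounce.join();
      return shutdownLatch.await(timeoutMs, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }

  static void awaitQuietly(CountDownLatch latch) {
    try {
      latch.await();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  public static void main(String[] args) {
    System.out.println("normal stop finished: " + new StopRaceReplay().replay(false, 200));
    StopRaceReplay racy = new StopRaceReplay();
    boolean finished = racy.replay(true, 200);
    System.out.println("racy stop finished: " + finished
        + ", JC running: " + racy.jcRunning
        + ", containers started: " + racy.containersStarted.get());
  }
}
```

Running main prints `normal stop finished: true`, then `racy stop finished: false, JC running: true, containers started: 2`. In the racy run nothing ever counts down shutdownLatch, so an untimed waitForFinish() would block forever.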

As a result, user code calling LocalApplicationRunner.waitForFinish() hangs forever. Enabling kill() and waitForFinish() in TestZkLocalApplicationRunner#shouldUpdateJobModelWhenNewProcessorJoiningGroupUsingAllSspToSingleTaskGrouperFactory easily reproduces the hang.
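
The comment does not prescribe a fix. One possible direction, and only an assumption (not what was committed for SAMZA-1665), is to serialize stop() and JobModel expiration through one lock and a shared state. onContainerStop() would then stop inferring the reason for a shutdown from a pause flag that another thread may have set mid-shutdown. The GuardedStopSketch class, its State values, and restartAfterPause() are all hypothetical.

```java
// Hypothetical guard, not the committed fix: one lock serializes stop() with JobModel
// expiration, and the container-stop callback reads a shared state instead of a pause flag.
class GuardedStopSketch {
  enum State { RUNNING, PAUSING, PAUSED, STOPPING, STOPPED }

  private final Object lock = new Object();
  private State state = State.RUNNING;
  private int containersStarted = 1;

  // User thread: stop(); the actual container shutdown would be requested outside the lock.
  void stop() {
    synchronized (lock) {
      switch (state) {
        case RUNNING:
        case PAUSING:
          state = State.STOPPING; // a container shutdown is pending or already running
          break;
        case PAUSED:
          state = State.STOPPED;  // no container is running: the JC would be stopped directly
          break;
        default:
          break;                  // already stopping or stopped
      }
    }
  }

  // Debounce timer thread: returns false if the expiration is ignored because stop() won.
  boolean onJobModelExpired() {
    synchronized (lock) {
      if (state != State.RUNNING) {
        return false;
      }
      state = State.PAUSING;      // this is where container.pause() would be called
      return true;
    }
  }

  // SamzaContainer thread, at the end of any shutdown sequence.
  void onContainerStop() {
    synchronized (lock) {
      // Only a pause that was not overtaken by stop() leaves the JC running.
      // STOPPED is where the JC would be stopped and shutdownLatch counted down.
      state = (state == State.PAUSING) ? State.PAUSED : State.STOPPED;
    }
  }

  // Debounce timer thread, after jcContainerShutdownLatch: restart only if still paused.
  boolean restartAfterPause() {
    synchronized (lock) {
      if (state != State.PAUSED) {
        return false;
      }
      state = State.RUNNING;
      containersStarted++;
      return true;
    }
  }

  State state() {
    synchronized (lock) {
      return state;
    }
  }

  int containersStarted() {
    synchronized (lock) {
      return containersStarted;
    }
  }
}
```

Under this sketch, an expiration that loses the race to stop() is dropped. A stop() that arrives while a pause is in flight turns that pause into a full stop, so restartAfterPause() returns false and no new container outlives the JC.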

> Fix race condition when stopping StreamProcessor.
> -------------------------------------------------
>
>                 Key: SAMZA-1665
>                 URL: https://issues.apache.org/jira/browse/SAMZA-1665
>             Project: Samza
>          Issue Type: Bug
>            Reporter: Shanthoosh Venkataraman
>            Assignee: Shanthoosh Venkataraman
>            Priority: Major
>
> When the user thread stops the StreamProcessor while the onNewJobModelExpired event is being executed from the debounce thread, we observe the following issue:
> A. The user thread stops the StreamProcessor, which in turn stops the currently running container.
> B. Before ZkJobCoordinator is shut down, onNewJobModelExpired is executed from the debounce thread (which spawns a new container).
> C. The user thread stops the ZkJobCoordinator.
> After the StreamProcessor shutdown sequence returns, the container thread is alive and ZkJobCoordinator is dead. This results in an orphaned container when stopping the StreamProcessor. We were able to reproduce this locally.
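
Steps A-C in the quoted description can be replayed with a hypothetical, single-threaded Java model in which each method stands in for one step. The class and method names are illustrative only, not Samza APIs. The thread switches are collapsed into the order of calls in replay().

```java
// Hypothetical replay of steps A-C; not Samza code.
class OrphanedContainerReplay {
  static final class Container {
    volatile boolean running = true;
  }

  volatile Container current = new Container();
  volatile boolean jcRunning = true;

  // A. User thread: StreamProcessor.stop() stops the currently running container.
  void stopCurrentContainer() {
    current.running = false;
  }

  // B. Debounce thread: onNewJobModelExpired runs while the JC is still alive and spawns a new container.
  void onNewJobModelExpired() {
    if (jcRunning) {
      current = new Container();
    }
  }

  // C. User thread: stops the ZkJobCoordinator; nothing stops the container spawned in B.
  void stopJobCoordinator() {
    jcRunning = false;
  }

  boolean hasOrphanedContainer() {
    return current.running && !jcRunning;
  }

  static OrphanedContainerReplay replay() {
    OrphanedContainerReplay p = new OrphanedContainerReplay();
    p.stopCurrentContainer();   // A
    p.onNewJobModelExpired();   // B
    p.stopJobCoordinator();     // C
    return p;
  }

  public static void main(String[] args) {
    System.out.println("orphaned container: " + replay().hasOrphanedContainer()); // prints "orphaned container: true"
  }
}
```

If stopJobCoordinator() runs before onNewJobModelExpired(), no container is spawned and nothing is orphaned. The outcome depends purely on which thread wins between steps A and C.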



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)