You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2017/08/25 21:34:00 UTC

[jira] [Commented] (SAMZA-1406) Fix potential orphaned containers problem in stand alone.

    [ https://issues.apache.org/jira/browse/SAMZA-1406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16142242#comment-16142242 ] 

ASF GitHub Bot commented on SAMZA-1406:
---------------------------------------

GitHub user shanthoosh opened a pull request:

    https://github.com/apache/samza/pull/283

    SAMZA-1406: Fix potential orphaned containers problem in stand alone.

    Changes:
    
    * Switch from `executorService.shutdown()` to `executorService.shutdownNow()` in `stopScheduler()` implementation. Along with cancelling all the pending tasks, this switch will also interrupt the current running thread.
    
    * Check for the interrupted flag in `ScheduleAfterDebounceTime` scheduled task and trigger the stop sequence.
 
    
    * Update stop sequence of `ScheduleAfterDebounceTime` on task exception.
    
    * Move zookeeper specific constants outside of `ScheduleAfterDebounceTime` into `ZkJobCoordinator`(The overall purpose of this `ScheduleAfterDebounceTime` class is to become the common scheduler for both AzureTasks, ZkTasks. This cleanup takes few steps in that general direction).
    
    * Unit test to verify the callback triggering on exception.
    
    * General code cleanup and java doc added to fields and methods. 
    
    NOTE:  Scenario that triggers orphaned container is last run of `ScheduleAfterDebounceTime` brings up samza container. Though we handle the interrupt and kill container, small window between interrupt occurrence and killing container, there’ll be orphaned containers.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/shanthoosh/samza SAMZA-1406_1

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/samza/pull/283.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #283
    
----
commit 1766362544c4267582ce12875c42d7954342849b
Author: Shanthoosh Venkataraman <sv...@linkedin.com>
Date:   2017-08-23T22:52:23Z

    SAMZA-1406:
    
    Detailed explanation goes here.

----


> Fix potential orphaned containers problem in stand alone.
> ---------------------------------------------------------
>
>                 Key: SAMZA-1406
>                 URL: https://issues.apache.org/jira/browse/SAMZA-1406
>             Project: Samza
>          Issue Type: Bug
>            Reporter: Shanthoosh Venkataraman
>            Assignee: Shanthoosh Venkataraman
>             Fix For: 0.14.0
>
>
> When stream processor is shutting down, we can see that the already submitted Zk tasks to ScheduleAfterDebounce task queue is picked up and executed.  Here's the sample stacktrace:
> {code:java}
> 111018 [p-0000000002-container-thread-0] INFO org.apache.samza.processor.StreamProcessor - Shutting down JobCoordinator from StreamProcessor
> 111043 [SessionTracker] INFO org.apache.zookeeper.server.SessionTrackerImpl - SessionTrackerImpl exited loop!
> 111703 [Thread-43-SendThread(127.0.0.1:64983)] INFO org.apache.zookeeper.ClientCnxn - Opening socket connection to server 127.0.0.1/127.0.0.1:64983. Will not attempt to authenticate using SASL (unknown error)
> 111704 [Thread-43-SendThread(127.0.0.1:64983)] WARN org.apache.zookeeper.ClientCnxn - Session 0x15e05975d900005 for server null, unexpected error, closing socket connection and attempting reconnect
> java.net.ConnectException: Connection refused
> 	at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
> 	at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
> 	at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361)
> 	at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1141)
> 111735 [Thread-16-SendThread(127.0.0.1:64594)] INFO org.apache.zookeeper.ClientCnxn - Opening socket connection to server 127.0.0.1/127.0.0.1:64594. Will not attempt to authenticate using SASL (unknown error)
> 111839 [p-0000000002-container-thread-0] INFO org.apache.zookeeper.ZooKeeper - Session: 0x15e059731f90005 closed
> 111839 [Thread-16-EventThread] INFO org.apache.zookeeper.ClientCnxn - EventThread shut down for session: 0x15e059731f90005
> 111839 [p-0000000002-container-thread-0] INFO org.apache.samza.processor.StreamProcessor - Shutting down the executor service.
> 111840 [CONTAINER-SHUTDOWN-HOOK] INFO org.apache.samza.container.SamzaContainer - Shutdown complete
> 111872 [Thread-43-SendThread(127.0.0.1:64983)] INFO org.apache.zookeeper.ClientCnxn - Opening socket connection to server 127.0.0.1/127.0.0.1:64983. Will not attempt to authenticate using SASL (unknown error)
> 111872 [Thread-43-SendThread(127.0.0.1:64983)] WARN org.apache.zookeeper.ClientCnxn - Session 0x15e05975d900004 for server null, unexpected error, closing socket connection and attempting reconnect
> java.net.ConnectException: Connection refused
> 	at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
> 	at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
> 	at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361)
> 	at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1141)
> 111941 [debounce-thread-0] ERROR org.apache.samza.zk.ScheduleAfterDebounceTime - OnProcessorChange threw an exception.
> org.apache.samza.SamzaException: Cannot read ZK node: /app-test-app-name-a6b3542d-b6ce-4517-ad6f-d25070da4ce1-test-app-id-a6b3542d-b6ce-4517-ad6f-d25070da4ce1/test-app-name-a6b3542d-b6ce-4517-ad6f-d25070da4ce1-test-app-id-a6b3542d-b6ce-4517-ad6f-d25070da4ce1-coordinationData/processors/0000000000
> 	at org.apache.samza.zk.ZkUtils.readProcessorData(ZkUtils.java:230)
> 	at org.apache.samza.zk.ZkUtils.getActiveProcessorsIDs(ZkUtils.java:253)
> 	at org.apache.samza.zk.ZkJobCoordinator.getActualProcessorIds(ZkJobCoordinator.java:256)
> 	at org.apache.samza.zk.ZkJobCoordinator.doOnProcessorChange(ZkJobCoordinator.java:167)
> 	at org.apache.samza.zk.ZkJobCoordinator.lambda$onProcessorChange$1(ZkJobCoordinator.java:161)
> 	at org.apache.samza.zk.ScheduleAfterDebounceTime.lambda$scheduleAfterDebounceTime$0(ScheduleAfterDebounceTime.java:95)
> 	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> 	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> 	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> 	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> 	at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.IllegalStateException: ZkClient already closed!
> 	at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:987)
> 	at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:1099)
> 	at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:1094)
> 	at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:1083)
> 	at org.apache.samza.zk.ZkUtils.readProcessorData(ZkUtils.java:224)
> 	... 12 more
> 111942 [debounce-thread-0] ERROR org.apache.samza.zk.ZkJobCoordinator - Received exception from in JobCoordinator Processing!
> org.apache.samza.SamzaException: Cannot read ZK node: /app-test-app-name-a6b3542d-b6ce-4517-ad6f-d25070da4ce1-test-app-id-a6b3542d-b6ce-4517-ad6f-d25070da4ce1/test-app-name-a6b3542d-b6ce-4517-ad6f-d25070da4ce1-test-app-id-a6b3542d-b6ce-4517-ad6f-d25070da4ce1-coordinationData/processors/0000000000
> 	at org.apache.samza.zk.ZkUtils.readProcessorData(ZkUtils.java:230)
> 	at org.apache.samza.zk.ZkUtils.getActiveProcessorsIDs(ZkUtils.java:253)
> 	at org.apache.samza.zk.ZkJobCoordinator.getActualProcessorIds(ZkJobCoordinator.java:256)
> 	at org.apache.samza.zk.ZkJobCoordinator.doOnProcessorChange(ZkJobCoordinator.java:167)
> 	at org.apache.samza.zk.ZkJobCoordinator.lambda$onProcessorChange$1(ZkJobCoordinator.java:161)
> 	at org.apache.samza.zk.ScheduleAfterDebounceTime.lambda$scheduleAfterDebounceTime$0(ScheduleAfterDebounceTime.java:95)
> 	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> 	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> 	at java.util.concurrent.SchedeadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> 	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> 	at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.IllegalStateException: ZkClient already closed!
> 	at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:987)
> 	at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:1099)
> 	at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:1094)
> 	at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:1083)
> 	at org.apache.samza.zk.ZkUtils.readProcessorData(ZkUtils.java:224)
> 	... 12 more
> 111944 [debounce-thread-0] INFO org.apache.samza.processor.StreamProcessor - Shutting down the executor service.
> 111962 [p-0000000000-container-thread-0] INFO org.apache.zookeeper.ZooKeeper - Session: 0x15e059709020003 closed
> {code}
>  
>  
> Reason being, we call `executorService.shutdownNow();` from StreamProcessor, which just sets the interrupt flag to true for running threads in task queue.
>  
> We don't check if the thread is interrupted or not when starting to execute or while executing in the task implementation. 
> User thread is shutting down the stream processor and at the same time due to an external event, the executorService task is bringing the stream processor back up(and the task is not killed by executorService.shutdown depending upon timing of events). 
> Actual implementation detail where this could happen is `debounceTimer.scheduleAfterDebounceTime(barrierAction, 0, () -> onNewJobModelConfirmed(version))` and LocalApplicationRunner.kill(streamApplication) triggered at the same time. 
> In worst case this will lead to orphaned containers problem, since the job coordinator associated with stream application is stopped and the stream processor is still running(provided that there're multiple stream processors in a JVM).



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)