Posted to commits@samza.apache.org by "Xinyu Liu (JIRA)" <ji...@apache.org> on 2018/01/02 18:24:00 UTC

[jira] [Resolved] (SAMZA-1406) Fix potential orphaned containers problem in stand alone.

     [ https://issues.apache.org/jira/browse/SAMZA-1406?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Xinyu Liu resolved SAMZA-1406.
------------------------------
    Resolution: Fixed

> Fix potential orphaned containers problem in stand alone.
> ---------------------------------------------------------
>
>                 Key: SAMZA-1406
>                 URL: https://issues.apache.org/jira/browse/SAMZA-1406
>             Project: Samza
>          Issue Type: Bug
>            Reporter: Shanthoosh Venkataraman
>            Assignee: Shanthoosh Venkataraman
>             Fix For: 0.14.0
>
>
> When the stream processor is shutting down, ZK tasks that were already submitted to the ScheduleAfterDebounceTime task queue are still picked up and executed. Here is a sample stack trace:
> {code:java}
> 111018 [p-0000000002-container-thread-0] INFO org.apache.samza.processor.StreamProcessor - Shutting down JobCoordinator from StreamProcessor
> 111043 [SessionTracker] INFO org.apache.zookeeper.server.SessionTrackerImpl - SessionTrackerImpl exited loop!
> 111703 [Thread-43-SendThread(127.0.0.1:64983)] INFO org.apache.zookeeper.ClientCnxn - Opening socket connection to server 127.0.0.1/127.0.0.1:64983. Will not attempt to authenticate using SASL (unknown error)
> 111704 [Thread-43-SendThread(127.0.0.1:64983)] WARN org.apache.zookeeper.ClientCnxn - Session 0x15e05975d900005 for server null, unexpected error, closing socket connection and attempting reconnect
> java.net.ConnectException: Connection refused
> 	at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
> 	at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
> 	at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361)
> 	at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1141)
> 111735 [Thread-16-SendThread(127.0.0.1:64594)] INFO org.apache.zookeeper.ClientCnxn - Opening socket connection to server 127.0.0.1/127.0.0.1:64594. Will not attempt to authenticate using SASL (unknown error)
> 111839 [p-0000000002-container-thread-0] INFO org.apache.zookeeper.ZooKeeper - Session: 0x15e059731f90005 closed
> 111839 [Thread-16-EventThread] INFO org.apache.zookeeper.ClientCnxn - EventThread shut down for session: 0x15e059731f90005
> 111839 [p-0000000002-container-thread-0] INFO org.apache.samza.processor.StreamProcessor - Shutting down the executor service.
> 111840 [CONTAINER-SHUTDOWN-HOOK] INFO org.apache.samza.container.SamzaContainer - Shutdown complete
> 111872 [Thread-43-SendThread(127.0.0.1:64983)] INFO org.apache.zookeeper.ClientCnxn - Opening socket connection to server 127.0.0.1/127.0.0.1:64983. Will not attempt to authenticate using SASL (unknown error)
> 111872 [Thread-43-SendThread(127.0.0.1:64983)] WARN org.apache.zookeeper.ClientCnxn - Session 0x15e05975d900004 for server null, unexpected error, closing socket connection and attempting reconnect
> java.net.ConnectException: Connection refused
> 	at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
> 	at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
> 	at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361)
> 	at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1141)
> 111941 [debounce-thread-0] ERROR org.apache.samza.zk.ScheduleAfterDebounceTime - OnProcessorChange threw an exception.
> org.apache.samza.SamzaException: Cannot read ZK node: /app-test-app-name-a6b3542d-b6ce-4517-ad6f-d25070da4ce1-test-app-id-a6b3542d-b6ce-4517-ad6f-d25070da4ce1/test-app-name-a6b3542d-b6ce-4517-ad6f-d25070da4ce1-test-app-id-a6b3542d-b6ce-4517-ad6f-d25070da4ce1-coordinationData/processors/0000000000
> 	at org.apache.samza.zk.ZkUtils.readProcessorData(ZkUtils.java:230)
> 	at org.apache.samza.zk.ZkUtils.getActiveProcessorsIDs(ZkUtils.java:253)
> 	at org.apache.samza.zk.ZkJobCoordinator.getActualProcessorIds(ZkJobCoordinator.java:256)
> 	at org.apache.samza.zk.ZkJobCoordinator.doOnProcessorChange(ZkJobCoordinator.java:167)
> 	at org.apache.samza.zk.ZkJobCoordinator.lambda$onProcessorChange$1(ZkJobCoordinator.java:161)
> 	at org.apache.samza.zk.ScheduleAfterDebounceTime.lambda$scheduleAfterDebounceTime$0(ScheduleAfterDebounceTime.java:95)
> 	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> 	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> 	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> 	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> 	at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.IllegalStateException: ZkClient already closed!
> 	at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:987)
> 	at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:1099)
> 	at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:1094)
> 	at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:1083)
> 	at org.apache.samza.zk.ZkUtils.readProcessorData(ZkUtils.java:224)
> 	... 12 more
> 111942 [debounce-thread-0] ERROR org.apache.samza.zk.ZkJobCoordinator - Received exception from in JobCoordinator Processing!
> org.apache.samza.SamzaException: Cannot read ZK node: /app-test-app-name-a6b3542d-b6ce-4517-ad6f-d25070da4ce1-test-app-id-a6b3542d-b6ce-4517-ad6f-d25070da4ce1/test-app-name-a6b3542d-b6ce-4517-ad6f-d25070da4ce1-test-app-id-a6b3542d-b6ce-4517-ad6f-d25070da4ce1-coordinationData/processors/0000000000
> 	at org.apache.samza.zk.ZkUtils.readProcessorData(ZkUtils.java:230)
> 	at org.apache.samza.zk.ZkUtils.getActiveProcessorsIDs(ZkUtils.java:253)
> 	at org.apache.samza.zk.ZkJobCoordinator.getActualProcessorIds(ZkJobCoordinator.java:256)
> 	at org.apache.samza.zk.ZkJobCoordinator.doOnProcessorChange(ZkJobCoordinator.java:167)
> 	at org.apache.samza.zk.ZkJobCoordinator.lambda$onProcessorChange$1(ZkJobCoordinator.java:161)
> 	at org.apache.samza.zk.ScheduleAfterDebounceTime.lambda$scheduleAfterDebounceTime$0(ScheduleAfterDebounceTime.java:95)
> 	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> 	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> 	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> 	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> 	at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.IllegalStateException: ZkClient already closed!
> 	at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:987)
> 	at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:1099)
> 	at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:1094)
> 	at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:1083)
> 	at org.apache.samza.zk.ZkUtils.readProcessorData(ZkUtils.java:224)
> 	... 12 more
> 111944 [debounce-thread-0] INFO org.apache.samza.processor.StreamProcessor - Shutting down the executor service.
> 111962 [p-0000000000-container-thread-0] INFO org.apache.zookeeper.ZooKeeper - Session: 0x15e059709020003 closed
> {code}
>  
>  
> The root cause is that StreamProcessor calls `executorService.shutdownNow()`, which only sets the interrupt flag on the threads currently running tasks from the queue; it does not forcibly stop those tasks.
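To illustrate the interrupt semantics described above, here is a minimal, self-contained sketch that uses only `java.util.concurrent`; it is not Samza code. The class and method names (`ShutdownNowDemo`, `sideEffectRanAfterShutdownNow`) are hypothetical, and the `sideEffectRan` flag stands in for work such as bringing the stream processor back up. A task that ignores its interrupt flag still performs its side effect after `shutdownNow()` returns, while a task that checks `Thread.currentThread().isInterrupted()` skips it.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo (not Samza code): shutdownNow() interrupts a running task
// but cannot stop one that never looks at its interrupt flag.
public class ShutdownNowDemo {

  // Returns true if the task's side effect ran even though shutdownNow() was
  // called while the task was in flight.
  public static boolean sideEffectRanAfterShutdownNow(boolean checkInterrupt)
      throws InterruptedException {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    CountDownLatch taskStarted = new CountDownLatch(1);
    CountDownLatch shutdownCalled = new CountDownLatch(1);
    AtomicBoolean sideEffectRan = new AtomicBoolean(false);

    executor.submit(() -> {
      taskStarted.countDown();
      // Spin instead of blocking, so the interrupt only sets the flag and
      // no InterruptedException is thrown.
      while (shutdownCalled.getCount() > 0) {
        Thread.yield();
      }
      if (checkInterrupt && Thread.currentThread().isInterrupted()) {
        return; // cooperative task: honours the interrupt and skips its work
      }
      sideEffectRan.set(true); // stands in for "bring the stream processor back up"
    });

    taskStarted.await();
    executor.shutdownNow();     // interrupts the worker thread; the task keeps running
    shutdownCalled.countDown(); // let the task continue past the interrupt
    executor.awaitTermination(5, TimeUnit.SECONDS);
    return sideEffectRan.get();
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println("ignores interrupt, side effect ran: " + sideEffectRanAfterShutdownNow(false));
    System.out.println("checks interrupt, side effect ran: " + sideEffectRanAfterShutdownNow(true));
  }
}
```

Run as written, `main` should report `true` for the task that ignores the interrupt and `false` for the one that checks it. Note that such a check only narrows the window: a task that passes the check just before the interrupt arrives still runs to completion.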
>  
> The task implementations never check whether the thread has been interrupted, either before they start executing or while they run.
> As a result, while the user thread is shutting down the stream processor, a task in the executorService, triggered by an external event, can be bringing the stream processor back up at the same time (depending on the timing of events, the task is not stopped by `executorService.shutdownNow()`).
> Concretely, this can happen when `debounceTimer.scheduleAfterDebounceTime(barrierAction, 0, () -> onNewJobModelConfirmed(version))` and `LocalApplicationRunner.kill(streamApplication)` are triggered at the same time.
> In the worst case, this leads to orphaned containers: the job coordinator associated with the stream application is stopped while its stream processor keeps running (provided there are multiple stream processors in the JVM).
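One way to close this gap, sketched here under stated assumptions rather than as the change actually committed for SAMZA-1406, is to make the debounce scheduler itself refuse to run actions once shutdown has begun. `GuardedDebounceScheduler`, `stopScheduler` and the `shuttingDown` flag are hypothetical; only the argument order of `scheduleAfterDebounceTime(name, delay, action)` mirrors the call quoted in the description, with the first argument assumed to be an action name. The flag is set before `shutdownNow()`, and each action re-checks it (and the interrupt flag) when it actually starts, so a task queued before the kill becomes a no-op.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch, not Samza's ScheduleAfterDebounceTime: a debounce
// scheduler that turns queued actions into no-ops once shutdown has begun.
public class GuardedDebounceScheduler {
  private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
  private final Map<String, ScheduledFuture<?>> pending = new ConcurrentHashMap<>();
  private volatile boolean shuttingDown = false;

  // Runs `action` after `delayMs`, cancelling any still-pending action with the same name.
  public synchronized void scheduleAfterDebounceTime(String actionName, long delayMs, Runnable action) {
    if (shuttingDown) {
      return; // reject new work once shutdown has started
    }
    ScheduledFuture<?> previous = pending.get(actionName);
    if (previous != null) {
      previous.cancel(false); // debounce: only the latest request for this name runs
    }
    ScheduledFuture<?> future = executor.schedule(() -> {
      // Re-check when the task actually starts: it may have been queued before shutdown.
      if (shuttingDown || Thread.currentThread().isInterrupted()) {
        return;
      }
      action.run();
    }, delayMs, TimeUnit.MILLISECONDS);
    pending.put(actionName, future);
  }

  // Set the flag first, then interrupt running tasks and drop queued ones.
  public synchronized void stopScheduler() {
    shuttingDown = true;
    executor.shutdownNow();
  }

  public static void main(String[] args) throws InterruptedException {
    GuardedDebounceScheduler scheduler = new GuardedDebounceScheduler();
    AtomicInteger runs = new AtomicInteger();

    // Two requests in quick succession: the second replaces the first.
    scheduler.scheduleAfterDebounceTime("onProcessorChange", 50, runs::incrementAndGet);
    scheduler.scheduleAfterDebounceTime("onProcessorChange", 50, runs::incrementAndGet);
    Thread.sleep(500);
    System.out.println("runs after debounce: " + runs.get());

    // A request queued just before shutdown never runs.
    scheduler.scheduleAfterDebounceTime("onProcessorChange", 200, runs::incrementAndGet);
    scheduler.stopScheduler();
    Thread.sleep(500);
    System.out.println("runs after stopScheduler: " + runs.get());
  }
}
```

On an unloaded machine this should report one run in both lines. The guard is still a check-then-act sequence: an action that has already passed the check when `stopScheduler()` is called will finish, so a coordinator that must never restart a stopped processor would likely also need to guard its own state transitions, for example under a shared lock.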



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)