Posted to commits@samza.apache.org by "Prateek Maheshwari (JIRA)" <ji...@apache.org> on 2019/03/19 18:28:00 UTC

[jira] [Commented] (SAMZA-1663) StreamProcessor fails after reestablishing zookeeper session.

    [ https://issues.apache.org/jira/browse/SAMZA-1663?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16796388#comment-16796388 ] 

Prateek Maheshwari commented on SAMZA-1663:
-------------------------------------------

[~spvenkat] Is this still relevant?

> StreamProcessor fails after reestablishing zookeeper session.
> -------------------------------------------------------------
>
>                 Key: SAMZA-1663
>                 URL: https://issues.apache.org/jira/browse/SAMZA-1663
>             Project: Samza
>          Issue Type: Bug
>            Reporter: Shanthoosh Venkataraman
>            Assignee: Shanthoosh Venkataraman
>            Priority: Major
>
>  
> *Problem:*
> StreamProcessor fails sporadically after reestablishing the zookeeper session.
> *Analysis:*
> Currently in ZkJobCoordinator, stale events that were buffered in the leader stream processor before a zookeeper session disconnect cause the StreamProcessor to fail after the session reconnects. Consider the following scenario:
> A. Leader receives processor change notification from zookeeper.
> B. Before it acts on the notification, zookeeper session expiration occurs.
> C. Leader stream processor disconnects from the group and joins the group as a follower.
> D. It acts upon the stale events that were buffered in its worker queue (generated for some non-existent processors), which ends up killing it.
> *Relevant stacktrace:*
> As the log below shows, before the session expiration there were two processorIds, viz. [0000000001, 0000000002]. After the session expiration, the live processorIds were [0000000003, 0000000004]. The StreamProcessor failed after the zookeeper session reconnect because it tried to generate a JobModel for the non-existent processors [0000000001, 0000000002].
> {code:java}
> [ZkClient-EventThread-291-127.0.0.1:53447] INFO org.apache.samza.zk.ZkJobCoordinator - Got new session created event for processor=0000000001
> [ZkClient-EventThread-291-127.0.0.1:53447] INFO org.apache.samza.zk.ZkJobCoordinator - register zk controller for the new session
> [ZkClient-EventThread-291-127.0.0.1:53447] INFO org.apache.samza.zk.ZkUtils - Created ephemeral path: /app-test-app-name-e4bb9daa-d00d-452c-bf7d-e9625c8f8193-test-app-id-e4bb9daa-d00d-452c-bf7d-e9625c8f8193/test-app-name-e4bb9daa-d00d-452c-bf7d-e9625c8f8193-test-app-id-e4bb9daa-d00d-452c-bf7d-e9625c8f8193-coordinationData/processors/0000000004 for processor: svenkata-mn1.linkedin.biz 0000000001 in zookeeper.
> [ZkClient-EventThread-291-127.0.0.1:53447] INFO org.apache.samza.zk.ZkUtils - Found these children - [0000000003, 0000000004]
> [ZkClient-EventThread-291-127.0.0.1:53447] INFO org.apache.samza.zk.ZkLeaderElector - tryBecomeLeader: index = 1 for path=/app-test-app-name-e4bb9daa-d00d-452c-bf7d-e9625c8f8193-test-app-id-e4bb9daa-d00d-452c-bf7d-e9625c8f8193/test-app-name-e4bb9daa-d00d-452c-bf7d-e9625c8f8193-test-app-id-e4bb9daa-d00d-452c-bf7d-e9625c8f8193-coordinationData/processors/0000000004 out of [0000000003, 0000000004]
> [ZkClient-EventThread-291-127.0.0.1:53447] INFO org.apache.samza.zk.ZkLeaderElector - Index = 1 Not eligible to be a leader yet!
> [ZkClient-EventThread-291-127.0.0.1:53447] INFO org.apache.samza.zk.ZkLeaderElector - [Processor-0000000001] Subscribing data change for 0000000003
> [ZkClient-EventThread-291-127.0.0.1:53447] INFO org.apache.samza.zk.ZkLeaderElector - [Processor-0000000001] Predecessor still exists. Current subscription is valid. Continuing as non-leader.
> [ZkClient-EventThread-291-127.0.0.1:53447] INFO org.apache.samza.zk.ZkUtils - Current version for zk root node: /app-test-app-name-e4bb9daa-d00d-452c-bf7d-e9625c8f8193-test-app-id-e4bb9daa-d00d-452c-bf7d-e9625c8f8193/test-app-name-e4bb9daa-d00d-452c-bf7d-e9625c8f8193-test-app-id-e4bb9daa-d00d-452c-bf7d-e9625c8f8193-coordinationData is 1.0, expected version is 1.0
> ....
> ....
> ....
> [ZkClient-EventThread-291-127.0.0.1:53447] INFO org.apache.samza.zk.ZkJobCoordinator - Got syncconnected event for processor=0000000001.
> [ZkClient-EventThread-291-127.0.0.1:53447] INFO org.apache.samza.zk.ScheduleAfterDebounceTime - Attempting to cancel the future of action: ZK_SESSION_ERROR
> 15:16:50.417 [DEBUG] [TestEventLogger] 540098 [debounce-thread-0] INFO org.apache.samza.zk.ZkUtils - read the model ver=2 from /app-test-app-name-e4bb9daa-d00d-452c-bf7d-e9625c8f8193-test-app-id-e4bb9daa-d00d-452c-bf7d-e9625c8f8193/test-app-name-e4bb9daa-d00d-452c-bf7d-e9625c8f8193-test-app-id-e4bb9daa-d00d-452c-bf7d-e9625c8f8193-coordinationData/JobModelGeneration/jobModels/2
> 15:16:50.415 [DEBUG] [TestEventLogger] 540096 [debounce-thread-0] INFO org.apache.samza.zk.ZkUtils - Found these children - [0000000001, 0000000002] 
> 15:16:50.416 [DEBUG] [TestEventLogger] 540096 [debounce-thread-0] INFO org.apache.samza.zk.ZkUtils - Found these processorIds 
> [debounce-thread-0] INFO org.apache.samza.zk.ZkJobCoordinator - Generating new JobModel with processors: [].
> [debounce-thread-0] INFO org.apache.samza.zk.ZkUtils - read the model ver=2 from /app-test-app-name-e4bb9daa-d00d-452c-bf7d-e9625c8f8193-test-app-id-e4bb9daa-d00d-452c-bf7d-e9625c8f8193/test-app-name-e4bb9daa-d00d-452c-bf7d-e9625c8f8193-test-app-id-e4bb9daa-d00d-452c-bf7d-e9625c8f8193-coordinationData/JobModelGeneration/jobModels/2
> [debounce-thread-0] INFO org.apache.samza.coordinator.JobModelManager$ - SystemStreamPartitionGrouper org.apache.samza.container.grouper.stream.GroupByPartition@33169a55 has grouped the SystemStreamPartitions into 5 tasks with the following taskNames: [Partition 1, Partition 0, Partition 3, Partition 2, Partition 4]
> [debounce-thread-0] ERROR org.apache.samza.zk.ScheduleAfterDebounceTime - Execution of action: OnProcessorChange failed.
> java.lang.IllegalArgumentException: Must have at least one container
>      at org.apache.samza.container.grouper.task.GroupByContainerIds.group(GroupByContainerIds.java:66)
>      at org.apache.samza.coordinator.JobModelManager$.readJobModel(JobModelManager.scala:208)
>      at org.apache.samza.coordinator.JobModelManager.readJobModel(JobModelManager.scala)
>      at org.apache.samza.zk.ZkJobCoordinator.generateNewJobModel(ZkJobCoordinator.java:354)
>      at org.apache.samza.zk.ZkJobCoordinator.doOnProcessorChange(ZkJobCoordinator.java:241)
>      at org.apache.samza.zk.ZkJobCoordinator.lambda$onProcessorChange$1(ZkJobCoordinator.java:225)
>      at org.apache.samza.zk.ScheduleAfterDebounceTime.lambda$getScheduleableAction$0(ScheduleAfterDebounceTime.java:153)
>      at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>      at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>      at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180){code}
>  
>  
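
For illustration only, the scenario in steps A to D of the quoted analysis could be guarded against by tagging each buffered action with the session generation in which it was scheduled and dropping actions from earlier sessions. The sketch below is not Samza code and not necessarily how this issue was or should be fixed; the class SessionScopedActions and its methods schedule, onNewSession and drain are hypothetical names. Only the action name OnProcessorChange is taken from the log above.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Hypothetical sketch (not part of Samza): drop actions buffered under an expired session. */
public class SessionScopedActions {

    /** A buffered action together with the session generation it was scheduled in. */
    private static final class Tagged {
        final String name;
        final int generation;
        final Runnable action;

        Tagged(String name, int generation, Runnable action) {
            this.name = name;
            this.generation = generation;
            this.action = action;
        }
    }

    private final Queue<Tagged> buffered = new ArrayDeque<>();
    private int currentGeneration = 0;

    /** Step A: buffer an action under the current session generation. */
    public synchronized void schedule(String name, Runnable action) {
        buffered.add(new Tagged(name, currentGeneration, action));
    }

    /** Steps B and C: a new session was established, so older actions become stale. */
    public synchronized void onNewSession() {
        currentGeneration++;
    }

    /** Step D, guarded: run only actions from the current session; return their names. */
    public synchronized List<String> drain() {
        List<String> executed = new ArrayList<>();
        Tagged next;
        while ((next = buffered.poll()) != null) {
            if (next.generation != currentGeneration) {
                continue; // stale: scheduled before the session was re-established
            }
            next.action.run();
            executed.add(next.name);
        }
        return executed;
    }

    public static void main(String[] args) {
        SessionScopedActions actions = new SessionScopedActions();
        actions.schedule("OnProcessorChange", () -> System.out.println("ran action from old session"));
        actions.onNewSession();
        actions.schedule("OnProcessorChange", () -> System.out.println("ran action from new session"));
        System.out.println(actions.drain());
    }
}
```

In this sketch the action scheduled before onNewSession() is skipped when the queue is drained, so the processor would never try to build a JobModel from the processor list of the expired session.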



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)