Posted to jira@kafka.apache.org by "Seweryn Habdank-Wojewodzki (JIRA)" <ji...@apache.org> on 2017/07/03 13:15:00 UTC

[jira] [Resolved] (KAFKA-5530) Balancer is dancing with KStream all the time, and due to that Kafka cannot work :-)

     [ https://issues.apache.org/jira/browse/KAFKA-5530?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Seweryn Habdank-Wojewodzki resolved KAFKA-5530.
-----------------------------------------------
    Resolution: Not A Bug

The main problem, at least as far as we could observe in the end, was that our max.poll.interval.ms was simply *_too_* small.

We now set max.poll.interval.ms=1000000, and the Kafka Streams application (the consuming one) starts properly.
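For reference, here is a minimal sketch of passing this setting to Kafka Streams through a plain `java.util.Properties` object. The config key strings are standard Kafka names. The application id `stream` is inferred from the group name in the attached log. The broker address `localhost:9092` and the helper `StreamsPollConfig.build` are hypothetical placeholders.

```java
import java.util.Properties;

final class StreamsPollConfig {

    /**
     * Hypothetical helper: builds a Kafka Streams configuration that raises
     * max.poll.interval.ms. The key strings are standard Kafka config names;
     * the values are whatever the caller passes in.
     */
    static Properties build(String applicationId, String bootstrapServers,
                            int numStreamThreads, long maxPollIntervalMs) {
        Properties props = new Properties();
        props.put("application.id", applicationId);
        props.put("bootstrap.servers", bootstrapServers);
        props.put("num.stream.threads", Integer.toString(numStreamThreads));
        // A plain consumer setting; Kafka Streams forwards it to the
        // consumers it creates for its stream threads.
        props.put("max.poll.interval.ms", Long.toString(maxPollIntervalMs));
        return props;
    }

    public static void main(String[] args) {
        // "stream" matches the group name in the log; the broker address is a placeholder.
        Properties props = build("stream", "localhost:9092", 4, 1_000_000L);
        props.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

A `Properties` object like this would typically be wrapped as `new StreamsConfig(props)` and passed to the `KafkaStreams` constructor, in the same way `streamsConfig` is used in the code from the description.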

It would perhaps be good to add a hint to the documentation that max.poll.interval.ms should not be too small, because a value that is too small causes endless rebalancing.

The documentation explains this only implicitly:
If poll() is not called before expiration of this timeout, then the consumer is considered failed and the group will rebalance in order to reassign the partitions to another member. 

But it does not state explicitly that max.poll.interval.ms should be reasonably large :-).
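To make the quoted rule concrete, here is a simplified model. It is hypothetical and is not Kafka's actual implementation. Given the times at which a consumer calls `poll()`, it reports whether any gap exceeds `max.poll.interval.ms`. That is the condition under which the member is considered failed and the group rebalances. The timestamps in `main` are invented for illustration.

```java
import java.util.Arrays;
import java.util.List;

final class PollIntervalModel {

    /**
     * Simplified model of the max.poll.interval.ms rule: returns true if any gap
     * between consecutive poll() calls (ascending timestamps in ms) exceeds
     * maxPollIntervalMs, i.e. the member would be considered failed.
     */
    static boolean wouldTriggerRebalance(List<Long> pollTimesMs, long maxPollIntervalMs) {
        for (int i = 1; i < pollTimesMs.size(); i++) {
            long gap = pollTimesMs.get(i) - pollTimesMs.get(i - 1);
            if (gap > maxPollIntervalMs) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Hypothetical poll times: the third poll() comes 1500 ms after the second,
        // e.g. because processing or joining the group took that long.
        List<Long> polls = Arrays.asList(0L, 200L, 1_700L, 1_900L);
        System.out.println(wouldTriggerRebalance(polls, 1_000L));     // true
        System.out.println(wouldTriggerRebalance(polls, 1_000_000L)); // false
    }
}
```

With a 1000 ms limit, the 1500 ms gap is enough to evict the member. With the 1000000 ms value from this ticket, it is not. In a real application each eviction starts another rebalance, which can itself delay the next `poll()`. This is one plausible way a too-small value keeps the group rebalancing indefinitely.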

> Balancer is dancing with KStream all the time, and due to that Kafka cannot work :-)
> ------------------------------------------------------------------------------------
>
>                 Key: KAFKA-5530
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5530
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.10.2.0, 0.10.2.1, 0.11.0.0
>         Environment: Linux, Windows
>            Reporter: Seweryn Habdank-Wojewodzki
>         Attachments: streamer_20-topics_1-thread-K-0.11.0.0.log.zip, streamer_20-topics_4-threads-K-0.11.0.0.log.zip, streamer_2-topics_1-thread-K-0.11.0.0.log.zip, streamer_2-topics_4_threads-K-0.11.0.0.log.zip
>
>
> Dear all,
> There are problems with rebalancing in Kafka Streams (v. 0.10.2.x) when _num.stream.threads_ is greater than 1 and the number of input topics is greater than 1.
> My setup in the code is more or less the following:
> {code:java}
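> // Build one filter/normalize pipeline per input topic (0.10.2.x KStreamBuilder API).
> for (String inTopicName : inTopicNames) {
>     KStream<String, String> stringInput = kBuilder.stream( STRING_SERDE, STRING_SERDE, inTopicName );
>     stringInput.filter( streamFilter::passOrFilterMessages ).map( ndmNormalizer ).to( outTopicName );
> }
> streams = new KafkaStreams( kBuilder, streamsConfig );
> // cleanUp() wipes the local state directory; it may only be called before start() or after close().
> streams.cleanUp();
> streams.start();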
> {code}
> If *_num.stream.threads=4_* and there are 2 or more inTopicNames, but fewer than num.stream.threads, the application startup blocks itself completely: it writes endless strange messages to the log and never starts.
> It is even more problematic when the number of topics is higher than _num.stream.threads_, as I commented in *KAFKA-5167 streams task gets stuck after re-balance due to LockException*.
> I am attaching logs for two scenarios:
>  * when: 1 < num.stream.threads < number of topics (KAFKA-5167)
>  * when: 1 < number of topics < num.stream.threads (this ticket).
> I can reproduce the behaviour fully. I even found a workaround, but it is not a desirable one: with _num.stream.threads=1_ everything works fine :-( (this holds for Kafka v. 0.10.2.x; with v. 0.11.0.0 it does not work at all).
> {code:bash}
> 2017-06-27 19:45:00 INFO StreamPartitionAssignor:466 - stream-thread [StreamThread-3] Assigned tasks to clients as {de0ead97-89d8-49b0-be84-876ca5b41cd8=[activeTasks: ([]) assignedTasks: ([]) prevActiveTasks: ([]) prevAssignedTasks: ([]) capacity: 2.0 cost: 0.0]}.
> 2017-06-27 19:45:00 INFO AbstractCoordinator:375 - Successfully joined group stream with generation 2701
> 2017-06-27 19:45:00 INFO AbstractCoordinator:375 - Successfully joined group stream with generation 2701
> 2017-06-27 19:45:00 INFO ConsumerCoordinator:252 - Setting newly assigned partitions [] for group stream
> 2017-06-27 19:45:00 INFO ConsumerCoordinator:252 - Setting newly assigned partitions [] for group stream
> 2017-06-27 19:45:00 INFO StreamThread:228 - stream-thread [StreamThread-3] New partitions [[]] assigned at the end of consumer rebalance.
> 2017-06-27 19:45:00 INFO StreamThread:228 - stream-thread [StreamThread-1] New partitions [[]] assigned at the end of consumer rebalance.
> 2017-06-27 19:45:00 INFO ConsumerCoordinator:393 - Revoking previously assigned partitions [] for group stream
> 2017-06-27 19:45:00 INFO StreamThread:254 - stream-thread [StreamThread-1] partitions [[]] revoked at the beginning of consumer rebalance.
> 2017-06-27 19:45:00 INFO StreamThread:1012 - stream-thread [StreamThread-1] Updating suspended tasks to contain active tasks [[]]
> 2017-06-27 19:45:00 INFO StreamThread:1019 - stream-thread [StreamThread-1] Removing all active tasks [[]]
> 2017-06-27 19:45:00 INFO StreamThread:1034 - stream-thread [StreamThread-1] Removing all standby tasks [[]]
> 2017-06-27 19:45:00 INFO AbstractCoordinator:407 - (Re-)joining group stream
> 2017-06-27 19:45:00 INFO StreamPartitionAssignor:290 - stream-thread [StreamThread-1] Constructed client metadata {de0ead97-89d8-49b0-be84-876ca5b41cd8=ClientMetadata{hostInfo=null, consumers=[stream-de0ead97-89d8-49b0-be84-876ca5b41cd8-StreamThread-1-consumer-ab798efe-16a6-4686-bdee-ccd50c937cd7], state=[activeTasks: ([]) assignedTasks: ([]) prevActiveTasks: ([]) prevAssignedTasks: ([]) capacity: 1.0 cost: 0.0]}} from the member subscriptions.
> 2017-06-27 19:45:00 INFO StreamPartitionAssignor:630 - stream-thread [StreamThread-1] Completed validating internal topics in partition assignor
> 2017-06-27 19:45:00 INFO StreamPartitionAssignor:630 - stream-thread [StreamThread-1] Completed validating internal topics in partition assignor
> 2017-06-27 19:45:00 INFO StreamPartitionAssignor:466 - stream-thread [StreamThread-1] Assigned tasks to clients as {de0ead97-89d8-49b0-be84-876ca5b41cd8=[activeTasks: ([]) assignedTasks: ([]) prevActiveTasks: ([]) prevAssignedTasks: ([]) capacity: 1.0 cost: 0.0]}.
> 2017-06-27 19:45:00 INFO AbstractCoordinator:375 - Successfully joined group stream with generation 2702
> 2017-06-27 19:45:00 INFO ConsumerCoordinator:252 - Setting newly assigned partitions [] for group stream
> 2017-06-27 19:45:00 INFO StreamThread:228 - stream-thread [StreamThread-1] New partitions [[]] assigned at the end of consumer rebalance.
> 2017-06-27 19:45:00 INFO ConsumerCoordinator:393 - Revoking previously assigned partitions [] for group stream
> 2017-06-27 19:45:00 INFO StreamThread:254 - stream-thread [StreamThread-3] partitions [[]] revoked at the beginning of consumer rebalance.
> 2017-06-27 19:45:00 INFO StreamThread:1012 - stream-thread [StreamThread-3] Updating suspended tasks to contain active tasks [[]]
> 2017-06-27 19:45:00 INFO StreamThread:1019 - stream-thread [StreamThread-3] Removing all active tasks [[]]
> 2017-06-27 19:45:00 INFO StreamThread:1034 - stream-thread [StreamThread-3] Removing all standby tasks [[]]
> 2017-06-27 19:45:00 INFO AbstractCoordinator:407 - (Re-)joining group stream
> 2017-06-27 19:45:00 INFO ConsumerCoordinator:393 - Revoking previously assigned partitions [] for group stream
> 2017-06-27 19:45:00 INFO ConsumerCoordinator:393 - Revoking previously assigned partitions [] for group stream
> 2017-06-27 19:45:00 INFO StreamThread:254 - stream-thread [StreamThread-2] partitions [[]] revoked at the beginning of consumer rebalance.
> 2017-06-27 19:45:00 INFO StreamThread:254 - stream-thread [StreamThread-4] partitions [[]] revoked at the beginning of consumer rebalance.
> 2017-06-27 19:45:00 INFO StreamThread:1012 - stream-thread [StreamThread-2] Updating suspended tasks to contain active tasks [[]]
> 2017-06-27 19:45:00 INFO StreamThread:1012 - stream-thread [StreamThread-4] Updating suspended tasks to contain active tasks [[]]
> 2017-06-27 19:45:00 INFO StreamThread:1019 - stream-thread [StreamThread-2] Removing all active tasks [[]]
> 2017-06-27 19:45:00 INFO StreamThread:1019 - stream-thread [StreamThread-4] Removing all active tasks [[]]
> 2017-06-27 19:45:00 INFO StreamThread:1034 - stream-thread [StreamThread-4] Removing all standby tasks [[]]
> 2017-06-27 19:45:00 INFO StreamThread:1034 - stream-thread [StreamThread-2] Removing all standby tasks [[]]
> 2017-06-27 19:45:00 INFO AbstractCoordinator:407 - (Re-)joining group stream
> 2017-06-27 19:45:00 INFO AbstractCoordinator:407 - (Re-)joining group stream
> 2017-06-27 19:45:00 INFO StreamPartitionAssignor:290 - stream-thread [StreamThread-2] Constructed client metadata {de0ead97-89d8-49b0-be84-876ca5b41cd8=ClientMetadata{hostInfo=null, consumers=[stream-de0ead97-89d8-49b0-be84-876ca5b41cd8-StreamThread-3-consumer-16274860-9a0f-4df9-8af3-10f4c3c23d50, stream-de0ead97-89d8-49b0-be84-876ca5b41cd8-StreamThread-4-consumer-be7bc520-7174-4d6e-9258-9761b6c45bd9, stream-de0ead97-89d8-49b0-be84-876ca5b41cd8-StreamThread-2-consumer-401f1542-c311-4b1f-8f4e-72d6ade12583], state=[activeTasks: ([]) assignedTasks: ([]) prevActiveTasks: ([]) prevAssignedTasks: ([]) capacity: 3.0 cost: 0.0]}} from the member subscriptions.
> 2017-06-27 19:45:00 INFO StreamPartitionAssignor:630 - stream-thread [StreamThread-2] Completed validating internal topics in partition assignor
> 2017-06-27 19:45:00 INFO StreamPartitionAssignor:630 - stream-thread [StreamThread-2] Completed validating internal topics in partition assignor
> 2017-06-27 19:45:00 INFO StreamPartitionAssignor:466 - stream-thread [StreamThread-2] Assigned tasks to clients as {de0ead97-89d8-49b0-be84-876ca5b41cd8=[activeTasks: ([]) assignedTasks: ([]) prevActiveTasks: ([]) prevAssignedTasks: ([]) capacity: 3.0 cost: 0.0]}.
> 2017-06-27 19:45:00 INFO AbstractCoordinator:375 - Successfully joined group stream with generation 2703
> 2017-06-27 19:45:00 INFO AbstractCoordinator:375 - Successfully joined group stream with generation 2703
> 2017-06-27 19:45:00 INFO AbstractCoordinator:375 - Successfully joined group stream with generation 2703
> 2017-06-27 19:45:00 INFO ConsumerCoordinator:252 - Setting newly assigned partitions [] for group stream
> 2017-06-27 19:45:00 INFO ConsumerCoordinator:252 - Setting newly assigned partitions [] for group stream
> 2017-06-27 19:45:00 INFO ConsumerCoordinator:252 - Setting newly assigned partitions [] for group stream
> 2017-06-27 19:45:00 INFO StreamThread:228 - stream-thread [StreamThread-4] New partitions [[]] assigned at the end of consumer rebalance.
> 2017-06-27 19:45:00 INFO StreamThread:228 - stream-thread [StreamThread-2] New partitions [[]] assigned at the end of consumer rebalance.
> 2017-06-27 19:45:00 INFO StreamThread:228 - stream-thread [StreamThread-3] New partitions [[]] assigned at the end of consumer rebalance.
> 2017-06-27 19:45:00 INFO ConsumerCoordinator:393 - Revoking previously assigned partitions [] for group stream
> 2017-06-27 19:45:00 INFO StreamThread:254 - stream-thread [StreamThread-1] partitions [[]] revoked at the beginning of consumer rebalance.
> 2017-06-27 19:45:00 INFO StreamThread:1012 - stream-thread [StreamThread-1] Updating suspended tasks to contain active tasks [[]]
> 2017-06-27 19:45:00 INFO StreamThread:1019 - stream-thread [StreamThread-1] Removing all active tasks [[]]
> 2017-06-27 19:45:00 INFO StreamThread:1034 - stream-thread [StreamThread-1] Removing all standby tasks [[]]
> 2017-06-27 19:45:00 INFO AbstractCoordinator:407 - (Re-)joining group stream
> 2017-06-27 19:45:00 INFO StreamPartitionAssignor:290 - stream-thread [StreamThread-1] Constructed client metadata {de0ead97-89d8-49b0-be84-876ca5b41cd8=ClientMetadata{hostInfo=null, consumers=[stream-de0ead97-89d8-49b0-be84-876ca5b41cd8-StreamThread-1-consumer-b35886f7-0525-458b-9b3e-8856554d0afb], state=[activeTasks: ([]) assignedTasks: ([]) prevActiveTasks: ([]) prevAssignedTasks: ([]) capacity: 1.0 cost: 0.0]}} from the member subscriptions.
> 2017-06-27 19:45:00 INFO StreamPartitionAssignor:630 - stream-thread [StreamThread-1] Completed validating internal topics in partition assignor
> 2017-06-27 19:45:00 INFO StreamPartitionAssignor:630 - stream-thread [StreamThread-1] Completed validating internal topics in partition assignor
> 2017-06-27 19:45:00 INFO StreamPartitionAssignor:466 - stream-thread [StreamThread-1] Assigned tasks to clients as {de0ead97-89d8-49b0-be84-876ca5b41cd8=[activeTasks: ([]) assignedTasks: ([]) prevActiveTasks: ([]) prevAssignedTasks: ([]) capacity: 1.0 cost: 0.0]}.
> 2017-06-27 19:45:00 INFO AbstractCoordinator:375 - Successfully joined group stream with generation 2704
> 2017-06-27 19:45:00 INFO ConsumerCoordinator:252 - Setting newly assigned partitions [] for group stream
> 2017-06-27 19:45:00 INFO StreamThread:228 - stream-thread [StreamThread-1] New partitions [[]] assigned at the end of consumer rebalance.
> 2017-06-27 19:45:00 INFO ConsumerCoordinator:393 - Revoking previously assigned partitions [] for group stream
> 2017-06-27 19:45:00 INFO ConsumerCoordinator:393 - Revoking previously assigned partitions [] for group stream
> 2017-06-27 19:45:00 INFO StreamThread:254 - stream-thread [StreamThread-2] partitions [[]] revoked at the beginning of consumer rebalance.
> 2017-06-27 19:45:00 INFO StreamThread:254 - stream-thread [StreamThread-4] partitions [[]] revoked at the beginning of consumer rebalance.
> 2017-06-27 19:45:00 INFO StreamThread:1012 - stream-thread [StreamThread-2] Updating suspended tasks to contain active tasks [[]]
> 2017-06-27 19:45:00 INFO StreamThread:1012 - stream-thread [StreamThread-4] Updating suspended tasks to contain active tasks [[]]
> 2017-06-27 19:45:00 INFO StreamThread:1019 - stream-thread [StreamThread-2] Removing all active tasks [[]]
> 2017-06-27 19:45:00 INFO StreamThread:1019 - stream-thread [StreamThread-4] Removing all active tasks [[]]
> 2017-06-27 19:45:00 INFO StreamThread:1034 - stream-thread [StreamThread-2] Removing all standby tasks [[]]
> 2017-06-27 19:45:00 INFO StreamThread:1034 - stream-thread [StreamThread-4] Removing all standby tasks [[]]
> 2017-06-27 19:45:00 INFO AbstractCoordinator:407 - (Re-)joining group stream
> 2017-06-27 19:45:00 INFO AbstractCoordinator:407 - (Re-)joining group stream
> 2017-06-27 19:45:00 INFO ConsumerCoordinator:393 - Revoking previously assigned partitions [] for group stream
> 2017-06-27 19:45:00 INFO StreamThread:254 - stream-thread [StreamThread-3] partitions [[]] revoked at the beginning of consumer rebalance.
> 2017-06-27 19:45:00 INFO StreamThread:1012 - stream-thread [StreamThread-3] Updating suspended tasks to contain active tasks [[]]
> 2017-06-27 19:45:00 INFO StreamThread:1019 - stream-thread [StreamThread-3] Removing all active tasks [[]]
> 2017-06-27 19:45:00 INFO StreamThread:1034 - stream-thread [StreamThread-3] Removing all standby tasks [[]]
> 2017-06-27 19:45:00 INFO AbstractCoordinator:407 - (Re-)joining group stream
> 2017-06-27 19:45:01 INFO StreamPartitionAssignor:290 - stream-thread [StreamThread-2] Constructed client metadata {de0ead97-89d8-49b0-be84-876ca5b41cd8=ClientMetadata{hostInfo=null, consumers=[stream-de0ead97-89d8-49b0-be84-876ca5b41cd8-StreamThread-3-consumer-142cd5c5-a52d-494a-a8be-ee1f9ae831e2, stream-de0ead97-89d8-49b0-be84-876ca5b41cd8-StreamThread-2-consumer-f8a93346-c322-4e9e-ab38-c9a9eb4a9fa4, stream-de0ead97-89d8-49b0-be84-876ca5b41cd8-StreamThread-4-consumer-0726705d-c88f-4ad2-81c0-9ab02175b53e], state=[activeTasks: ([]) assignedTasks: ([]) prevActiveTasks: ([]) prevAssignedTasks: ([]) capacity: 3.0 cost: 0.0]}} from the member subscriptions.
> 2017-06-27 19:45:01 INFO StreamPartitionAssignor:630 - stream-thread [StreamThread-2] Completed validating internal topics in partition assignor
> 2017-06-27 19:45:01 INFO StreamPartitionAssignor:630 - stream-thread [StreamThread-2] Completed validating internal topics in partition assignor
> 2017-06-27 19:45:01 INFO StreamPartitionAssignor:466 - stream-thread [StreamThread-2] Assigned tasks to clients as {de0ead97-89d8-49b0-be84-876ca5b41cd8=[activeTasks: ([]) assignedTasks: ([]) prevActiveTasks: ([]) prevAssignedTasks: ([]) capacity: 3.0 cost: 0.0]}.
> 2017-06-27 19:45:01 INFO AbstractCoordinator:375 - Successfully joined group stream with generation 2705
> 2017-06-27 19:45:01 INFO AbstractCoordinator:375 - Successfully joined group stream with generation 2705
> 2017-06-27 19:45:01 INFO AbstractCoordinator:375 - Successfully joined group stream with generation 2705
> 2017-06-27 19:45:01 INFO ConsumerCoordinator:252 - Setting newly assigned partitions [] for group stream
> 2017-06-27 19:45:01 INFO ConsumerCoordinator:252 - Setting newly assigned partitions [] for group stream
> 2017-06-27 19:45:01 INFO StreamThread:228 - stream-thread [StreamThread-4] New partitions [[]] assigned at the end of consumer rebalance.
> 2017-06-27 19:45:01 INFO ConsumerCoordinator:252 - Setting newly assigned partitions [] for group stream
> 2017-06-27 19:45:01 INFO StreamThread:228 - stream-thread [StreamThread-3] New partitions [[]] assigned at the end of consumer rebalance.
> 2017-06-27 19:45:01 INFO StreamThread:228 - stream-thread [StreamThread-2] New partitions [[]] assigned at the end of consumer rebalance. 
> {code}
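In the log above, the group generation climbs from 2701 to 2705 within about one second, and every thread receives an empty assignment. This is the signature of the endless rebalancing. Below is a minimal sketch for spotting this pattern in a larger log. It assumes the `AbstractCoordinator` message format shown here, which may differ in other Kafka versions. The class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.TreeSet;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class RebalanceGenerationCounter {

    // Matches the "Successfully joined group ... with generation N" message seen in the log.
    private static final Pattern JOINED =
            Pattern.compile("Successfully joined group (\\S+) with generation (\\d+)");

    /** Returns the distinct group generations found in the given log lines, in ascending order. */
    static TreeSet<Integer> generations(List<String> logLines) {
        TreeSet<Integer> result = new TreeSet<>();
        for (String line : logLines) {
            Matcher m = JOINED.matcher(line);
            if (m.find()) {
                result.add(Integer.parseInt(m.group(2)));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> sample = Arrays.asList(
            "2017-06-27 19:45:00 INFO AbstractCoordinator:375 - Successfully joined group stream with generation 2701",
            "2017-06-27 19:45:00 INFO AbstractCoordinator:375 - Successfully joined group stream with generation 2702",
            "2017-06-27 19:45:01 INFO AbstractCoordinator:375 - Successfully joined group stream with generation 2705");
        TreeSet<Integer> gens = generations(sample);
        // Prints "3 generations: 2701..2705"
        System.out.println(gens.size() + " generations: " + gens.first() + ".." + gens.last());
    }
}
```

Many distinct generations within a short time window, combined with empty partition assignments, suggests that the group is rebalancing continuously instead of processing.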



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)