Posted to users@kafka.apache.org by Piotr Strąk <pi...@relayr.io> on 2019/07/11 12:52:58 UTC

Kafka Streams not fetching data from one partition

Hello,

I'm investigating an issue in which a Kafka Streams application does not 
consume from one of the partitions it was assigned. I'm using the 2.3.0 
version.

All the fetch requests are sent for two partitions only:

 > Using older server API v6 to send FETCH 
{replica_id=-1,max_wait_time=500,min_bytes=1,max_bytes=52428800,isolation_level=0,topics=[{topic=Topic_v2,partitions=[{partition=25,fetch_offset=71484113,log_start_offset=-1,partition_max_bytes=1048576},{partition=22,fetch_offset=71296374,log_start_offset=-1,partition_max_bytes=1048576}]}]} 
with correlation id 1049090 to node 1
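
One way to check this across a whole debug log rather than by eye is to pull the partition numbers out of every FETCH line and compare them with the expected assignment. This is only a rough sketch: the regular expression is written against the line format quoted above, and the assigned set (22, 23, 25) is an assumption taken from the kafka-consumer-groups.sh output in this message.

```python
import re

# Sample DEBUG line copied from the message above (joined onto one line).
FETCH_LINE = (
    "Using older server API v6 to send FETCH "
    "{replica_id=-1,max_wait_time=500,min_bytes=1,max_bytes=52428800,"
    "isolation_level=0,topics=[{topic=Topic_v2,partitions=["
    "{partition=25,fetch_offset=71484113,log_start_offset=-1,partition_max_bytes=1048576},"
    "{partition=22,fetch_offset=71296374,log_start_offset=-1,partition_max_bytes=1048576}"
    "]}]} with correlation id 1049090 to node 1"
)


def fetched_partitions(lines):
    """Return {partition: fetch_offset} over all FETCH log lines given."""
    found = {}
    for line in lines:
        for partition, offset in re.findall(r"partition=(\d+),fetch_offset=(\d+)", line):
            found[int(partition)] = int(offset)
    return found


def missing_partitions(lines, assigned):
    """Return the assigned partitions that no FETCH line mentions."""
    return sorted(set(assigned) - set(fetched_partitions(lines)))


# Assumption: the consumer owns partitions 22, 23 and 25 of a single topic.
ASSIGNED = [22, 23, 25]
print(missing_partitions([FETCH_LINE], ASSIGNED))
```

A single FETCH request only lists partitions whose leader is the node it is sent to, so the check is only meaningful when it is fed the FETCH lines for all nodes, not just the one shown here.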

But there should be three of them; using kafka-consumer-groups.sh I can 
see that the third one (partition 23) has no current offset.

 > Topic_v2    22    71296374    71296374    0    Topic-c1f03-StreamThread-1-consumer
 > Topic_v2    23    -           72830772    -    Topic-c1f03-StreamThread-1-consumer
 > Topic_v2    25    71484113    71484113    0    Topic-c1f03-StreamThread-1-consumer
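
The same check can be scripted against the tool output. The sketch below flags rows whose current offset is "-". The column order (topic, partition, current offset, log-end offset, lag, consumer id) is an assumption based on the rows quoted above, and the function name is only illustrative.

```python
# Rows as quoted above, one per line. The column order is assumed to be:
# topic, partition, current offset, log-end offset, lag, consumer id.
ROWS = """\
Topic_v2 22 71296374 71296374 0 Topic-c1f03-StreamThread-1-consumer
Topic_v2 23 - 72830772 - Topic-c1f03-StreamThread-1-consumer
Topic_v2 25 71484113 71484113 0 Topic-c1f03-StreamThread-1-consumer
"""


def partitions_without_offset(text):
    """Return (topic, partition, log_end_offset) for rows whose current offset is '-'."""
    stuck = []
    for row in text.splitlines():
        fields = row.split()
        if len(fields) < 6:
            continue  # skip blank or truncated lines
        topic, partition, current, log_end = fields[:4]
        if current == "-":
            stuck.append((topic, int(partition), int(log_end)))
    return stuck


print(partitions_without_offset(ROWS))
```

For the rows above this reports only partition 23, together with its log-end offset.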

It worked fine until the partition was revoked; after it was reassigned, 
absolutely nothing happened on it. Note that the clientId (shortened by 
me) has changed.

 > 2019-07-10 02:15:36.347 [INFO] 
ConsumerCoordinator:Topic-c1f03-StreamThread-1 - [Consumer 
clientId=Topic-c1f03-StreamThread-1-consumer, groupId=Topic] Setting 
offset for partition Topic_v2-23 to the committed offset 
FetchPosition{offset=68735327, offsetEpoch=Optional.empty, 
currentLeader=LeaderAndEpoch{leader=ip-10-0-1-81.us-west-2.compute.internal:9092 
(id: 3 rack: null), epoch=-1}}
 > 2019-07-10 02:15:36.383 [INFO] 
ConsumerCoordinator:Topic-3e717-StreamThread-1 - [Consumer 
clientId=Topic-3e717-StreamThread-1-consumer, groupId=Topic] Revoking 
previously assigned partitions [Topic_v2-23]
 > 2019-07-10 02:15:36.498 [INFO] 
ConsumerCoordinator:Topic-3e717-StreamThread-1 - [Consumer 
clientId=Topic-3e717-StreamThread-1-consumer, groupId=Topic] Setting 
newly assigned partitions: Topic_v2-23
 > 2019-07-10 02:15:36.506 [INFO] 
ConsumerCoordinator:Topic-3e717-StreamThread-1 - [Consumer 
clientId=Topic-3e717-StreamThread-1-consumer, groupId=Topic] Setting 
offset for partition Topic_v2-23 to the committed offset 
FetchPosition{offset=70030508, offsetEpoch=Optional.empty, 
currentLeader=LeaderAndEpoch{leader=ip-10-0-1-81.us-west-2.compute.internal:9092 
(id: 3 rack: null), epoch=-1}}
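
To get a rough idea of how far behind the partition is, the last offset restored in these log entries can be compared with the log-end offset reported by kafka-consumer-groups.sh. This is a minimal sketch: the log lines are shortened copies of the entries above, the pattern is written against that wording only, and the two numbers were captured at different times, so the difference is an estimate rather than an exact lag.

```python
import re

# The two "Setting offset" entries for Topic_v2-23 from the log above,
# shortened to the fields this sketch needs.
LOG = [
    "2019-07-10 02:15:36.347 [INFO] ... Setting offset for partition Topic_v2-23 "
    "to the committed offset FetchPosition{offset=68735327, offsetEpoch=Optional.empty, ...}",
    "2019-07-10 02:15:36.506 [INFO] ... Setting offset for partition Topic_v2-23 "
    "to the committed offset FetchPosition{offset=70030508, offsetEpoch=Optional.empty, ...}",
]

PATTERN = re.compile(
    r"Setting offset for partition (\S+) to the committed offset "
    r"FetchPosition\{offset=(\d+)"
)


def last_restored_offset(lines, topic_partition):
    """Return the offset from the last matching entry, or None if there is none."""
    last = None
    for line in lines:
        match = PATTERN.search(line)
        if match and match.group(1) == topic_partition:
            last = int(match.group(2))
    return last


LOG_END_OFFSET = 72830772  # from the kafka-consumer-groups.sh row for partition 23
restored = last_restored_offset(LOG, "Topic_v2-23")
print(restored, LOG_END_OFFSET - restored)
```

With the values quoted in this message, the position restored after the rebalance is 70030508, roughly 2.8 million records behind the reported log-end offset.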

What could be the reason that this partition is not included in the fetch 
request? The application uses a single stream thread (num.stream.threads 
is set to 1), so if that thread were blocked it could not keep working on 
the other two partitions, if I understand correctly.


Re: Kafka Streams not fetching data from one partition

Posted by Piotr Strąk <pi...@relayr.io>.
Hi Bill,

Unfortunately, the logs from the broker are gone, and the logging level of 
the Kafka Streams application was set to info, so there isn't much, but 
I've attached them. The relevant consumer contains "aggregate-level3600" 
in its name, and it is partition 23 that was not being consumed after the 
last rebalance.

Are there any known scenarios in which this can happen? For example, could 
my application be doing something wrong within Kafka Streams so that one 
partition remains unprocessed?

On 11/07/2019 22:18, Bill Bejeck wrote:
> Hi Piotr,
>
> Thanks for reporting this issue.  Can you provide full kafka-streams and
> broker logs around the timeframe you observed this?
>
> -Bill

Re: Kafka Streams not fetching data from one partition

Posted by Bill Bejeck <bi...@confluent.io>.
Hi Piotr,

Thanks for reporting this issue.  Can you provide full kafka-streams and
broker logs around the timeframe you observed this?

-Bill
