Posted to users@kafka.apache.org by Patrik Kleindl <pk...@gmail.com> on 2018/11/06 10:03:15 UTC

Offsets/Lags for global state stores not shown

Hello

Am I doing something wrong, or is it by design that global state stores and
their consumers do not show up in the consumer groups?
With the consumer group command (and in Control Center as well), I don't get
any output for the group:
./kafka-consumer-groups --bootstrap-server broker:9092 --group somegroup --describe
Note: This will not show information about old Zookeeper-based consumers.

If I query for the state, I get a response that members are present:
./kafka-consumer-groups --bootstrap-server broker:9092 --group somegroup --describe --state
Note: This will not show information about old Zookeeper-based consumers.

COORDINATOR (ID)    ASSIGNMENT-STRATEGY    STATE     #MEMBERS
broker:9092 (1)     stream                 Stable    2

This is quite irritating, as we cannot see whether a global state store has
caught up with a backlog of messages.
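For context, the LAG column that kafka-consumer-groups prints is computed per partition as LOG-END-OFFSET minus CURRENT-OFFSET, where CURRENT-OFFSET is the offset the group has committed. The plain-JDK Java sketch below (no Kafka client; the class name, partition names and offsets are made up for illustration) mirrors that subtraction and shows why a consumer that never commits leaves nothing to compare against. It is a simplification of the tool: partitions without a committed offset are simply skipped here.

```java
import java.util.Map;
import java.util.TreeMap;

// Sketch only: per-partition lag as consumer-group tooling reports it,
// i.e. log-end offset minus the offset the group has committed.
// Partition names and offsets are made-up example values.
final class LagSketch {

    /** Returns logEnd - committed per partition; partitions without a committed offset are skipped. */
    static Map<String, Long> lag(Map<String, Long> logEndOffsets, Map<String, Long> committedOffsets) {
        Map<String, Long> result = new TreeMap<>();
        for (Map.Entry<String, Long> e : logEndOffsets.entrySet()) {
            Long committed = committedOffsets.get(e.getKey());
            if (committed != null) {
                result.put(e.getKey(), e.getValue() - committed);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Long> logEnd = Map.of("storetopic-0", 1_000L, "storetopic-1", 800L);
        // A group that has committed offsets: lag can be computed.
        Map<String, Long> committed = Map.of("storetopic-0", 950L, "storetopic-1", 800L);
        System.out.println(lag(logEnd, committed)); // {storetopic-0=50, storetopic-1=0}
        // A consumer that never commits: nothing to report.
        System.out.println(lag(logEnd, Map.of())); // {}
    }
}
```

With a real `KafkaConsumer`, the log-end side would come from `endOffsets(Collection<TopicPartition>)`; the committed side is what is missing for a consumer that does not commit.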

Code to reproduce:
        builder.globalTable(TOPIC_NAME, Materialized
                .<String, String, KeyValueStore<Bytes, byte[]>>as(STORENAME)
                .withKeySerde(Serdes.String())
                .withValueSerde(Serdes.String()));

Nothing fancy.

Logs:
2018-11-05 21:25:56 INFO  AbstractCoordinator:442 - (Re-)joining group
2018-11-05 21:25:56 INFO  StreamPartitionAssignor:481 - Assigned tasks to clients as {e0250aa5-e1c6-4d33-a746-bc9357c66965=[activeTasks: ([]) standbyTasks: ([]) assignedTasks: ([]) prevActiveTasks: ([]) prevAssignedTasks: ([]) capacity: 1]}.
2018-11-05 21:25:56 WARN  ConsumerCoordinator:376 - The following subscribed topics are not assigned to any members: [storetopic]
2018-11-05 21:25:56 INFO  AbstractCoordinator:409 - Successfully joined group with generation 3
2018-11-05 21:25:56 INFO  ConsumerCoordinator:256 - Setting newly assigned partitions []

The store works after this, but it does not show up in the consumer group output.

Any input is appreciated.

best regards

Patrik

PS: The customer will forward this to Confluent support too, but I'm
asking here for public visibility.

Re: Offsets/Lags for global state stores not shown

Posted by Patrik Kleindl <pk...@gmail.com>.
Thanks for the reply.
It would be interesting to hear who else is using IQ (Interactive Queries) with or without GlobalKTables, and what problems and solutions they have come up with.
Best regards
Patrik

> On 18.11.2018 at 20:21, Matthias J. Sax <ma...@confluent.io> wrote:
> [...]

Re: Offsets/Lags for global state stores not shown

Posted by "Matthias J. Sax" <ma...@confluent.io>.
Because each instance needs to consume all data, throughput is limited by
what a single instance can consume; a hard bound is the network. Note that
the network is shared, so don't plan with the maximum network speed. Also,
what matters here is not the number of unique messages but the number of
updates.
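A back-of-envelope version of this point: since every instance reads every update, adding instances does not shrink the per-instance load. The sketch below uses purely hypothetical numbers (a 1 Gbit/s link of which only 25% is assumed usable because the network is shared, 1 KB records, 5 million records to bootstrap, 1 million updates per day) to estimate the bootstrap time and the steady-state ingest rate per instance. Bootstrap reads every record still in the topic, which for a compacted topic that is not yet fully compacted can be more than the number of unique keys.

```java
import java.util.Locale;

// Back-of-envelope sketch: every global-table instance reads every update,
// so adding instances does not reduce the per-instance load.
// All numbers in main() are hypothetical, not measurements.
final class GlobalTableBudget {

    /** Seconds to read `records` records of `avgBytes` each at `bytesPerSecond`. */
    static double bootstrapSeconds(long records, long avgBytes, double bytesPerSecond) {
        return (double) records * avgBytes / bytesPerSecond;
    }

    /** Average bytes per second each instance must ingest for a given update rate. */
    static double steadyStateBytesPerSecond(long updatesPerDay, long avgBytes) {
        return (double) updatesPerDay * avgBytes / 86_400.0;
    }

    public static void main(String[] args) {
        double linkBytesPerSecond = 1_000_000_000.0 / 8;   // hypothetical 1 Gbit/s link
        double usableShare = 0.25;                         // network is shared: assume 25% usable
        double budget = linkBytesPerSecond * usableShare;  // 31.25 MB/s
        long recordsInTopic = 5_000_000L;                  // records read on bootstrap (updates, not unique keys)
        long updatesPerDay = 1_000_000L;                   // hypothetical update rate
        long avgBytes = 1_000L;                            // hypothetical average record size

        // prints "bootstrap: 160 s"
        System.out.printf(Locale.ROOT, "bootstrap: %.0f s%n",
                bootstrapSeconds(recordsInTopic, avgBytes, budget));
        // prints "steady state: 11.6 KB/s per instance"
        System.out.printf(Locale.ROOT, "steady state: %.1f KB/s per instance%n",
                steadyStateBytesPerSecond(updatesPerDay, avgBytes) / 1_000.0);
    }
}
```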


> Just to verify: for this IQ setup (a Streams app that only builds a single
> table to be queried), we have tried the alternative approach of using a
> normal KTable in combination with a unique application ID per application
> instance. This seemed to work quite well, including faster (parallel)
> startup etc. Is this approach valid, or would you expect some pitfalls?


I guess this might be OK for your use case. There is one difference on
startup: if no local state has been built up yet, in the GlobalKTable case
the table is fully populated from the topic before you can start querying.
In the KTable case, you can query from the very beginning, while data is
still being loaded into the table.

Also, with this approach, any other processing you add would not be
parallelized but duplicated on every instance.
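A minimal sketch of the per-instance configuration being discussed, assuming instances can be told apart by a stable identifier such as a host name (the base name `iq-table-app` and the instance IDs are hypothetical). Kafka Streams uses `application.id` as the consumer group ID, so distinct IDs put every instance into its own group and each one is assigned all partitions of the input topic; that is also why additional processing is duplicated rather than parallelized. The sketch only builds the `Properties` (the keys are the values of `StreamsConfig.APPLICATION_ID_CONFIG` and `StreamsConfig.BOOTSTRAP_SERVERS_CONFIG`), so it runs without the Kafka client on the classpath.

```java
import java.util.Properties;

// Sketch of the "one application.id per instance" setup. Kafka Streams uses
// application.id as the consumer group id, so distinct ids put each instance in
// its own group and each instance is assigned all partitions of the input topic.
// The base name "iq-table-app" and the instance ids are hypothetical.
final class PerInstanceConfig {

    static Properties streamsProps(String baseAppId, String instanceId, String bootstrapServers) {
        Properties props = new Properties();
        // Same keys as StreamsConfig.APPLICATION_ID_CONFIG and BOOTSTRAP_SERVERS_CONFIG.
        props.put("application.id", baseAppId + "-" + instanceId);
        props.put("bootstrap.servers", bootstrapServers);
        return props;
    }

    public static void main(String[] args) {
        // Use a stable id per instance (e.g. the host name passed on the command line),
        // not a random one per start; "instance-1" is only a fallback for this demo.
        String instanceId = args.length > 0 ? args[0] : "instance-1";
        Properties props = streamsProps("iq-table-app", instanceId, "broker:9092");
        System.out.println(props.getProperty("application.id")); // iq-table-app-instance-1 when run without arguments
    }
}
```

A stable instance ID seems preferable to a fresh random one on every start, because `application.id` also determines the local state directory and the names of internal topics such as changelogs; a new ID per restart would mean rebuilding the table from scratch and leaving the old internal topics behind.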


-Matthias



On 11/7/18 1:32 AM, Patrik Kleindl wrote:
> [...]


Re: Offsets/Lags for global state stores not shown

Posted by Patrik Kleindl <pk...@gmail.com>.
Thanks for the response.
How "low" is the expected low throughput? We are using GlobalKTables for IQ
on several topics, with single-digit millions of unique messages and usually
fewer changes per day.

Just to verify: for this IQ setup (a Streams app that only builds a single
table to be queried), we have tried the alternative approach of using a
normal KTable in combination with a unique application ID per application
instance. This seemed to work quite well, including faster (parallel)
startup etc. Is this approach valid, or would you expect some pitfalls?

We have not used this approach more widely because it doesn't work for global
stores inside a Streams application, but it might be beneficial to split that
up again.

best regards

Patrik

On Tue, 6 Nov 2018 at 20:07, Matthias J. Sax <ma...@confluent.io> wrote:

> [...]

Re: Offsets/Lags for global state stores not shown

Posted by "Matthias J. Sax" <ma...@confluent.io>.
The topics of global stores are not included by design.

The "problem" is that each instance needs to consume *all* partitions of
these topics, and thus they cannot be included in the consumer group, which
would assign each partition to exactly one instance. Hence, an additional
consumer is used that relies on manual partition assignment (instead of
subscription), and this consumer does not commit any offsets to Kafka.
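
To illustrate the difference: with `KafkaConsumer#subscribe`, the group coordinator spreads a topic's partitions over the group members, each partition going to exactly one member; with `KafkaConsumer#assign`, a consumer picks its partitions itself and does not take part in group assignment. The Java model below is a simplification (plain round-robin rather than any of Kafka's real assignors; instance names are hypothetical) contrasting a group split with the "every instance reads everything" pattern that a global store needs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simplified model, not Kafka's actual assignor: a consumer group gives each
// partition to exactly one member, while a global-store consumer on every
// instance reads all partitions itself.
final class AssignmentSketch {

    /** Round-robin split: each partition goes to exactly one member. */
    static Map<String, List<Integer>> groupAssignment(List<String> members, int partitions) {
        Map<String, List<Integer>> result = new TreeMap<>();
        if (members.isEmpty()) {
            return result; // nobody to assign to
        }
        for (String m : members) {
            result.put(m, new ArrayList<>());
        }
        for (int p = 0; p < partitions; p++) {
            result.get(members.get(p % members.size())).add(p);
        }
        return result;
    }

    /** Global-store style: every instance reads every partition. */
    static Map<String, List<Integer>> globalAssignment(List<String> instances, int partitions) {
        Map<String, List<Integer>> result = new TreeMap<>();
        for (String instance : instances) {
            List<Integer> all = new ArrayList<>();
            for (int p = 0; p < partitions; p++) {
                all.add(p);
            }
            result.put(instance, all);
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> instances = List.of("instance-a", "instance-b");
        System.out.println(groupAssignment(instances, 4));  // {instance-a=[0, 2], instance-b=[1, 3]}
        System.out.println(globalAssignment(instances, 4)); // {instance-a=[0, 1, 2, 3], instance-b=[0, 1, 2, 3]}
    }
}
```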

Note, though, that global stores are bootstrapped before processing begins,
and are expected to be low-throughput topics anyway.
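
A simplified in-memory model of that bootstrap phase, assuming the global store is filled by reading the topic from the beginning up to the end offset captured when bootstrapping starts, applying every update (a null value acting as a delete), and only then letting processing begin. The `Update` type and the single-partition list are hypothetical stand-ins, not Kafka classes. The example also reflects Matthias's point that the cost follows the number of updates, not the number of unique keys.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified model of bootstrapping a global store from one partition.
// Update and the in-memory "log" are hypothetical stand-ins, not Kafka classes.
final class BootstrapSketch {

    static final class Update {
        final String key;
        final String value; // null = tombstone (delete)

        Update(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    /** Applies log[0, endOffset) to the store and returns how many records were read. */
    static int bootstrap(List<Update> log, int endOffset, Map<String, String> store) {
        int read = 0;
        // Records appended after endOffset are left for normal processing.
        for (int offset = 0; offset < endOffset && offset < log.size(); offset++) {
            Update u = log.get(offset);
            if (u.value == null) {
                store.remove(u.key);
            } else {
                store.put(u.key, u.value);
            }
            read++;
        }
        return read;
    }

    public static void main(String[] args) {
        List<Update> log = List.of(
                new Update("k1", "v1"), new Update("k2", "v1"),
                new Update("k1", "v2"), new Update("k1", "v3"));
        Map<String, String> store = new HashMap<>();
        int read = bootstrap(log, log.size(), store);
        // 4 records read for only 2 unique keys: bootstrap cost follows updates, not keys.
        System.out.println(read + " records read, " + store.size() + " keys in store"); // 4 records read, 2 keys in store
    }
}
```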


-Matthias

On 11/6/18 2:03 AM, Patrik Kleindl wrote:
> [...]