Posted to users@kafka.apache.org by Mazen Ezzeddine <ma...@etu.univ-cotedazur.fr> on 2021/04/09 13:04:27 UTC

Deleting a specific consumer of a consumer group through the Kafka admin client API.

Dear all,

The Kafka admin client API allows deleting a consumer group with logic like the following:

DeleteConsumerGroupsResult deleteConsumerGroupsResult = adminClient.deleteConsumerGroups(Arrays.asList(consumerGroupToBeDeleted));

However, is there any API through which the admin client can delete a specific consumer from a consumer group (say, the one with the lowest consumption rate)?

My Kafka cluster is running on Kubernetes, and I can use a Kubernetes client to dynamically delete a particular pod (and with it a Kafka consumer). However, the names of consumer pods in Kubernetes differ from the consumer names that the admin client uses to resolve and access consumers in the Kafka cluster, so associating the two looks difficult.

Any hints on this issue?

By the way, is the decision to offer a whole-group deletion API on the admin client, but not one for an individual consumer in the group, purely a design choice, or are there technical/protocol issues that prevent offering such an API?

Thank you.

Re: Deleting a specific consumer of a consumer group through the Kafka admin client API.

Posted by Mazen Ezzeddine <ma...@etu.univ-cotedazur.fr>.
Thanks so much, Sophie, for your ideas and suggestions. These are very interesting and smart workarounds. I will try them and come back to report the outcome and any issues.

Best regards,

________________________________
From: Sophie Blee-Goldman <so...@confluent.io.INVALID>
Sent: Saturday, April 10, 2021 3:34 AM
To: Users <us...@kafka.apache.org>
Subject: Re: Deleting a specific consumer of a consumer group through the Kafka admin client API.

Hey Mazen,

There is technically an AdminClient API which allows you to remove a single
member from the group:

Admin.removeMembersFromConsumerGroup(String groupId,
RemoveMembersFromConsumerGroupOptions options);

but I don't think this will help you. For one thing, this API only works
for removing members of a consumer group that uses static membership. The
*RemoveMembersFromConsumerGroupOptions* parameter only allows you to
specify the consumer to remove via the groupInstanceId, which defines a
static member.

Perhaps more importantly, this API does not *force* the member to be
permanently expelled from the group. Rather, it just informs the broker
that this particular consumer has actually already left the group and
therefore should be kicked out. With static membership, it can take a
while for the broker to distinguish a member who has actually left the
group from one who has temporarily "disappeared", e.g. due to a pod restart
or a rolling bounce. So this API was added to let users explicitly remove a
member from the group when that consumer app has been taken down, so that
its partitions can be reassigned to other members as quickly as possible.
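
For reference, a call to that API might look roughly like the sketch
below. The broker address, group id, and group.instance.id are made-up
placeholders, and it assumes the consumer with that instance id has
already been stopped.

```java
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.MemberToRemove;
import org.apache.kafka.clients.admin.RemoveMembersFromConsumerGroupOptions;

final class RemoveStaticMember {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        // Assumption: placeholder broker address, group id, and instance id.
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        String groupId = "my-group";
        // The group.instance.id configured on the (already stopped) static member.
        String groupInstanceId = "consumer-0";

        try (Admin admin = Admin.create(props)) {
            RemoveMembersFromConsumerGroupOptions options =
                new RemoveMembersFromConsumerGroupOptions(
                    Collections.singleton(new MemberToRemove(groupInstanceId)));
            // Blocks until the request has completed, or throws if it failed.
            admin.removeMembersFromConsumerGroup(groupId, options).all().get();
        }
    }
}
```

This only tells the coordinator that the member is gone so its partitions
can be handed out sooner. It does not stop a running consumer process.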

If you tried to use that API on a static member who was still actively
running, I'm pretty sure it would just rejoin the group the next time it
called poll(). I think this hints at the motivation for not providing a
"forcibly remove member from group" API in the first place -- consumer
groups generally work the other way around, i.e. the existence of
individual consumers calling poll() is what determines who is/isn't in the
group, and the broker is just there to keep track so it can distribute
partitions only among the live members. The group coordinator isn't the
"source of truth" for which consumers are actually in the group; that would
be the consumers themselves. Does that make sense?

But I see where you're coming from, so maybe there's another way to do what
you're trying to do. I would focus less on trying to figure out pod names
and more on how to get consumers to spin up/down when you want to scale
in/out -- after all k8s and the physical pods are just a detail of the
underlying architecture, and should probably be abstracted away from the
logic of your Consumer application. Instead of killing the pod, can you
just stop/exit the process? For example, let's say you just want to remove
a member if it goes below some threshold consumption rate:

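import java.time.Duration;
import java.util.List;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ThresholdConsumer {
    public static void main(String[] args) {
        // props, topic, threshold, and computeConsumptionRate() are placeholders
        // for your own configuration and logic
        Consumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(List.of(topic));
        // start above the threshold so the loop runs at least once
        double consumptionRate = Double.MAX_VALUE;
        while (consumptionRate > threshold) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            // do some processing

            // update the consumption rate
            consumptionRate = computeConsumptionRate();
        }
        // done -- close the consumer, then just exit the JVM
        consumer.close();
    }
}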
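
One possible way to back computeConsumptionRate() is a small sliding-window
counter like the sketch below. The class name, the window length, and the
idea of feeding it the record count of each poll are all assumptions for
illustration, not anything the Kafka client provides.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Tracks records per second over a sliding time window (hypothetical helper). */
final class ConsumptionRateTracker {
    private final long windowMs;
    // Each sample is {timestampMs, recordCount}, oldest first.
    private final Deque<long[]> samples = new ArrayDeque<>();
    private long recordsInWindow = 0;

    ConsumptionRateTracker(long windowMs) {
        if (windowMs <= 0) {
            throw new IllegalArgumentException("windowMs must be positive");
        }
        this.windowMs = windowMs;
    }

    /** Call after each poll with the current time and the number of records returned. */
    void record(long nowMs, int count) {
        samples.addLast(new long[] {nowMs, count});
        recordsInWindow += count;
        evict(nowMs);
    }

    /** Records per second over the window that ends at nowMs. */
    double ratePerSecond(long nowMs) {
        evict(nowMs);
        return recordsInWindow * 1000.0 / windowMs;
    }

    // Drop samples that are at least one full window old.
    private void evict(long nowMs) {
        while (!samples.isEmpty() && samples.peekFirst()[0] <= nowMs - windowMs) {
            recordsInWindow -= samples.removeFirst()[1];
        }
    }
}
```

In the loop you would call record(System.currentTimeMillis(),
records.count()) after each poll. Note that the rate reads low until a full
window has elapsed, so a freshly started consumer probably needs a grace
period before the threshold check applies.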

If you had something slightly more complicated in mind, like removing the
consumer with the worst consumption rate after each rebalance, then you
could use the Subscription to have each consumer report its consumption
rate to the group leader. The leader then figures out the worst one and
sends a signal back in its Assignment to tell that Consumer to shut down
after this rebalance (and, presumably, does not assign any partitions to
the one shutting down).
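
The leader-side part of that idea can be sketched in plain Java. In the
Java client the hook for this would be a custom ConsumerPartitionAssignor,
whose Subscription and Assignment each carry an opaque userData buffer. The
sketch below does not implement that interface. It only shows, under made-up
names, how a rate could be packed into such a buffer and how the leader
could pick the slowest member.

```java
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.Optional;

/** Hypothetical helper: encode rates as user data and pick the slowest member. */
final class SlowestMemberSelector {

    /** Packs a consumption rate into 8 bytes, ready to be read from position 0. */
    static ByteBuffer encodeRate(double rate) {
        ByteBuffer buf = ByteBuffer.allocate(Double.BYTES);
        buf.putDouble(rate);
        buf.flip();
        return buf;
    }

    /** Reads the rate back without moving the position of the caller's buffer. */
    static double decodeRate(ByteBuffer userData) {
        return userData.duplicate().getDouble();
    }

    /**
     * Returns the member id with the lowest reported rate. Returns empty when
     * there are fewer than two members, so the last consumer is never removed.
     * Ties are broken by member id so the choice is deterministic.
     */
    static Optional<String> slowest(Map<String, ByteBuffer> userDataByMember) {
        if (userDataByMember.size() < 2) {
            return Optional.empty();
        }
        String worst = null;
        double worstRate = Double.POSITIVE_INFINITY;
        for (Map.Entry<String, ByteBuffer> e : userDataByMember.entrySet()) {
            double rate = decodeRate(e.getValue());
            if (worst == null
                    || rate < worstRate
                    || (rate == worstRate && e.getKey().compareTo(worst) < 0)) {
                worst = e.getKey();
                worstRate = rate;
            }
        }
        return Optional.of(worst);
    }
}
```

The selected member would then get an empty assignment plus some agreed
"please shut down" marker in its Assignment user data, and would exit its
poll loop when it sees that marker.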

