Posted to users@kafka.apache.org by Richard Ney <ri...@lookout.com.INVALID> on 2022/03/16 04:59:22 UTC

Setting up the CooperativeStickyAssignor in Java

Trying to find a good sample of which consumer settings, besides setting

ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG to
org.apache.kafka.clients.consumer.CooperativeStickyAssignor

are needed to make the rebalance happen cleanly. Unable to find any decent
documentation or code samples. I have set the Group Instance Id to the EC2
instance id based on one blog write-up I found.

Any help would be appreciated

-Richard
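
For readers following along, here is a minimal sketch of the configuration
described in the question. It assumes the standard consumer config keys
`partition.assignment.strategy` and `group.instance.id` (the string values
behind ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG and
ConsumerConfig.GROUP_INSTANCE_ID_CONFIG). The class name
`CooperativeConsumerConfig`, the broker address, the group id, and the instance
id are placeholders invented for illustration. The sketch uses plain
java.util.Properties so that it runs without the kafka-clients jar; in a real
application the result would be passed to a KafkaConsumer constructor.

```java
import java.util.Properties;

// Hypothetical helper (not part of kafka-clients): builds consumer properties
// that select the cooperative sticky assignor.
final class CooperativeConsumerConfig {

    static Properties build(String bootstrapServers, String groupId, String instanceId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        // The only setting the assignor itself needs
        // (ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG).
        props.setProperty("partition.assignment.strategy",
                "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");

        // Optional static membership (ConsumerConfig.GROUP_INSTANCE_ID_CONFIG).
        // It is independent of the assignment strategy, so it is only set
        // when a value is supplied.
        if (instanceId != null) {
            props.setProperty("group.instance.id", instanceId);
        }
        return props;
    }

    public static void main(String[] args) {
        // Placeholder values; an EC2 instance id is used as in the question.
        Properties props = build("localhost:9092", "example-group", "i-0123456789abcdef0");
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

As the replies in this thread point out, no other special configuration is
required for the assignor, and the group instance id only controls static
membership.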

Re: Setting up the CooperativeStickyAssignor in Java

Posted by Richard Ney <ka...@gmail.com>.
Hi Liam,

I've gotten the cooperative sticky assignor to work with the latest
fs2-kafka wrapper. There was a bug in my code: the `.parJoinUnbounded`
that processes the streams needed to move out one scope of execution to pull
in the notification message stream. It's possible that the 2.4.0 version of
the fs2-kafka wrapper would also work after my fix.

"timestamp":"2022-03-25T17:22:08.736Z"
Updating assignment with
Assigned partitions:                       [dw_notifications_aoa_v3-8, dw_notifications_aoa_v3-7]
Current owned partitions:                  [dw_notifications_aoa_v3-9, dw_notifications_aoa_v3-8, dw_notifications_aoa_v3-7]
Added partitions (assigned - owned):       []
Revoked partitions (owned - assigned):     [dw_notifications_aoa_v3-9]

"timestamp":"2022-03-25T17:25:33.721Z"
Updating assignment with
Assigned partitions:                       [dw_notifications_aoa_v3-4, dw_notifications_aoa_v3-2, dw_notifications_aoa_v3-0, dw_notifications_aoa_v3-8, dw_notifications_aoa_v3-7]
Current owned partitions:                  [dw_notifications_aoa_v3-8, dw_notifications_aoa_v3-7]
Added partitions (assigned - owned):       [dw_notifications_aoa_v3-4, dw_notifications_aoa_v3-2, dw_notifications_aoa_v3-0]
Revoked partitions (owned - assigned):     []

{"timestamp":"2022-03-25T17:25:33.725Z","message":"New partition assigned
to consumer for Topic: dw_notifications_aoa_v3:0",
{"timestamp":"2022-03-25T17:25:33.728Z","message":"New partition assigned
to consumer for Topic: dw_notifications_aoa_v3:2",
{"timestamp":"2022-03-25T17:25:33.729Z","message":"New partition assigned
to consumer for Topic: dw_notifications_aoa_v3:4",
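
The "Added" and "Revoked" lines in the log entries above are plain set
differences between the newly assigned partitions and the partitions the
consumer already owns. The sketch below reproduces that arithmetic for the
first log entry. It is only an illustration of the calculation, not the
kafka-clients implementation, and the class name `AssignmentDiff` is made up
for this example.

```java
import java.util.Set;
import java.util.TreeSet;

// Hypothetical illustration of the (assigned - owned) and (owned - assigned)
// arithmetic shown in the log; not code from kafka-clients.
final class AssignmentDiff {

    // Returns the elements of `left` that are not in `right`, in sorted order.
    static Set<Integer> minus(Set<Integer> left, Set<Integer> right) {
        Set<Integer> result = new TreeSet<>(left);
        result.removeAll(right);
        return result;
    }

    public static void main(String[] args) {
        // Partition numbers of dw_notifications_aoa_v3 from the 17:22:08 entry.
        Set<Integer> owned = Set.of(9, 8, 7);
        Set<Integer> assigned = Set.of(8, 7);

        System.out.println("Added (assigned - owned):   " + minus(assigned, owned));  // []
        System.out.println("Revoked (owned - assigned): " + minus(owned, assigned));  // [9]
    }
}
```

With the cooperative protocol only the revoked partition (9 here) is taken
away; partitions 7 and 8 stay with the consumer across the rebalance, which
matches the second log entry where they are still listed as owned.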

On Sat, Mar 19, 2022 at 6:00 PM Liam Clarke-Hutchinson <lc...@redhat.com>
wrote:

> Hi Richard,
>
> Yeah, the old 2.8.1 version of Kafka clients used by trunk fs2-kafka is
> what I think might be the issue, not the wrapper itself, sorry if I was
> unclear on that.
>
> Please let us know how your testing with the latest fs2-kafka that's using
> 3.1.0 goes. :)
>
> Kind regards,
>
> Liam Clarke-Hutchinson
>
> On Sun, 20 Mar 2022 at 07:19, Richard Ney <ka...@gmail.com> wrote:
>
> > I am using the kafka-clients through the fs2-kafka wrapper. Though the log
> > message I posted and copied again here
> >
> > Notifying assignor about the new Assignment(partitions=[
> > platform-data.appquery-platform.aoav3.backfill-28,
> > platform-data.appquery-platform.aoav3.backfill-31,
> > platform-data.appquery-platform.aoav3.backfill-34,
> > platform-data.appquery-platform.aoav3.backfill-37,
> > platform-data.appquery-platform.aoav3.backfill-40,
> > platform-data.appquery-platform.aoav3.backfill-43,
> > platform-data.appquery-platform.aoav3.backfill-46,
> >
> >
> platform-data.appquery-platform.aoav3.backfill-49])","logger_name":"org.apache.kafka.clients.consumer.internals.ConsumerCoordinator","thread_name":"fs2-kafka-consumer-95","level":"INFO"}
> >
> > is not generated by the fs2-kafka wrapper. Based on the kafka-clients code,
> > these are the log messages generated during the initial partition
> > assignment when my application connects to the Kafka brokers. If this list
> > contained the full list of partitions listed in the kafka-consumer-groups
> > output but the lag was increasing on 2 partitions, I would immediately
> > suspect the fs2-kafka wrapper as the issue. The fact that the notification
> > messages from the kafka-clients library to the fs2-kafka library are
> > missing two partitions makes me suspect the issue is in the kafka-clients
> > library. In this occurrence this happened on 2 of the 5 consumer instances.
> > The version of the kafka-clients library used by the version of the
> > fs2-kafka library for this test is *2.8.1*. I'm currently running another
> > test with the latest fs2-kafka library which is consuming the *3.1.0*
> > version of the kafka-clients library. Initial partition assignment was
> > successful. On Monday I'll do a large number of scale-up/scale-down tests
> > to force rebalancing of partitions to see if I can replicate the issue
> > using the latest version.
> >
> > On Sat, Mar 19, 2022 at 2:06 AM Liam Clarke-Hutchinson <
> > lclarkeh@redhat.com>
> > wrote:
> >
> > > So to clarify, you're using kafka-clients directly? Or via fs2-kafka? If
> > > it's kafka-clients directly, what version please?
> > >
> > > On Sat, 19 Mar 2022 at 19:59, Richard Ney <ka...@gmail.com>
> > wrote:
> > >
> > > > Hi Liam,
> > > >
> > > > Sorry for the mis-identification in the last email. The fun of
> > answering
> > > an
> > > > email on a phone instead of a desktop. I confirmed the upper log
> > > messages I
> > > > included in the message come from this location in the
> `kafka-clients`
> > > > library
> > > >
> > > >
> > > >
> > >
> >
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L422
> > > >
> > > > And it's only including 8 of the 10 partitions that were assigned to
> > that
> > > > consumer instance.
> > > >
> > > > -Richard
> > > >
> > > > On Fri, Mar 18, 2022 at 11:15 PM Richard Ney <kamisama.ney@gmail.com
> >
> > > > wrote:
> > > >
> > > > > Hi Ngā mihi,
> > > > >
> > > > > I believe the log entry I included was from the underlying
> > > kafka-clients
> > > > > library given that the logger identified is
> > > > > “org.apache.kafka.clients.consumer.internals.ConsumerCoordinator”.
> > I’ll
> > > > > admit at first I thought it also might be the fs2-kafka wrapper
> given
> > > > that
> > > > > the 2.4.0 version is the first version that has correct support for
> > the
> > > > > messaging from the ConsumerCoordinator. I am planning to do a test
> > with
> > > > the
> > > > > 3.0.0-M5 version which is built on the updated 3.1.0 kafka-clients
> > > > library
> > > > > and will let the list know.
> > > > >
> > > > > -Richard Ney
> > > > >
> > > > > Sent from my iPhone
> > > > >
> > > > > > On Mar 18, 2022, at 10:55 PM, Liam Clarke-Hutchinson <
> > > > > lclarkeh@redhat.com> wrote:
> > > > > >
> > > > > > Kia ora Richard,
> > > > > >
> > > > > > I see support for the Cooperative Sticky Assignor in fs2-kafka is
> > > quite
> > > > > > new. Have you discussed this issue with the community of that
> > client
> > > at
> > > > > > all? I ask because I see on GitHub that fs2-kafka is using
> > > > kafka-clients
> > > > > > 2.8.1 as the underlying client, and there's been a fair few
> > bugfixes
> > > > > around
> > > > > > the cooperative sticky assignor since that version.
> > > > > >
> > > > > > Could you perhaps try overriding the kafka-clients dependency of
> > > > > fs2-kafka
> > > > > > and try a higher version, perhaps 3.1.0, and see if the issue
> > > remains?
> > > > > I'm
> > > > > > not sure how well that'll work, but might help narrow down the
> > issue.
> > > > > >
> > > > > > Ngā mihi,
> > > > > >
> > > > > > Liam Clarke-Hutchinson
> > > > > >
> > > > > >> On Sat, 19 Mar 2022 at 14:24, Richard Ney <
> kamisama.ney@gmail.com
> > >
> > > > > wrote:
> > > > > >>
> > > > > > >> Thanks for the additional information Bruno. Does this look like a possible
> > > > > > >> bug in the CooperativeStickyAssignor? I have 5 consumers reading from a 50
> > > > > > >> partition topic. Based on the log messages this application instance is
> > > > > > >> only getting assigned 8 partitions, but when I ask the consumer group for
> > > > > > >> LAG information the consumer group thinks the correct number of 10
> > > > > > >> partitions were assigned. As shown, 2 partitions aren't getting read due
> > > > > > >> to the application not knowing it has them.
> > > > > >>
> > > > > >> {"timestamp":"2022-03-19T00:54:46.025Z","message":"[Consumer
> > > > > >> instanceId=i-0e89c9bee06f71f68,
> > > > > >>
> > > > >
> > >
> clientId=consumer-app-query-platform-aoa-backfill-v7-i-0e89c9bee06f71f68,
> > > > > >> groupId=app-query-platform-aoa-backfill-v7] Updating assignment
> > > > with\n\t
> > > > > >> Assigned partitions: [
> > > > > >> platform-data.appquery-platform.aoav3.backfill-28,
> > > > > >> platform-data.appquery-platform.aoav3.backfill-43,
> > > > > >> platform-data.appquery-platform.aoav3.backfill-31,
> > > > > >> platform-data.appquery-platform.aoav3.backfill-46,
> > > > > >> platform-data.appquery-platform.aoav3.backfill-34,
> > > > > >> platform-data.appquery-platform.aoav3.backfill-49,
> > > > > >> platform-data.appquery-platform.aoav3.backfill-40,
> > > > > >> platform-data.appquery-platform.aoav3.backfill-37] \n\t
> > > > > >> Current owned partitions:                  []\n\t
> > > > > >>
> > > > > >> Added partitions (assigned - owned):       [
> > > > > >> platform-data.appquery-platform.aoav3.backfill-28,
> > > > > >> platform-data.appquery-platform.aoav3.backfill-43,
> > > > > >> platform-data.appquery-platform.aoav3.backfill-31,
> > > > > >> platform-data.appquery-platform.aoav3.backfill-46,
> > > > > >> platform-data.appquery-platform.aoav3.backfill-34,
> > > > > >> platform-data.appquery-platform.aoav3.backfill-49,
> > > > > >> platform-data.appquery-platform.aoav3.backfill-40,
> > > > > >> platform-data.appquery-platform.aoav3.backfill-37]\n\t
> > > > > >>
> > > > > >> Revoked partitions (owned - assigned):
> > > > > >>
> > > > > >>
> > > > >
> > > >
> > >
> >
> []\n","logger_name":"org.apache.kafka.clients.consumer.internals.ConsumerCoordinator","thread_name":"fs2-kafka-consumer-95","level":"INFO"}
> > > > > >>
> > > > > >>
> > > > > >> {"timestamp":"2022-03-19T00:54:46.026Z","message":"[Consumer
> > > > > >> instanceId=i-0e89c9bee06f71f68,
> > > > > >>
> > > > >
> > >
> clientId=consumer-app-query-platform-aoa-backfill-v7-i-0e89c9bee06f71f68,
> > > > > >> groupId=app-query-platform-aoa-backfill-v7]
> > > > > >>
> > > > > >> Notifying assignor about the new Assignment(partitions=[
> > > > > >> platform-data.appquery-platform.aoav3.backfill-28,
> > > > > >> platform-data.appquery-platform.aoav3.backfill-31,
> > > > > >> platform-data.appquery-platform.aoav3.backfill-34,
> > > > > >> platform-data.appquery-platform.aoav3.backfill-37,
> > > > > >> platform-data.appquery-platform.aoav3.backfill-40,
> > > > > >> platform-data.appquery-platform.aoav3.backfill-43,
> > > > > >> platform-data.appquery-platform.aoav3.backfill-46,
> > > > > >>
> > > > > >>
> > > > >
> > > >
> > >
> >
> platform-data.appquery-platform.aoav3.backfill-49])","logger_name":"org.apache.kafka.clients.consumer.internals.ConsumerCoordinator","thread_name":"fs2-kafka-consumer-95","level":"INFO"}
> > > > > >>
> > > > > >> {"timestamp":"2022-03-19T00:54:46.028Z","message":"[Consumer
> > > > > >> instanceId=i-0e89c9bee06f71f68,
> > > > > >>
> > > > >
> > >
> clientId=consumer-app-query-platform-aoa-backfill-v7-i-0e89c9bee06f71f68,
> > > > > >> groupId=app-query-platform-aoa-backfill-v7]
> > > > > >>
> > > > > >> Adding newly assigned partitions:
> > > > > >> platform-data.appquery-platform.aoav3.backfill-28,
> > > > > >> platform-data.appquery-platform.aoav3.backfill-43,
> > > > > >> platform-data.appquery-platform.aoav3.backfill-31,
> > > > > >> platform-data.appquery-platform.aoav3.backfill-46,
> > > > > >> platform-data.appquery-platform.aoav3.backfill-34,
> > > > > >> platform-data.appquery-platform.aoav3.backfill-49,
> > > > > >> platform-data.appquery-platform.aoav3.backfill-40,
> > > > > >>
> > > > > >>
> > > > >
> > > >
> > >
> >
> platform-data.appquery-platform.aoav3.backfill-37","logger_name":"org.apache.kafka.clients.consumer.internals.ConsumerCoordinator","thread_name":"fs2-kafka-consumer-95","level":"INFO"}
> > > > > >>
> > > > > >> *OUTPUT FROM `/usr/local/opt/kafka/bin/kafka-consumer-groups`*
> > > > > >>
> > > > > >> GROUP                              TOPIC
> > > > > >>       PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
> > > > > >> CONSUMER-ID                                              HOST
> > > > > >> CLIENT-ID
> > > > > >> app-query-platform-aoa-backfill-v7
> > > > > >> platform-data.appquery-platform.aoav3.backfill 40
>  8369679
> > > > > >> 8369696         17
> > > > > >> i-0e89c9bee06f71f68-2c1558e4-7975-493e-8c6b-08da7c4bf941 /
> > > > 10.123.16.69
> > > > > >> consumer-app-query-platform-aoa-backfill-v7-i-0e89c9bee06f71f68
> > > > > >> app-query-platform-aoa-backfill-v7
> > > > > >> platform-data.appquery-platform.aoav3.backfill 37
>  8369643
> > > > > >> 8369658         15
> > > > > >> i-0e89c9bee06f71f68-2c1558e4-7975-493e-8c6b-08da7c4bf941 /
> > > > 10.123.16.69
> > > > > >> consumer-app-query-platform-aoa-backfill-v7-i-0e89c9bee06f71f68
> > > > > >> app-query-platform-aoa-backfill-v7
> > > > > >> platform-data.appquery-platform.aoav3.backfill 46
>  8368044
> > > > > >> 8368055         11
> > > > > >> i-0e89c9bee06f71f68-2c1558e4-7975-493e-8c6b-08da7c4bf941 /
> > > > 10.123.16.69
> > > > > >> consumer-app-query-platform-aoa-backfill-v7-i-0e89c9bee06f71f68
> > > > > >> app-query-platform-aoa-backfill-v7
> > > > > >> platform-data.appquery-platform.aoav3.backfill 34
>  8379346
> > > > > >> 8379358         12
> > > > > >> i-0e89c9bee06f71f68-2c1558e4-7975-493e-8c6b-08da7c4bf941 /
> > > > 10.123.16.69
> > > > > >> consumer-app-query-platform-aoa-backfill-v7-i-0e89c9bee06f71f68
> > > > > >> app-query-platform-aoa-backfill-v7
> > > > > >> platform-data.appquery-platform.aoav3.backfill 28
>  8374244
> > > > > >> 8374247         3
> > > > > >> i-0e89c9bee06f71f68-2c1558e4-7975-493e-8c6b-08da7c4bf941 /
> > > > 10.123.16.69
> > > > > >> consumer-app-query-platform-aoa-backfill-v7-i-0e89c9bee06f71f68
> > > > > >> app-query-platform-aoa-backfill-v7
> > > > > >> platform-data.appquery-platform.aoav3.backfill 49
>  8364656
> > > > > >> 8364665         9
> > > > > >> i-0e89c9bee06f71f68-2c1558e4-7975-493e-8c6b-08da7c4bf941 /
> > > > 10.123.16.69
> > > > > >> consumer-app-query-platform-aoa-backfill-v7-i-0e89c9bee06f71f68
> > > > > >> app-query-platform-aoa-backfill-v7
> > > > > >> platform-data.appquery-platform.aoav3.backfill 43
>  8369980
> > > > > >> 8369988         8
> > > > > >> i-0e89c9bee06f71f68-2c1558e4-7975-493e-8c6b-08da7c4bf941 /
> > > > 10.123.16.69
> > > > > >> consumer-app-query-platform-aoa-backfill-v7-i-0e89c9bee06f71f68
> > > > > >> app-query-platform-aoa-backfill-v7
> > > > > >> platform-data.appquery-platform.aoav3.backfill 25
>  8369261
> > > > > >> 8370063         802
> > > > > >> i-0e89c9bee06f71f68-2c1558e4-7975-493e-8c6b-08da7c4bf941 /
> > > > 10.123.16.69
> > > > > >> consumer-app-query-platform-aoa-backfill-v7-i-0e89c9bee06f71f68
> > > > > >> app-query-platform-aoa-backfill-v7
> > > > > >> platform-data.appquery-platform.aoav3.backfill 31
>  8368087
> > > > > >> 8368097         10
> > > > > >> i-0e89c9bee06f71f68-2c1558e4-7975-493e-8c6b-08da7c4bf941 /
> > > > 10.123.16.69
> > > > > >> consumer-app-query-platform-aoa-backfill-v7-i-0e89c9bee06f71f68
> > > > > >> app-query-platform-aoa-backfill-v7
> > > > > >> platform-data.appquery-platform.aoav3.backfill 22
>  8370475
> > > > > >> 8371319         844
> > > > > >> i-0e89c9bee06f71f68-2c1558e4-7975-493e-8c6b-08da7c4bf941 /
> > > > 10.123.16.69
> > > > > >> consumer-app-query-platform-aoa-backfill-v7-i-0e89c9bee06f71f68
> > > > > >>
> > > > > >>> On Thu, Mar 17, 2022 at 2:29 AM Bruno Cadonna <
> > cadonna@apache.org>
> > > > > wrote:
> > > > > >>>
> > > > > >>> Hi Richard,
> > > > > >>>
> > > > > >>> The group.instance.id config is orthogonal to the partition
> > > > assignment
> > > > > >>> strategy. The group.instance.id is used if you want to have
> > static
> > > > > >>> membership which is not related to the partition assignment
> > > strategy.
> > > > > >>>
> > > > > >>> If you think you found a bug, could you please open a JIRA
> ticket
> > > > with
> > > > > >>> steps to reproduce the bug.
> > > > > >>>
> > > > > >>> Best,
> > > > > >>> Bruno
> > > > > >>>
> > > > > >>> On 16.03.22 10:01, Luke Chen wrote:
> > > > > >>>> Hi Richard,
> > > > > >>>>
> > > > > > >>>> Right, you are not missing any settings beyond the partition assignment
> > > > > > >>>> strategy and the group instance id.
> > > > > > >>>> To troubleshoot, you might need to find out from the log why the
> > > > > > >>>> rebalance was triggered.
> > > > > >>>>
> > > > > >>>> Thank you.
> > > > > >>>> Luke
> > > > > >>>>
> > > > > >>>> On Wed, Mar 16, 2022 at 3:02 PM Richard Ney <
> > > kamisama.ney@gmail.com
> > > > >
> > > > > >>> wrote:
> > > > > >>>>
> > > > > >>>>> Hi Luke,
> > > > > >>>>>
> > > > > > >>>>> I did end up with a situation where I had two instances connecting to the
> > > > > > >>>>> same consumer group and they ended up in a rebalance trade-off. All
> > > > > > >>>>> partitions kept going back and forth between the two microservice
> > > > > > >>>>> instances. That was a test case where I'd removed the Group Instance Id
> > > > > > >>>>> setting to see what would happen. I stabilized that one by reducing it to a
> > > > > > >>>>> single consumer after 20+ rebalances.
> > > > > > >>>>>
> > > > > > >>>>> The other issue I'm seeing may be a bug in the Functional Scala `fs2-kafka`
> > > > > > >>>>> wrapper where I see the partitions cleanly assigned but one or more
> > > > > > >>>>> instances aren't ingesting. I found out that they recently added support for
> > > > > > >>>>> the cooperative sticky assignor for the stream recreation since they were
> > > > > > >>>>> assuming a full revocation of the partitions.
> > > > > > >>>>>
> > > > > > >>>>> So I basically wanted to make sure I wasn't missing any settings beyond the
> > > > > > >>>>> partition assignment strategy and the group instance id.
> > > > > >>>>>
> > > > > >>>>> -Richard
> > > > > >>>>>
> > > > > >>>>> -Richard
> > > > > >>>>>
> > > > > >>>>> On Tue, Mar 15, 2022 at 11:27 PM Luke Chen <
> showuon@gmail.com>
> > > > > wrote:
> > > > > >>>>>
> > > > > >>>>>> Hi Richard,
> > > > > >>>>>>
> > > > > >>>>>> To use `CooperativeStickyAssignor`, no other special
> > > configuration
> > > > > is
> > > > > >>>>>> required.
> > > > > >>>>>>
> > > > > > >>>>>> I'm not sure what `make the rebalance happen cleanly` means.
> > > > > >>>>>> Did you find any problem during group rebalance?
> > > > > >>>>>>
> > > > > >>>>>> Thank you.
> > > > > >>>>>> Luke
> > > > > >>>>>>
> > > > > >>>>>> On Wed, Mar 16, 2022 at 1:00 PM Richard Ney <
> > > > > richard.ney@lookout.com
> > > > > >>>>>> .invalid>
> > > > > >>>>>> wrote:
> > > > > >>>>>>
> > > > > > >>>>>>> Trying to find a good sample of which consumer settings, besides setting
> > > > > > >>>>>>>
> > > > > > >>>>>>> ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG to
> > > > > > >>>>>>> org.apache.kafka.clients.consumer.CooperativeStickyAssignor
> > > > > > >>>>>>>
> > > > > > >>>>>>> are needed to make the rebalance happen cleanly. Unable to find any decent
> > > > > > >>>>>>> documentation or code samples. I have set the Group Instance Id to the
> > > > > > >>>>>>> EC2 instance id based on one blog write-up I found.
> > > > > >>>>>>>
> > > > > >>>>>>> Any help would be appreciated
> > > > > >>>>>>>
> > > > > >>>>>>> -Richard
> > > > > >>>>>>>
> > > > > >>>>>>
> > > > > >>>>>
> > > > > >>>>
> > > > > >>>
> > > > > >>
> > > > >
> > > >
> > >
> >
>

Re: Setting up the CooperativeStickyAssignor in Java

Posted by Liam Clarke-Hutchinson <lc...@redhat.com>.
Hi Richard,

Yeah, the old 2.8.1 version of Kafka clients used by trunk fs2-kafka is
what I think might be the issue, not the wrapper itself, sorry if I was
unclear on that.

Please let us know how your testing with the latest fs2-kafka that's using
3.1.0 goes. :)

Kind regards,

Liam Clarke-Hutchinson
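
The mismatch discussed in this thread (the client log listing 8 partitions
while kafka-consumer-groups lists 10 for the same consumer) can be checked
mechanically. The sketch below takes the partition numbers and LAG values from
the output quoted further down and reports which broker-side partitions never
appear in the client's "Updating assignment" log entry. The class and method
names are invented for this illustration.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical helper for comparing broker-side and client-side views of an
// assignment; the data below is copied from the thread.
final class UnreadPartitions {

    // Returns the broker-side entries (partition -> lag) whose partition
    // the client never reported as assigned, sorted by partition.
    static Map<Integer, Long> unknownToClient(Map<Integer, Long> brokerLag,
                                              Set<Integer> clientPartitions) {
        Map<Integer, Long> result = new TreeMap<>(brokerLag);
        result.keySet().removeAll(clientPartitions);
        return result;
    }

    public static void main(String[] args) {
        // PARTITION -> LAG from the kafka-consumer-groups output for
        // consumer i-0e89c9bee06f71f68.
        Map<Integer, Long> brokerLag = Map.of(
                40, 17L, 37, 15L, 46, 11L, 34, 12L, 28, 3L,
                49, 9L, 43, 8L, 25, 802L, 31, 10L, 22, 844L);

        // Partitions listed in the client's "Assigned partitions" log entry.
        Set<Integer> clientPartitions = Set.of(28, 43, 31, 46, 34, 49, 40, 37);

        System.out.println(unknownToClient(brokerLag, clientPartitions));
        // prints {22=844, 25=802}
    }
}
```

The two partitions the client never saw (22 and 25) are also the two with the
largest lag, which is consistent with the report that they were not being read.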

On Sun, 20 Mar 2022 at 07:19, Richard Ney <ka...@gmail.com> wrote:

> I am using the kafka-clients through the fs2-kafka wrapper. Though the log
> message I posted and copied again here
>
> Notifying assignor about the new Assignment(partitions=[
> platform-data.appquery-platform.aoav3.backfill-28,
> platform-data.appquery-platform.aoav3.backfill-31,
> platform-data.appquery-platform.aoav3.backfill-34,
> platform-data.appquery-platform.aoav3.backfill-37,
> platform-data.appquery-platform.aoav3.backfill-40,
> platform-data.appquery-platform.aoav3.backfill-43,
> platform-data.appquery-platform.aoav3.backfill-46,
>
> platform-data.appquery-platform.aoav3.backfill-49])","logger_name":"org.apache.kafka.clients.consumer.internals.ConsumerCoordinator","thread_name":"fs2-kafka-consumer-95","level":"INFO"}
>
> is not generated by the fs2-kafka wrapper. Based on the kafka-clients code,
> these are the log messages generated during the initial partition
> assignment when my application connects to the Kafka brokers. If this list
> contained the full list of partitions listed in the kafka-consumer-groups
> output but the lag was increasing on 2 partitions, I would immediately
> suspect the fs2-kafka wrapper as the issue. The fact that the notification
> messages from the kafka-clients library to the fs2-kafka library are
> missing two partitions makes me suspect the issue is in the kafka-clients
> library. In this occurrence this happened on 2 of the 5 consumer instances.
> The version of the kafka-clients library used by the version of the
> fs2-kafka library for this test is *2.8.1*. I'm currently running another
> test with the latest fs2-kafka library which is consuming the *3.1.0*
> version of the kafka-clients library. Initial partition assignment was
> successful. On Monday I'll do a large number of scale-up/scale-down tests
> to force rebalancing of partitions to see if I can replicate the issue
> using the latest version.
>
> On Sat, Mar 19, 2022 at 2:06 AM Liam Clarke-Hutchinson <
> lclarkeh@redhat.com>
> wrote:
>
> > So to clarify, you're using kafka-clients directly? Or via fs2-kafka? If
> > it's kafka-clients directly, what version please?
> >
> > On Sat, 19 Mar 2022 at 19:59, Richard Ney <ka...@gmail.com>
> wrote:
> >
> > > Hi Liam,
> > >
> > > Sorry for the mis-identification in the last email. The fun of
> answering
> > an
> > > email on a phone instead of a desktop. I confirmed the upper log
> > messages I
> > > included in the message come from this location in the `kafka-clients`
> > > library
> > >
> > >
> > >
> >
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L422
> > >
> > > And it's only including 8 of the 10 partitions that were assigned to
> that
> > > consumer instance.
> > >
> > > -Richard
> > >
> > > On Fri, Mar 18, 2022 at 11:15 PM Richard Ney <ka...@gmail.com>
> > > wrote:
> > >
> > > > Hi Ngā mihi,
> > > >
> > > > I believe the log entry I included was from the underlying
> > kafka-clients
> > > > library given that the logger identified is
> > > > “org.apache.kafka.clients.consumer.internals.ConsumerCoordinator”.
> I’ll
> > > > admit at first I thought it also might be the fs2-kafka wrapper given
> > > that
> > > > the 2.4.0 version is the first version that has correct support for
> the
> > > > messaging from the ConsumerCoordinator. I am planning to do a test
> with
> > > the
> > > > 3.0.0-M5 version which is built on the updated 3.1.0 kafka-clients
> > > library
> > > > and will let the list know.
> > > >
> > > > -Richard Ney
> > > >
> > > > Sent from my iPhone
> > > >
> > > > > On Mar 18, 2022, at 10:55 PM, Liam Clarke-Hutchinson <
> > > > lclarkeh@redhat.com> wrote:
> > > > >
> > > > > Kia ora Richard,
> > > > >
> > > > > I see support for the Cooperative Sticky Assignor in fs2-kafka is
> > quite
> > > > > new. Have you discussed this issue with the community of that
> client
> > at
> > > > > all? I ask because I see on GitHub that fs2-kafka is using
> > > kafka-clients
> > > > > 2.8.1 as the underlying client, and there's been a fair few
> bugfixes
> > > > around
> > > > > the cooperative sticky assignor since that version.
> > > > >
> > > > > Could you perhaps try overriding the kafka-clients dependency of
> > > > fs2-kafka
> > > > > and try a higher version, perhaps 3.1.0, and see if the issue
> > remains?
> > > > I'm
> > > > > not sure how well that'll work, but might help narrow down the
> issue.
> > > > >
> > > > > Ngā mihi,
> > > > >
> > > > > Liam Clarke-Hutchinson
> > > > >
> > > > >> On Sat, 19 Mar 2022 at 14:24, Richard Ney <kamisama.ney@gmail.com
> >
> > > > wrote:
> > > > >>
> > > > >> Thanks for the additional information Bruno. Does this look like a possible
> > > > >> bug in the CooperativeStickyAssignor? I have 5 consumers reading from a 50
> > > > >> partition topic. Based on the log messages this application instance is
> > > > >> only getting assigned 8 partitions, but when I ask the consumer group for
> > > > >> LAG information the consumer group thinks the correct number of 10
> > > > >> partitions were assigned. As shown, 2 partitions aren't getting read due
> > > > >> to the application not knowing it has them.
> > > > >>
> > > > >> {"timestamp":"2022-03-19T00:54:46.025Z","message":"[Consumer
> > > > >> instanceId=i-0e89c9bee06f71f68,
> > > > >>
> > > >
> > clientId=consumer-app-query-platform-aoa-backfill-v7-i-0e89c9bee06f71f68,
> > > > >> groupId=app-query-platform-aoa-backfill-v7] Updating assignment
> > > with\n\t
> > > > >> Assigned partitions: [
> > > > >> platform-data.appquery-platform.aoav3.backfill-28,
> > > > >> platform-data.appquery-platform.aoav3.backfill-43,
> > > > >> platform-data.appquery-platform.aoav3.backfill-31,
> > > > >> platform-data.appquery-platform.aoav3.backfill-46,
> > > > >> platform-data.appquery-platform.aoav3.backfill-34,
> > > > >> platform-data.appquery-platform.aoav3.backfill-49,
> > > > >> platform-data.appquery-platform.aoav3.backfill-40,
> > > > >> platform-data.appquery-platform.aoav3.backfill-37] \n\t
> > > > >> Current owned partitions:                  []\n\t
> > > > >>
> > > > >> Added partitions (assigned - owned):       [
> > > > >> platform-data.appquery-platform.aoav3.backfill-28,
> > > > >> platform-data.appquery-platform.aoav3.backfill-43,
> > > > >> platform-data.appquery-platform.aoav3.backfill-31,
> > > > >> platform-data.appquery-platform.aoav3.backfill-46,
> > > > >> platform-data.appquery-platform.aoav3.backfill-34,
> > > > >> platform-data.appquery-platform.aoav3.backfill-49,
> > > > >> platform-data.appquery-platform.aoav3.backfill-40,
> > > > >> platform-data.appquery-platform.aoav3.backfill-37]\n\t
> > > > >>
> > > > >> Revoked partitions (owned - assigned):
> > > > >>
> > > > >>
> > > >
> > >
> >
> []\n","logger_name":"org.apache.kafka.clients.consumer.internals.ConsumerCoordinator","thread_name":"fs2-kafka-consumer-95","level":"INFO"}
> > > > >>
> > > > >>
> > > > >> {"timestamp":"2022-03-19T00:54:46.026Z","message":"[Consumer
> > > > >> instanceId=i-0e89c9bee06f71f68,
> > > > >>
> > > >
> > clientId=consumer-app-query-platform-aoa-backfill-v7-i-0e89c9bee06f71f68,
> > > > >> groupId=app-query-platform-aoa-backfill-v7]
> > > > >>
> > > > >> Notifying assignor about the new Assignment(partitions=[
> > > > >> platform-data.appquery-platform.aoav3.backfill-28,
> > > > >> platform-data.appquery-platform.aoav3.backfill-31,
> > > > >> platform-data.appquery-platform.aoav3.backfill-34,
> > > > >> platform-data.appquery-platform.aoav3.backfill-37,
> > > > >> platform-data.appquery-platform.aoav3.backfill-40,
> > > > >> platform-data.appquery-platform.aoav3.backfill-43,
> > > > >> platform-data.appquery-platform.aoav3.backfill-46,
> > > > >>
> > > > >>
> > > >
> > >
> >
> platform-data.appquery-platform.aoav3.backfill-49])","logger_name":"org.apache.kafka.clients.consumer.internals.ConsumerCoordinator","thread_name":"fs2-kafka-consumer-95","level":"INFO"}
> > > > >>
> > > > >> {"timestamp":"2022-03-19T00:54:46.028Z","message":"[Consumer
> > > > >> instanceId=i-0e89c9bee06f71f68,
> > > > >>
> > > >
> > clientId=consumer-app-query-platform-aoa-backfill-v7-i-0e89c9bee06f71f68,
> > > > >> groupId=app-query-platform-aoa-backfill-v7]
> > > > >>
> > > > >> Adding newly assigned partitions:
> > > > >> platform-data.appquery-platform.aoav3.backfill-28,
> > > > >> platform-data.appquery-platform.aoav3.backfill-43,
> > > > >> platform-data.appquery-platform.aoav3.backfill-31,
> > > > >> platform-data.appquery-platform.aoav3.backfill-46,
> > > > >> platform-data.appquery-platform.aoav3.backfill-34,
> > > > >> platform-data.appquery-platform.aoav3.backfill-49,
> > > > >> platform-data.appquery-platform.aoav3.backfill-40,
> > > > >>
> > > > >>
> > > >
> > >
> >
> platform-data.appquery-platform.aoav3.backfill-37","logger_name":"org.apache.kafka.clients.consumer.internals.ConsumerCoordinator","thread_name":"fs2-kafka-consumer-95","level":"INFO"}
> > > > >>
> > > > >> *OUTPUT FROM `/usr/local/opt/kafka/bin/kafka-consumer-groups`*
> > > > >>
> > > > >> GROUP                              TOPIC
> > > > >>       PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
> > > > >> CONSUMER-ID                                              HOST
> > > > >> CLIENT-ID
> > > > >> app-query-platform-aoa-backfill-v7
> > > > >> platform-data.appquery-platform.aoav3.backfill 40         8369679
> > > > >> 8369696         17
> > > > >> i-0e89c9bee06f71f68-2c1558e4-7975-493e-8c6b-08da7c4bf941 /
> > > 10.123.16.69
> > > > >> consumer-app-query-platform-aoa-backfill-v7-i-0e89c9bee06f71f68
> > > > >> app-query-platform-aoa-backfill-v7
> > > > >> platform-data.appquery-platform.aoav3.backfill 37         8369643
> > > > >> 8369658         15
> > > > >> i-0e89c9bee06f71f68-2c1558e4-7975-493e-8c6b-08da7c4bf941 /
> > > 10.123.16.69
> > > > >> consumer-app-query-platform-aoa-backfill-v7-i-0e89c9bee06f71f68
> > > > >> app-query-platform-aoa-backfill-v7
> > > > >> platform-data.appquery-platform.aoav3.backfill 46         8368044
> > > > >> 8368055         11
> > > > >> i-0e89c9bee06f71f68-2c1558e4-7975-493e-8c6b-08da7c4bf941 /
> > > 10.123.16.69
> > > > >> consumer-app-query-platform-aoa-backfill-v7-i-0e89c9bee06f71f68
> > > > >> app-query-platform-aoa-backfill-v7
> > > > >> platform-data.appquery-platform.aoav3.backfill 34         8379346
> > > > >> 8379358         12
> > > > >> i-0e89c9bee06f71f68-2c1558e4-7975-493e-8c6b-08da7c4bf941 /
> > > 10.123.16.69
> > > > >> consumer-app-query-platform-aoa-backfill-v7-i-0e89c9bee06f71f68
> > > > >> app-query-platform-aoa-backfill-v7
> > > > >> platform-data.appquery-platform.aoav3.backfill 28         8374244
> > > > >> 8374247         3
> > > > >> i-0e89c9bee06f71f68-2c1558e4-7975-493e-8c6b-08da7c4bf941 /
> > > 10.123.16.69
> > > > >> consumer-app-query-platform-aoa-backfill-v7-i-0e89c9bee06f71f68
> > > > >> app-query-platform-aoa-backfill-v7
> > > > >> platform-data.appquery-platform.aoav3.backfill 49         8364656
> > > > >> 8364665         9
> > > > >> i-0e89c9bee06f71f68-2c1558e4-7975-493e-8c6b-08da7c4bf941 /
> > > 10.123.16.69
> > > > >> consumer-app-query-platform-aoa-backfill-v7-i-0e89c9bee06f71f68
> > > > >> app-query-platform-aoa-backfill-v7
> > > > >> platform-data.appquery-platform.aoav3.backfill 43         8369980
> > > > >> 8369988         8
> > > > >> i-0e89c9bee06f71f68-2c1558e4-7975-493e-8c6b-08da7c4bf941 /
> > > 10.123.16.69
> > > > >> consumer-app-query-platform-aoa-backfill-v7-i-0e89c9bee06f71f68
> > > > >> app-query-platform-aoa-backfill-v7
> > > > >> platform-data.appquery-platform.aoav3.backfill 25         8369261
> > > > >> 8370063         802
> > > > >> i-0e89c9bee06f71f68-2c1558e4-7975-493e-8c6b-08da7c4bf941 /
> > > 10.123.16.69
> > > > >> consumer-app-query-platform-aoa-backfill-v7-i-0e89c9bee06f71f68
> > > > >> app-query-platform-aoa-backfill-v7
> > > > >> platform-data.appquery-platform.aoav3.backfill 31         8368087
> > > > >> 8368097         10
> > > > >> i-0e89c9bee06f71f68-2c1558e4-7975-493e-8c6b-08da7c4bf941 /
> > > 10.123.16.69
> > > > >> consumer-app-query-platform-aoa-backfill-v7-i-0e89c9bee06f71f68
> > > > >> app-query-platform-aoa-backfill-v7
> > > > >> platform-data.appquery-platform.aoav3.backfill 22         8370475
> > > > >> 8371319         844
> > > > >> i-0e89c9bee06f71f68-2c1558e4-7975-493e-8c6b-08da7c4bf941 /
> > > 10.123.16.69
> > > > >> consumer-app-query-platform-aoa-backfill-v7-i-0e89c9bee06f71f68
> > > > >>
> > > > >>> On Thu, Mar 17, 2022 at 2:29 AM Bruno Cadonna <
> cadonna@apache.org>
> > > > wrote:
> > > > >>>
> > > > >>> Hi Richard,
> > > > >>>
> > > > >>> The group.instance.id config is orthogonal to the partition
> > > assignment
> > > > >>> strategy. The group.instance.id is used if you want to have
> static
> > > > >>> membership which is not related to the partition assignment
> > strategy.
> > > > >>>
> > > > >>> If you think you found a bug, could you please open a JIRA ticket
> > > with
> > > > >>> steps to reproduce the bug.
> > > > >>>
> > > > >>> Best,
> > > > >>> Bruno
> > > > >>>
> > > > >>> On 16.03.22 10:01, Luke Chen wrote:
> > > > >>>> Hi Richard,
> > > > >>>>
> > > > >>>> Right, you are not missing any settings beyond the partition
> > > > assignment
> > > > >>>> strategy and the group instance id.
> > > > >>>> You might need to know from the log that why the rebalance
> > triggered
> > > > to
> > > > >>> do
> > > > >>>> troubleshooting.
> > > > >>>>
> > > > >>>> Thank you.
> > > > >>>> Luke
> > > > >>>>
> > > > >>>> On Wed, Mar 16, 2022 at 3:02 PM Richard Ney <
> > kamisama.ney@gmail.com
> > > >
> > > > >>> wrote:
> > > > >>>>
> > > > >>>>> Hi Luke,
> > > > >>>>>
> > > > >>>>> I did end up with a situation where I had two instances
> > connecting
> > > to
> > > > >>> the
> > > > >>>>> same consumer group and they ended up in a rebalance trade-off.
> > All
> > > > >>>>> partitions kept going back and forth between the two
> microservice
> > > > >>>>> instances. That was a test case where I'd removed the Group
> > > Instance
> > > > >> Id
> > > > >>>>> setting to see what would happen. I stabilized that one by
> > reducing
> > > > it
> > > > >>> to a
> > > > >>>>> single consumer after 20+ rebalances.
> > > > >>>>>
> > > > >>>>> The other issue I'm seeing may be a bug in the Functional Scala
> > > > >>> `fs2-kafka`
> > > > >>>>> wrapper where I see the partitions cleanly assigned but one or
> > more
> > > > >>>>> instances isn't ingesting. I found out that they recently added
> > > > >> support
> > > > >>> for
> > > > >>>>> the cooperative sticky assignor for the stream recreation since
> > > they
> > > > >>> were
> > > > >>>>> assuming a full revocation of the partitions.
> > > > >>>>>
> > > > >>>>> So I basically wanted to make sure I wasn't missing any
> settings
> > > > >> beyond
> > > > >>> the
> > > > >>>>> partition assignment strategy and the group instance id.
> > > > >>>>>
> > > > >>>>> -Richard
> > > > >>>>>
> > > > >>>>> -Richard
> > > > >>>>>
> > > > >>>>> On Tue, Mar 15, 2022 at 11:27 PM Luke Chen <sh...@gmail.com>
> > > > wrote:
> > > > >>>>>
> > > > >>>>>> Hi Richard,
> > > > >>>>>>
> > > > >>>>>> To use `CooperativeStickyAssignor`, no other special
> > configuration
> > > > is
> > > > >>>>>> required.
> > > > >>>>>>
> > > > >>>>>> I'm not sure what does `make the rebalance happen cleanly`
> mean.
> > > > >>>>>> Did you find any problem during group rebalance?
> > > > >>>>>>
> > > > >>>>>> Thank you.
> > > > >>>>>> Luke
> > > > >>>>>>
> > > > >>>>>> On Wed, Mar 16, 2022 at 1:00 PM Richard Ney <
> > > > richard.ney@lookout.com
> > > > >>>>>> .invalid>
> > > > >>>>>> wrote:
> > > > >>>>>>
> > > > >>>>>>> Trying to find a good sample of what consumer settings
> besides
> > > > >> setting
> > > > >>>>>>>
> > > > >>>>>>> ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG to
> > > > >>>>>>> org.apache.kafka.clients.consumer.CooperativeStickyAssignor
> > > > >>>>>>>
> > > > >>>>>>> is needed to make the rebalance happen cleanly. Unable to
> find
> > > and
> > > > >>>>> decent
> > > > >>>>>>> documentation or code samples. I have set the Group Instance
> Id
> > > to
> > > > >> the
> > > > >>>>>> EC2
> > > > >>>>>>> instance id based on one blog write up I found.
> > > > >>>>>>>
> > > > >>>>>>> Any help would be appreciated
> > > > >>>>>>>
> > > > >>>>>>> -Richard
> > > > >>>>>>>
> > > > >>>>>>
> > > > >>>>>
> > > > >>>>
> > > > >>>
> > > > >>
> > > >
> > >
> >
>

Re: Setting up the CooperativeStickyAssignor in Java

Posted by Richard Ney <ka...@gmail.com>.
I am using kafka-clients through the fs2-kafka wrapper. However, the log
message I posted earlier, and have copied again here,

Notifying assignor about the new Assignment(partitions=[
platform-data.appquery-platform.aoav3.backfill-28,
platform-data.appquery-platform.aoav3.backfill-31,
platform-data.appquery-platform.aoav3.backfill-34,
platform-data.appquery-platform.aoav3.backfill-37,
platform-data.appquery-platform.aoav3.backfill-40,
platform-data.appquery-platform.aoav3.backfill-43,
platform-data.appquery-platform.aoav3.backfill-46,
platform-data.appquery-platform.aoav3.backfill-49])","logger_name":"org.apache.kafka.clients.consumer.internals.ConsumerCoordinator","thread_name":"fs2-kafka-consumer-95","level":"INFO"}

is not generated by the fs2-kafka wrapper. Based on the kafka-clients code,
this is the log message generated during the initial partition assignment
when my application connects to the Kafka brokers. If this list contained
every partition shown for this consumer in the kafka-consumer-groups output
but the lag was still increasing on 2 partitions, I would immediately
suspect the fs2-kafka wrapper. The fact that the notification from the
kafka-clients library to the fs2-kafka library is missing two partitions
makes me suspect the issue is in the kafka-clients library. In this test
it happened on 2 of the 5 consumer instances.

The fs2-kafka version used for this test depends on kafka-clients *2.8.1*.
I'm currently running another test with the latest fs2-kafka library, which
uses kafka-clients *3.1.0*. Initial partition assignment was successful. On
Monday I'll run a large number of scale-up/scale-down tests to force
partition rebalancing and see if I can reproduce the issue on the latest
version.
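
For anyone following along, the comparison described above can be sketched in a few lines of plain Java. This is only an illustration, not code from kafka-clients or fs2-kafka: the class and method names are made up, and the partition numbers are copied by hand from the kafka-consumer-groups output and the "Notifying assignor" log line in this thread.

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper: compares what the broker reports for one group member
// with what that member's client logged as its assignment.
public class AssignmentDiff {

    // Partitions the broker lists for this member that the client never logged.
    static Set<Integer> missing(Set<Integer> brokerView, Set<Integer> clientView) {
        Set<Integer> diff = new TreeSet<>(brokerView);
        diff.removeAll(clientView);
        return diff;
    }

    public static void main(String[] args) {
        // Partitions listed for this consumer by kafka-consumer-groups.
        Set<Integer> brokerView =
                new TreeSet<>(List.of(22, 25, 28, 31, 34, 37, 40, 43, 46, 49));
        // Partitions in the ConsumerCoordinator "Notifying assignor" log line.
        Set<Integer> clientView =
                new TreeSet<>(List.of(28, 31, 34, 37, 40, 43, 46, 49));

        // prints "Unacknowledged partitions: [22, 25]"
        System.out.println("Unacknowledged partitions: " + missing(brokerView, clientView));
    }
}
```

Partitions 22 and 25 are also the two rows with lag in the hundreds (844 and 802) in the consumer group output, which matches the symptom.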

Re: Setting up the CooperativeStickyAssignor in Java

Posted by Liam Clarke-Hutchinson <lc...@redhat.com>.
So to clarify, are you using kafka-clients directly, or via fs2-kafka? If
it's kafka-clients directly, which version, please?

On Sat, 19 Mar 2022 at 19:59, Richard Ney <ka...@gmail.com> wrote:

> Hi Liam,
>
> Sorry for the mis-identification in the last email. The fun of answering an
> email on a phone instead of a desktop. I confirmed the upper log messages I
> included in the message come from this location in the `kafka-clients`
> library
>
>
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L422
>
> And it's only including 8 of the 10 partitions that were assigned to that
> consumer instance.
>
> -Richard
>
> On Fri, Mar 18, 2022 at 11:15 PM Richard Ney <ka...@gmail.com>
> wrote:
>
> > Hi Ngā mihi,
> >
> > I believe the log entry I included was from the underlying kafka-clients
> > library given that the logger identified is
> > “org.apache.kafka.clients.consumer.internals.ConsumerCoordinator”. I’ll
> > admit at first I thought it also might be the fs2-kafka wrapper given that
> > the 2.4.0 version is the first version that has correct support for the
> > messaging from the ConsumerCoordinator. I am planning to do a test with the
> > 3.0.0-M5 version which is built on the updated 3.1.0 kafka-clients library
> > and will let the list know.
> >
> > -Richard Ney
> >
> > Sent from my iPhone
> >
> > > On Mar 18, 2022, at 10:55 PM, Liam Clarke-Hutchinson <lclarkeh@redhat.com> wrote:
> > >
> > > Kia ora Richard,
> > >
> > > I see support for the Cooperative Sticky Assignor in fs2-kafka is quite
> > > new. Have you discussed this issue with the community of that client at
> > > all? I ask because I see on GitHub that fs2-kafka is using kafka-clients
> > > 2.8.1 as the underlying client, and there's been a fair few bugfixes around
> > > the cooperative sticky assignor since that version.
> > >
> > > Could you perhaps try overriding the kafka-clients dependency of fs2-kafka
> > > and try a higher version, perhaps 3.1.0, and see if the issue remains? I'm
> > > not sure how well that'll work, but might help narrow down the issue.
> > >
> > > Ngā mihi,
> > >
> > > Liam Clarke-Hutchinson
> > >
> > >> On Sat, 19 Mar 2022 at 14:24, Richard Ney <ka...@gmail.com> wrote:
> > >>
> > >> Thanks for the additional information Bruno. Does this look like a possible
> > >> bug in the CooperativeStickyAssignor? I have 5 consumers reading from a
> > >> 50-partition topic. Based on the log messages, this application instance is
> > >> only getting assigned 8 partitions, but when I ask the consumer group for
> > >> LAG information, the consumer group thinks the correct number of 10
> > >> partitions were assigned. As shown, 2 partitions aren't getting read
> > >> because the application doesn't know it has them.
> > >>
> > >> {"timestamp":"2022-03-19T00:54:46.025Z","message":"[Consumer
> > >> instanceId=i-0e89c9bee06f71f68,
> > >>
> > clientId=consumer-app-query-platform-aoa-backfill-v7-i-0e89c9bee06f71f68,
> > >> groupId=app-query-platform-aoa-backfill-v7] Updating assignment
> with\n\t
> > >> Assigned partitions: [
> > >> platform-data.appquery-platform.aoav3.backfill-28,
> > >> platform-data.appquery-platform.aoav3.backfill-43,
> > >> platform-data.appquery-platform.aoav3.backfill-31,
> > >> platform-data.appquery-platform.aoav3.backfill-46,
> > >> platform-data.appquery-platform.aoav3.backfill-34,
> > >> platform-data.appquery-platform.aoav3.backfill-49,
> > >> platform-data.appquery-platform.aoav3.backfill-40,
> > >> platform-data.appquery-platform.aoav3.backfill-37] \n\t
> > >> Current owned partitions:                  []\n\t
> > >>
> > >> Added partitions (assigned - owned):       [
> > >> platform-data.appquery-platform.aoav3.backfill-28,
> > >> platform-data.appquery-platform.aoav3.backfill-43,
> > >> platform-data.appquery-platform.aoav3.backfill-31,
> > >> platform-data.appquery-platform.aoav3.backfill-46,
> > >> platform-data.appquery-platform.aoav3.backfill-34,
> > >> platform-data.appquery-platform.aoav3.backfill-49,
> > >> platform-data.appquery-platform.aoav3.backfill-40,
> > >> platform-data.appquery-platform.aoav3.backfill-37]\n\t
> > >>
> > >> Revoked partitions (owned - assigned):
> > >>
> > >>
> >
> []\n","logger_name":"org.apache.kafka.clients.consumer.internals.ConsumerCoordinator","thread_name":"fs2-kafka-consumer-95","level":"INFO"}
> > >>
> > >>
> > >> {"timestamp":"2022-03-19T00:54:46.026Z","message":"[Consumer
> > >> instanceId=i-0e89c9bee06f71f68,
> > >>
> > clientId=consumer-app-query-platform-aoa-backfill-v7-i-0e89c9bee06f71f68,
> > >> groupId=app-query-platform-aoa-backfill-v7]
> > >>
> > >> Notifying assignor about the new Assignment(partitions=[
> > >> platform-data.appquery-platform.aoav3.backfill-28,
> > >> platform-data.appquery-platform.aoav3.backfill-31,
> > >> platform-data.appquery-platform.aoav3.backfill-34,
> > >> platform-data.appquery-platform.aoav3.backfill-37,
> > >> platform-data.appquery-platform.aoav3.backfill-40,
> > >> platform-data.appquery-platform.aoav3.backfill-43,
> > >> platform-data.appquery-platform.aoav3.backfill-46,
> > >>
> > >>
> >
> platform-data.appquery-platform.aoav3.backfill-49])","logger_name":"org.apache.kafka.clients.consumer.internals.ConsumerCoordinator","thread_name":"fs2-kafka-consumer-95","level":"INFO"}
> > >>
> > >> {"timestamp":"2022-03-19T00:54:46.028Z","message":"[Consumer
> > >> instanceId=i-0e89c9bee06f71f68,
> > >>
> > clientId=consumer-app-query-platform-aoa-backfill-v7-i-0e89c9bee06f71f68,
> > >> groupId=app-query-platform-aoa-backfill-v7]
> > >>
> > >> Adding newly assigned partitions:
> > >> platform-data.appquery-platform.aoav3.backfill-28,
> > >> platform-data.appquery-platform.aoav3.backfill-43,
> > >> platform-data.appquery-platform.aoav3.backfill-31,
> > >> platform-data.appquery-platform.aoav3.backfill-46,
> > >> platform-data.appquery-platform.aoav3.backfill-34,
> > >> platform-data.appquery-platform.aoav3.backfill-49,
> > >> platform-data.appquery-platform.aoav3.backfill-40,
> > >>
> > >>
> >
> platform-data.appquery-platform.aoav3.backfill-37","logger_name":"org.apache.kafka.clients.consumer.internals.ConsumerCoordinator","thread_name":"fs2-kafka-consumer-95","level":"INFO"}
> > >>
> > >> *OUTPUT FROM `/usr/local/opt/kafka/bin/kafka-consumer-groups`*
> > >>
> > >> GROUP                               TOPIC                                           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID                                               HOST           CLIENT-ID
> > >> app-query-platform-aoa-backfill-v7  platform-data.appquery-platform.aoav3.backfill  40         8369679         8369696         17   i-0e89c9bee06f71f68-2c1558e4-7975-493e-8c6b-08da7c4bf941  /10.123.16.69  consumer-app-query-platform-aoa-backfill-v7-i-0e89c9bee06f71f68
> > >> app-query-platform-aoa-backfill-v7  platform-data.appquery-platform.aoav3.backfill  37         8369643         8369658         15   i-0e89c9bee06f71f68-2c1558e4-7975-493e-8c6b-08da7c4bf941  /10.123.16.69  consumer-app-query-platform-aoa-backfill-v7-i-0e89c9bee06f71f68
> > >> app-query-platform-aoa-backfill-v7  platform-data.appquery-platform.aoav3.backfill  46         8368044         8368055         11   i-0e89c9bee06f71f68-2c1558e4-7975-493e-8c6b-08da7c4bf941  /10.123.16.69  consumer-app-query-platform-aoa-backfill-v7-i-0e89c9bee06f71f68
> > >> app-query-platform-aoa-backfill-v7  platform-data.appquery-platform.aoav3.backfill  34         8379346         8379358         12   i-0e89c9bee06f71f68-2c1558e4-7975-493e-8c6b-08da7c4bf941  /10.123.16.69  consumer-app-query-platform-aoa-backfill-v7-i-0e89c9bee06f71f68
> > >> app-query-platform-aoa-backfill-v7  platform-data.appquery-platform.aoav3.backfill  28         8374244         8374247         3    i-0e89c9bee06f71f68-2c1558e4-7975-493e-8c6b-08da7c4bf941  /10.123.16.69  consumer-app-query-platform-aoa-backfill-v7-i-0e89c9bee06f71f68
> > >> app-query-platform-aoa-backfill-v7  platform-data.appquery-platform.aoav3.backfill  49         8364656         8364665         9    i-0e89c9bee06f71f68-2c1558e4-7975-493e-8c6b-08da7c4bf941  /10.123.16.69  consumer-app-query-platform-aoa-backfill-v7-i-0e89c9bee06f71f68
> > >> app-query-platform-aoa-backfill-v7  platform-data.appquery-platform.aoav3.backfill  43         8369980         8369988         8    i-0e89c9bee06f71f68-2c1558e4-7975-493e-8c6b-08da7c4bf941  /10.123.16.69  consumer-app-query-platform-aoa-backfill-v7-i-0e89c9bee06f71f68
> > >> app-query-platform-aoa-backfill-v7  platform-data.appquery-platform.aoav3.backfill  25         8369261         8370063         802  i-0e89c9bee06f71f68-2c1558e4-7975-493e-8c6b-08da7c4bf941  /10.123.16.69  consumer-app-query-platform-aoa-backfill-v7-i-0e89c9bee06f71f68
> > >> app-query-platform-aoa-backfill-v7  platform-data.appquery-platform.aoav3.backfill  31         8368087         8368097         10   i-0e89c9bee06f71f68-2c1558e4-7975-493e-8c6b-08da7c4bf941  /10.123.16.69  consumer-app-query-platform-aoa-backfill-v7-i-0e89c9bee06f71f68
> > >> app-query-platform-aoa-backfill-v7  platform-data.appquery-platform.aoav3.backfill  22         8370475         8371319         844  i-0e89c9bee06f71f68-2c1558e4-7975-493e-8c6b-08da7c4bf941  /10.123.16.69  consumer-app-query-platform-aoa-backfill-v7-i-0e89c9bee06f71f68
> > >>
> > >>> On Thu, Mar 17, 2022 at 2:29 AM Bruno Cadonna <ca...@apache.org> wrote:
> > >>>
> > >>> Hi Richard,
> > >>>
> > >>> The group.instance.id config is orthogonal to the partition assignment
> > >>> strategy. The group.instance.id is used if you want to have static
> > >>> membership which is not related to the partition assignment strategy.
> > >>>
> > >>> If you think you found a bug, could you please open a JIRA ticket with
> > >>> steps to reproduce the bug.
> > >>>
> > >>> Best,
> > >>> Bruno
> > >>>
> > >>> On 16.03.22 10:01, Luke Chen wrote:
> > >>>> Hi Richard,
> > >>>>
> > >>>> Right, you are not missing any settings beyond the partition assignment
> > >>>> strategy and the group instance id.
> > >>>> To troubleshoot, you may need to find out from the logs why the rebalance
> > >>>> was triggered.
> > >>>>
> > >>>> Thank you.
> > >>>> Luke
> > >>>>
> > >>>> On Wed, Mar 16, 2022 at 3:02 PM Richard Ney <kamisama.ney@gmail.com> wrote:
> > >>>>
> > >>>>> Hi Luke,
> > >>>>>
> > >>>>> I did end up with a situation where I had two instances connecting to the
> > >>>>> same consumer group and they ended up in a rebalance trade-off. All
> > >>>>> partitions kept going back and forth between the two microservice
> > >>>>> instances. That was a test case where I'd removed the Group Instance Id
> > >>>>> setting to see what would happen. I stabilized that one by reducing it to a
> > >>>>> single consumer after 20+ rebalances.
> > >>>>>
> > >>>>> The other issue I'm seeing may be a bug in the Functional Scala `fs2-kafka`
> > >>>>> wrapper where I see the partitions cleanly assigned but one or more
> > >>>>> instances aren't ingesting. I found out that they recently added support for
> > >>>>> the cooperative sticky assignor for the stream recreation since they were
> > >>>>> assuming a full revocation of the partitions.
> > >>>>>
> > >>>>> So I basically wanted to make sure I wasn't missing any settings beyond the
> > >>>>> partition assignment strategy and the group instance id.
> > >>>>>
> > >>>>> -Richard
> > >>>>>
> > >>>>> -Richard
> > >>>>>
> > >>>>> On Tue, Mar 15, 2022 at 11:27 PM Luke Chen <sh...@gmail.com>
> > wrote:
> > >>>>>
> > >>>>>> Hi Richard,
> > >>>>>>
> > >>>>>> To use `CooperativeStickyAssignor`, no other special configuration
> > is
> > >>>>>> required.
> > >>>>>>
> > >>>>>> I'm not sure what does `make the rebalance happen cleanly` mean.
> > >>>>>> Did you find any problem during group rebalance?
> > >>>>>>
> > >>>>>> Thank you.
> > >>>>>> Luke
> > >>>>>>
> > >>>>>> On Wed, Mar 16, 2022 at 1:00 PM Richard Ney <
> > richard.ney@lookout.com
> > >>>>>> .invalid>
> > >>>>>> wrote:
> > >>>>>>
> > >>>>>>> Trying to find a good sample of what consumer settings besides
> > >> setting
> > >>>>>>>
> > >>>>>>> ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG to
> > >>>>>>> org.apache.kafka.clients.consumer.CooperativeStickyAssignor
> > >>>>>>>
> > >>>>>>> is needed to make the rebalance happen cleanly. Unable to find
> and
> > >>>>> decent
> > >>>>>>> documentation or code samples. I have set the Group Instance Id
> to
> > >> the
> > >>>>>> EC2
> > >>>>>>> instance id based on one blog write up I found.
> > >>>>>>>
> > >>>>>>> Any help would be appreciated
> > >>>>>>>
> > >>>>>>> -Richard
> > >>>>>>>
> > >>>>>>
> > >>>>>
> > >>>>
> > >>>
> > >>
> >
>

Re: Setting up the CooperativeStickyAssignor in Java

Posted by Richard Ney <ka...@gmail.com>.
Hi Liam,

Sorry for the misidentification in my last email; such is the fun of
answering email on a phone instead of a desktop. I confirmed that the upper
log messages I included in that message come from this location in the
`kafka-clients` library:

https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L422

That log output includes only 8 of the 10 partitions that were assigned to
that consumer instance.
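
To make the mismatch concrete, here is a minimal sketch that diffs the two views. The partition numbers are copied from the "Updating assignment with" log message and from the kafka-consumer-groups output quoted in this thread; the class and method names are hypothetical and only for illustration.

```java
import java.util.Set;
import java.util.TreeSet;

public class MissingPartitions {
    /** Partitions the broker attributes to a member that the client never logged as assigned. */
    static Set<Integer> missing(Set<Integer> brokerView, Set<Integer> clientView) {
        Set<Integer> diff = new TreeSet<>(brokerView); // sorted copy, input is left untouched
        diff.removeAll(clientView);
        return diff;
    }

    public static void main(String[] args) {
        // Partition numbers from the ConsumerCoordinator "Updating assignment with" log message.
        Set<Integer> clientView = Set.of(28, 43, 31, 46, 34, 49, 40, 37);
        // Partition numbers listed by kafka-consumer-groups for the same consumer id.
        Set<Integer> brokerView = Set.of(40, 37, 46, 34, 28, 49, 43, 25, 31, 22);

        System.out.println("Assigned per broker but absent from client log: "
                + missing(brokerView, clientView));
    }
}
```

With those inputs the difference is partitions 22 and 25, which are also the two rows with lag in the hundreds (844 and 802) in the kafka-consumer-groups output.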

-Richard

On Fri, Mar 18, 2022 at 11:15 PM Richard Ney <ka...@gmail.com> wrote:

> Hi Ngā mihi,
>
> I believe the log entry I included was from the underlying kafka-clients
> library given that the logger identified is
> “org.apache.kafka.clients.consumer.internals.ConsumerCoordinator”. I’ll
> admit at first I thought it also might be the fs2-kafka wrapper given that
> the 2.4.0 version is the first version that has correct support for the
> messaging from the ConsumerCoordinator. I am planning to do a test with the
> 3.0.0-M5 version which is built on the updated 3.1.0 kafka-clients library
> and will let the list know.
>
> -Richard Ney
>
> Sent from my iPhone

Re: Setting up the CooperativeStickyAssignor in Java

Posted by Richard Ney <ka...@gmail.com>.
Hi Ngā mihi,

I believe the log entry I included came from the underlying kafka-clients
library, given that the logger identified is
“org.apache.kafka.clients.consumer.internals.ConsumerCoordinator”. I’ll
admit that at first I thought it might also be the fs2-kafka wrapper, given
that 2.4.0 is the first version with correct support for the messaging from
the ConsumerCoordinator. I am planning to test with the 3.0.0-M5 version,
which is built on the updated 3.1.0 kafka-clients library, and will let the
list know.
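
For anyone reading those ConsumerCoordinator messages, the "Added" and "Revoked" lines are labeled in the log itself as plain set differences (assigned - owned, owned - assigned). The sketch below only mirrors that arithmetic; it is not the library's code, and the class name is hypothetical. The partition numbers come from another "Updating assignment with" log posted in this thread.

```java
import java.util.Set;
import java.util.TreeSet;

public class AssignmentDelta {
    /** Elements of a that are not in b, sorted for stable output. */
    static Set<Integer> minus(Set<Integer> a, Set<Integer> b) {
        Set<Integer> out = new TreeSet<>(a);
        out.removeAll(b);
        return out;
    }

    public static void main(String[] args) {
        // What the consumer held before the rebalance ("Current owned partitions").
        Set<Integer> owned = Set.of(9, 8, 7);
        // What the group leader handed out in this round ("Assigned partitions").
        Set<Integer> assigned = Set.of(8, 7);

        System.out.println("Added (assigned - owned):   " + minus(assigned, owned));
        System.out.println("Revoked (owned - assigned): " + minus(owned, assigned));
    }
}
```

With the cooperative protocol a consumer gives up only the revoked set and keeps processing the rest, which is why a wrapper that assumes every rebalance revokes everything can end up out of step with the client.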

-Richard Ney

Sent from my iPhone

> On Mar 18, 2022, at 10:55 PM, Liam Clarke-Hutchinson <lc...@redhat.com> wrote:
> 
> Kia ora Richard,
> 
> I see support for the Cooperative Sticky Assignor in fs2-kafka is quite
> new. Have you discussed this issue with the community of that client at
> all? I ask because I see on GitHub that fs2-kafka is using kafka-clients
> 2.8.1 as the underlying client, and there's been a fair few bugfixes around
> the cooperative sticky assignor since that version.
> 
> Could you perhaps try overriding the kafka-clients dependency of fs2-kafka
> and try a higher version, perhaps 3.1.0, and see if the issue remains? I'm
> not sure how well that'll work, but might help narrow down the issue.
> 
> Ngā mihi,
> 
> Liam Clarke-Hutchinson
> 
>> On Sat, 19 Mar 2022 at 14:24, Richard Ney <ka...@gmail.com> wrote:
>> 
>> Thanks for the additional information, Bruno. Does this look like a
>> possible bug in the CooperativeStickyAssignor? I have 5 consumers reading
>> from a 50-partition topic. Based on the log messages, this application
>> instance is only getting assigned 8 partitions, but when I ask the consumer
>> group for lag information, the group reports the correct number of 10
>> partitions as assigned. As shown below, 2 partitions aren't getting read
>> because the application doesn't know it has them.
>> 
>> {"timestamp":"2022-03-19T00:54:46.025Z","message":"[Consumer
>> instanceId=i-0e89c9bee06f71f68,
>> clientId=consumer-app-query-platform-aoa-backfill-v7-i-0e89c9bee06f71f68,
>> groupId=app-query-platform-aoa-backfill-v7] Updating assignment with\n\t
>> Assigned partitions: [
>> platform-data.appquery-platform.aoav3.backfill-28,
>> platform-data.appquery-platform.aoav3.backfill-43,
>> platform-data.appquery-platform.aoav3.backfill-31,
>> platform-data.appquery-platform.aoav3.backfill-46,
>> platform-data.appquery-platform.aoav3.backfill-34,
>> platform-data.appquery-platform.aoav3.backfill-49,
>> platform-data.appquery-platform.aoav3.backfill-40,
>> platform-data.appquery-platform.aoav3.backfill-37] \n\t
>> Current owned partitions:                  []\n\t
>> 
>> Added partitions (assigned - owned):       [
>> platform-data.appquery-platform.aoav3.backfill-28,
>> platform-data.appquery-platform.aoav3.backfill-43,
>> platform-data.appquery-platform.aoav3.backfill-31,
>> platform-data.appquery-platform.aoav3.backfill-46,
>> platform-data.appquery-platform.aoav3.backfill-34,
>> platform-data.appquery-platform.aoav3.backfill-49,
>> platform-data.appquery-platform.aoav3.backfill-40,
>> platform-data.appquery-platform.aoav3.backfill-37]\n\t
>> 
>> Revoked partitions (owned - assigned):
>> 
>> []\n","logger_name":"org.apache.kafka.clients.consumer.internals.ConsumerCoordinator","thread_name":"fs2-kafka-consumer-95","level":"INFO"}
>> 
>> 
>> {"timestamp":"2022-03-19T00:54:46.026Z","message":"[Consumer
>> instanceId=i-0e89c9bee06f71f68,
>> clientId=consumer-app-query-platform-aoa-backfill-v7-i-0e89c9bee06f71f68,
>> groupId=app-query-platform-aoa-backfill-v7]
>> 
>> Notifying assignor about the new Assignment(partitions=[
>> platform-data.appquery-platform.aoav3.backfill-28,
>> platform-data.appquery-platform.aoav3.backfill-31,
>> platform-data.appquery-platform.aoav3.backfill-34,
>> platform-data.appquery-platform.aoav3.backfill-37,
>> platform-data.appquery-platform.aoav3.backfill-40,
>> platform-data.appquery-platform.aoav3.backfill-43,
>> platform-data.appquery-platform.aoav3.backfill-46,
>> 
>> platform-data.appquery-platform.aoav3.backfill-49])","logger_name":"org.apache.kafka.clients.consumer.internals.ConsumerCoordinator","thread_name":"fs2-kafka-consumer-95","level":"INFO"}
>> 
>> {"timestamp":"2022-03-19T00:54:46.028Z","message":"[Consumer
>> instanceId=i-0e89c9bee06f71f68,
>> clientId=consumer-app-query-platform-aoa-backfill-v7-i-0e89c9bee06f71f68,
>> groupId=app-query-platform-aoa-backfill-v7]
>> 
>> Adding newly assigned partitions:
>> platform-data.appquery-platform.aoav3.backfill-28,
>> platform-data.appquery-platform.aoav3.backfill-43,
>> platform-data.appquery-platform.aoav3.backfill-31,
>> platform-data.appquery-platform.aoav3.backfill-46,
>> platform-data.appquery-platform.aoav3.backfill-34,
>> platform-data.appquery-platform.aoav3.backfill-49,
>> platform-data.appquery-platform.aoav3.backfill-40,
>> 
>> platform-data.appquery-platform.aoav3.backfill-37","logger_name":"org.apache.kafka.clients.consumer.internals.ConsumerCoordinator","thread_name":"fs2-kafka-consumer-95","level":"INFO"}
>> 
>> *OUTPUT FROM `/usr/local/opt/kafka/bin/kafka-consumer-groups`*
>> 
>> GROUP:       app-query-platform-aoa-backfill-v7
>> TOPIC:       platform-data.appquery-platform.aoav3.backfill
>> CONSUMER-ID: i-0e89c9bee06f71f68-2c1558e4-7975-493e-8c6b-08da7c4bf941
>> HOST:        /10.123.16.69
>> CLIENT-ID:   consumer-app-query-platform-aoa-backfill-v7-i-0e89c9bee06f71f68
>>
>> PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
>> 40         8369679         8369696         17
>> 37         8369643         8369658         15
>> 46         8368044         8368055         11
>> 34         8379346         8379358         12
>> 28         8374244         8374247         3
>> 49         8364656         8364665         9
>> 43         8369980         8369988         8
>> 25         8369261         8370063         802
>> 31         8368087         8368097         10
>> 22         8370475         8371319         844
>> 
>>> On Thu, Mar 17, 2022 at 2:29 AM Bruno Cadonna <ca...@apache.org> wrote:
>>> 
>>> Hi Richard,
>>> 
>>> The group.instance.id config is orthogonal to the partition assignment
>>> strategy. The group.instance.id is used if you want to have static
>>> membership which is not related to the partition assignment strategy.
>>> 
>>> If you think you found a bug, could you please open a JIRA ticket with
>>> steps to reproduce the bug.
>>> 
>>> Best,
>>> Bruno
>>> 
>>> On 16.03.22 10:01, Luke Chen wrote:
>>>> Hi Richard,
>>>> 
>>>> Right, you are not missing any settings beyond the partition assignment
>>>> strategy and the group instance id.
>>>> To troubleshoot, you may need to find out from the log why the rebalance
>>>> was triggered.
>>>> 
>>>> Thank you.
>>>> Luke
>>>> 
>>>> On Wed, Mar 16, 2022 at 3:02 PM Richard Ney <ka...@gmail.com> wrote:
>>>>
>>>>> Hi Luke,
>>>>>
>>>>> I did end up with a situation where I had two instances connecting to
>>>>> the same consumer group and they ended up in a rebalance trade-off. All
>>>>> partitions kept going back and forth between the two microservice
>>>>> instances. That was a test case where I'd removed the Group Instance Id
>>>>> setting to see what would happen. I stabilized that one by reducing it
>>>>> to a single consumer after 20+ rebalances.
>>>>>
>>>>> The other issue I'm seeing may be a bug in the Functional Scala
>>>>> `fs2-kafka` wrapper, where I see the partitions cleanly assigned but one
>>>>> or more instances aren't ingesting. I found out that they only recently
>>>>> added support for the cooperative sticky assignor in the stream
>>>>> recreation, since they had been assuming a full revocation of the
>>>>> partitions.
>>>>>
>>>>> So I basically wanted to make sure I wasn't missing any settings beyond
>>>>> the partition assignment strategy and the group instance id.
>>>>>
>>>>> -Richard
>>>>> 
>>>>> On Tue, Mar 15, 2022 at 11:27 PM Luke Chen <sh...@gmail.com> wrote:
>>>>> 
>>>>>> Hi Richard,
>>>>>> 
>>>>>> To use `CooperativeStickyAssignor`, no other special configuration is
>>>>>> required.
>>>>>> 
>>>>>> I'm not sure what `make the rebalance happen cleanly` means.
>>>>>> Did you find any problem during group rebalance?
>>>>>> 
>>>>>> Thank you.
>>>>>> Luke
>>>>>> 
>>>>>> On Wed, Mar 16, 2022 at 1:00 PM Richard Ney
>>>>>> <richard.ney@lookout.com.invalid> wrote:
>>>>>>
>>>>>>> Trying to find a good sample of what consumer settings, besides setting
>>>>>>>
>>>>>>> ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG to
>>>>>>> org.apache.kafka.clients.consumer.CooperativeStickyAssignor
>>>>>>>
>>>>>>> are needed to make the rebalance happen cleanly. Unable to find any
>>>>>>> decent documentation or code samples. I have set the Group Instance Id
>>>>>>> to the EC2 instance id based on one blog write-up I found.
>>>>>>> 
>>>>>>> Any help would be appreciated
>>>>>>> 
>>>>>>> -Richard
>>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>>> 
>> 
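
The settings discussed in the quoted thread above can be sketched as plain consumer properties. This is only an illustration under stated assumptions: the bootstrap address and the String deserializers are placeholders that do not come from the thread, and the class name is hypothetical. The group id and instance id are the ones visible in the quoted logs. As Bruno notes, group.instance.id (static membership) is independent of the assignor, so the sketch treats it as optional.

```java
import java.util.Properties;

public class CooperativeConsumerProps {
    /** Builds consumer properties; pass null for instanceId to skip static membership. */
    static Properties build(String groupId, String instanceId) {
        Properties props = new Properties();
        // Placeholder connection and deserializer settings (assumptions, not from the thread).
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        props.put("group.id", groupId);
        // Same key as ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG.
        props.put("partition.assignment.strategy",
                "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");
        // Optional: static membership, unrelated to which assignor is used.
        if (instanceId != null) {
            props.put("group.instance.id", instanceId);
        }
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("app-query-platform-aoa-backfill-v7", "i-0e89c9bee06f71f68");
        props.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

The resulting Properties object is what would be handed to a KafkaConsumer constructor; that step is left out so the sketch runs without the kafka-clients jar.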

Re: Setting up the CooperativeStickyAssignor in Java

Posted by Liam Clarke-Hutchinson <lc...@redhat.com>.
Kia ora Richard,

I see support for the Cooperative Sticky Assignor in fs2-kafka is quite
new. Have you discussed this issue with the community of that client at
all? I ask because I see on GitHub that fs2-kafka is using kafka-clients
2.8.1 as the underlying client, and there's been a fair few bugfixes around
the cooperative sticky assignor since that version.

Could you perhaps try overriding the kafka-clients dependency of fs2-kafka
and try a higher version, perhaps 3.1.0, and see if the issue remains? I'm
not sure how well that'll work, but it might help narrow down the issue.
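
After an override like that, it can help to confirm which kafka-clients version actually ended up on the runtime classpath. Below is a minimal sketch that looks up the client's AppInfoParser.getVersion() via reflection, so it compiles and runs even when kafka-clients is absent. The class name and the fallback message are hypothetical, and the build-tool override itself is not shown.

```java
import java.lang.reflect.Method;

public class KafkaClientsVersion {
    /** Returns the kafka-clients version found on the classpath, or a fallback message. */
    static String detect() {
        try {
            // Loaded reflectively so this class has no compile-time dependency on kafka-clients.
            Class<?> info = Class.forName("org.apache.kafka.common.utils.AppInfoParser");
            Method getVersion = info.getMethod("getVersion");
            return (String) getVersion.invoke(null); // static method, so no receiver
        } catch (ReflectiveOperationException | LinkageError e) {
            return "kafka-clients not found on classpath";
        }
    }

    public static void main(String[] args) {
        System.out.println("kafka-clients version: " + detect());
    }
}
```

Running it from inside the application (or logging the value at startup) shows whether the override took effect or the wrapper's transitive version won.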

Ngā mihi,

Liam Clarke-Hutchinson

On Sat, 19 Mar 2022 at 14:24, Richard Ney <ka...@gmail.com> wrote:

> Thanks for the additional information, Bruno. Does this look like a
> possible bug in the CooperativeStickyAssignor? I have 5 consumers reading
> from a 50-partition topic. Based on the log messages, this application
> instance is only getting assigned 8 partitions, but when I ask the consumer
> group for lag information, the group reports the correct number of 10
> partitions as assigned. As shown below, 2 partitions aren't getting read
> because the application doesn't know it has them.
>
> {"timestamp":"2022-03-19T00:54:46.025Z","message":"[Consumer
> instanceId=i-0e89c9bee06f71f68,
> clientId=consumer-app-query-platform-aoa-backfill-v7-i-0e89c9bee06f71f68,
> groupId=app-query-platform-aoa-backfill-v7] Updating assignment with\n\t
> Assigned partitions: [
>  platform-data.appquery-platform.aoav3.backfill-28,
>  platform-data.appquery-platform.aoav3.backfill-43,
>  platform-data.appquery-platform.aoav3.backfill-31,
>  platform-data.appquery-platform.aoav3.backfill-46,
>  platform-data.appquery-platform.aoav3.backfill-34,
>  platform-data.appquery-platform.aoav3.backfill-49,
>  platform-data.appquery-platform.aoav3.backfill-40,
>  platform-data.appquery-platform.aoav3.backfill-37] \n\t
> Current owned partitions:                  []\n\t
>
> Added partitions (assigned - owned):       [
> platform-data.appquery-platform.aoav3.backfill-28,
> platform-data.appquery-platform.aoav3.backfill-43,
> platform-data.appquery-platform.aoav3.backfill-31,
> platform-data.appquery-platform.aoav3.backfill-46,
> platform-data.appquery-platform.aoav3.backfill-34,
> platform-data.appquery-platform.aoav3.backfill-49,
> platform-data.appquery-platform.aoav3.backfill-40,
> platform-data.appquery-platform.aoav3.backfill-37]\n\t
>
> Revoked partitions (owned - assigned):
>
> []\n","logger_name":"org.apache.kafka.clients.consumer.internals.ConsumerCoordinator","thread_name":"fs2-kafka-consumer-95","level":"INFO"}
>
>
> {"timestamp":"2022-03-19T00:54:46.026Z","message":"[Consumer
> instanceId=i-0e89c9bee06f71f68,
> clientId=consumer-app-query-platform-aoa-backfill-v7-i-0e89c9bee06f71f68,
> groupId=app-query-platform-aoa-backfill-v7]
>
> Notifying assignor about the new Assignment(partitions=[
> platform-data.appquery-platform.aoav3.backfill-28,
> platform-data.appquery-platform.aoav3.backfill-31,
> platform-data.appquery-platform.aoav3.backfill-34,
> platform-data.appquery-platform.aoav3.backfill-37,
> platform-data.appquery-platform.aoav3.backfill-40,
> platform-data.appquery-platform.aoav3.backfill-43,
> platform-data.appquery-platform.aoav3.backfill-46,
>
> platform-data.appquery-platform.aoav3.backfill-49])","logger_name":"org.apache.kafka.clients.consumer.internals.ConsumerCoordinator","thread_name":"fs2-kafka-consumer-95","level":"INFO"}
>
> {"timestamp":"2022-03-19T00:54:46.028Z","message":"[Consumer
> instanceId=i-0e89c9bee06f71f68,
> clientId=consumer-app-query-platform-aoa-backfill-v7-i-0e89c9bee06f71f68,
> groupId=app-query-platform-aoa-backfill-v7]
>
> Adding newly assigned partitions:
> platform-data.appquery-platform.aoav3.backfill-28,
> platform-data.appquery-platform.aoav3.backfill-43,
> platform-data.appquery-platform.aoav3.backfill-31,
> platform-data.appquery-platform.aoav3.backfill-46,
> platform-data.appquery-platform.aoav3.backfill-34,
> platform-data.appquery-platform.aoav3.backfill-49,
> platform-data.appquery-platform.aoav3.backfill-40,
>
> platform-data.appquery-platform.aoav3.backfill-37","logger_name":"org.apache.kafka.clients.consumer.internals.ConsumerCoordinator","thread_name":"fs2-kafka-consumer-95","level":"INFO"}
>
> *OUTPUT FROM `/usr/local/opt/kafka/bin/kafka-consumer-groups`*
>
> GROUP:       app-query-platform-aoa-backfill-v7
> TOPIC:       platform-data.appquery-platform.aoav3.backfill
> CONSUMER-ID: i-0e89c9bee06f71f68-2c1558e4-7975-493e-8c6b-08da7c4bf941
> HOST:        /10.123.16.69
> CLIENT-ID:   consumer-app-query-platform-aoa-backfill-v7-i-0e89c9bee06f71f68
>
> PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
> 40         8369679         8369696         17
> 37         8369643         8369658         15
> 46         8368044         8368055         11
> 34         8379346         8379358         12
> 28         8374244         8374247         3
> 49         8364656         8364665         9
> 43         8369980         8369988         8
> 25         8369261         8370063         802
> 31         8368087         8368097         10
> 22         8370475         8371319         844
>

Re: Setting up the CooperativeStickyAssignor in Java

Posted by Richard Ney <ka...@gmail.com>.
Thanks for the additional information, Bruno. Does this look like a possible
bug in the CooperativeStickyAssignor? I have 5 consumers reading from a
50-partition topic. Based on the log messages, this application instance is
only being assigned 8 partitions, but when I ask the consumer group for lag
information, the group reports the expected 10 partitions for it. As shown
below, 2 partitions aren't getting read because the application doesn't know
it has them.

{"timestamp":"2022-03-19T00:54:46.025Z","message":"[Consumer
instanceId=i-0e89c9bee06f71f68,
clientId=consumer-app-query-platform-aoa-backfill-v7-i-0e89c9bee06f71f68,
groupId=app-query-platform-aoa-backfill-v7] Updating assignment with\n\t
Assigned partitions: [
 platform-data.appquery-platform.aoav3.backfill-28,
 platform-data.appquery-platform.aoav3.backfill-43,
 platform-data.appquery-platform.aoav3.backfill-31,
 platform-data.appquery-platform.aoav3.backfill-46,
 platform-data.appquery-platform.aoav3.backfill-34,
 platform-data.appquery-platform.aoav3.backfill-49,
 platform-data.appquery-platform.aoav3.backfill-40,
 platform-data.appquery-platform.aoav3.backfill-37] \n\t
Current owned partitions:                  []\n\t

Added partitions (assigned - owned):       [
platform-data.appquery-platform.aoav3.backfill-28,
platform-data.appquery-platform.aoav3.backfill-43,
platform-data.appquery-platform.aoav3.backfill-31,
platform-data.appquery-platform.aoav3.backfill-46,
platform-data.appquery-platform.aoav3.backfill-34,
platform-data.appquery-platform.aoav3.backfill-49,
platform-data.appquery-platform.aoav3.backfill-40,
platform-data.appquery-platform.aoav3.backfill-37]\n\t

Revoked partitions (owned - assigned):
[]\n","logger_name":"org.apache.kafka.clients.consumer.internals.ConsumerCoordinator","thread_name":"fs2-kafka-consumer-95","level":"INFO"}


{"timestamp":"2022-03-19T00:54:46.026Z","message":"[Consumer
instanceId=i-0e89c9bee06f71f68,
clientId=consumer-app-query-platform-aoa-backfill-v7-i-0e89c9bee06f71f68,
groupId=app-query-platform-aoa-backfill-v7]

Notifying assignor about the new Assignment(partitions=[
platform-data.appquery-platform.aoav3.backfill-28,
platform-data.appquery-platform.aoav3.backfill-31,
platform-data.appquery-platform.aoav3.backfill-34,
platform-data.appquery-platform.aoav3.backfill-37,
platform-data.appquery-platform.aoav3.backfill-40,
platform-data.appquery-platform.aoav3.backfill-43,
platform-data.appquery-platform.aoav3.backfill-46,
platform-data.appquery-platform.aoav3.backfill-49])","logger_name":"org.apache.kafka.clients.consumer.internals.ConsumerCoordinator","thread_name":"fs2-kafka-consumer-95","level":"INFO"}

{"timestamp":"2022-03-19T00:54:46.028Z","message":"[Consumer
instanceId=i-0e89c9bee06f71f68,
clientId=consumer-app-query-platform-aoa-backfill-v7-i-0e89c9bee06f71f68,
groupId=app-query-platform-aoa-backfill-v7]

Adding newly assigned partitions:
platform-data.appquery-platform.aoav3.backfill-28,
platform-data.appquery-platform.aoav3.backfill-43,
platform-data.appquery-platform.aoav3.backfill-31,
platform-data.appquery-platform.aoav3.backfill-46,
platform-data.appquery-platform.aoav3.backfill-34,
platform-data.appquery-platform.aoav3.backfill-49,
platform-data.appquery-platform.aoav3.backfill-40,
platform-data.appquery-platform.aoav3.backfill-37","logger_name":"org.apache.kafka.clients.consumer.internals.ConsumerCoordinator","thread_name":"fs2-kafka-consumer-95","level":"INFO"}

*OUTPUT FROM `/usr/local/opt/kafka/bin/kafka-consumer-groups`*

GROUP:       app-query-platform-aoa-backfill-v7
TOPIC:       platform-data.appquery-platform.aoav3.backfill
CONSUMER-ID: i-0e89c9bee06f71f68-2c1558e4-7975-493e-8c6b-08da7c4bf941
HOST:        /10.123.16.69
CLIENT-ID:   consumer-app-query-platform-aoa-backfill-v7-i-0e89c9bee06f71f68

(The five columns above are identical on every row.)

PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
40         8369679         8369696         17
37         8369643         8369658         15
46         8368044         8368055         11
34         8379346         8379358         12
28         8374244         8374247         3
49         8364656         8364665         9
43         8369980         8369988         8
25         8369261         8370063         802
31         8368087         8368097         10
22         8370475         8371319         844
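
Comparing the two views as sets makes the gap concrete. The following is only a minimal sketch: the partition numbers are copied from the "Assigned partitions" log message and from the `kafka-consumer-groups` output in this message, while the class and method names (`AssignmentDiff`, `missingOnClient`) are made up for illustration.

```java
import java.util.Set;
import java.util.TreeSet;

public class AssignmentDiff {
    // Returns the partitions the group reports for this member
    // that the client never logged as assigned.
    public static Set<Integer> missingOnClient(Set<Integer> groupView, Set<Integer> clientView) {
        Set<Integer> missing = new TreeSet<>(groupView);
        missing.removeAll(clientView);
        return missing;
    }

    public static void main(String[] args) {
        // Partitions listed by kafka-consumer-groups for this consumer id.
        Set<Integer> groupView = Set.of(40, 37, 46, 34, 28, 49, 43, 25, 31, 22);
        // Partitions in the client's "Assigned partitions" log line.
        Set<Integer> clientView = Set.of(28, 43, 31, 46, 34, 49, 40, 37);
        System.out.println(missingOnClient(groupView, clientView)); // prints [22, 25]
    }
}
```

Partitions 22 and 25 are also the two rows with the large lag (844 and 802) in the output above, which is consistent with nothing reading them.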

Re: Setting up the CooperativeStickyAssignor in Java

Posted by Bruno Cadonna <ca...@apache.org>.
Hi Richard,

The group.instance.id config is orthogonal to the partition assignment
strategy. It is used if you want static membership, which is not related
to the partition assignment strategy.
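
To illustrate the independence of the two settings, here is a minimal sketch using only `java.util.Properties` and the documented string keys. The helper name `withStaticMembership` and the class name are hypothetical; the instance id is the one that appears in the logs in this thread. The resulting properties would still need the usual bootstrap, group id, and deserializer settings before being handed to a consumer.

```java
import java.util.Properties;

public class StaticMembershipProps {
    // Returns a copy of the given consumer properties with static membership enabled.
    // The assignment strategy, whatever it is, is left untouched.
    public static Properties withStaticMembership(Properties base, String instanceId) {
        Properties props = new Properties();
        props.putAll(base);
        props.setProperty("group.instance.id", instanceId);
        return props;
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        base.setProperty("partition.assignment.strategy",
                "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");

        Properties withId = withStaticMembership(base, "i-0e89c9bee06f71f68");
        System.out.println(withId.getProperty("partition.assignment.strategy"));
        System.out.println(withId.getProperty("group.instance.id"));
    }
}
```

Either setting can be added or removed without touching the other.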

If you think you found a bug, could you please open a JIRA ticket with
steps to reproduce the bug?

Best,
Bruno

Re: Setting up the CooperativeStickyAssignor in Java

Posted by Luke Chen <sh...@gmail.com>.
Hi Richard,

Right, you are not missing any settings beyond the partition assignment
strategy and the group instance id.
To troubleshoot, you will probably need to find out from the logs why the
rebalance was triggered.

Thank you.
Luke

Re: Setting up the CooperativeStickyAssignor in Java

Posted by Richard Ney <ka...@gmail.com>.
Hi Luke,

I did end up with a situation where I had two instances connecting to the
same consumer group and they ended up in a rebalance trade-off. All
partitions kept going back and forth between the two microservice
instances. That was a test case where I'd removed the Group Instance Id
setting to see what would happen. I stabilized that one by reducing it to a
single consumer after 20+ rebalances.

The other issue I'm seeing may be a bug in the functional Scala `fs2-kafka`
wrapper: I see the partitions cleanly assigned, but one or more instances
aren't ingesting. I found out that they recently added support for the
cooperative sticky assignor in the stream recreation, since it had been
assuming a full revocation of the partitions.
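
For context on why a full-revocation assumption breaks: with a cooperative assignor, the rebalance callbacks carry only the partitions being taken away and only the newly added ones, so a wrapper has to maintain its owned set incrementally rather than rebuild it from each callback. The sketch below shows that bookkeeping with plain integers standing in for `TopicPartition`; it is not the fs2-kafka implementation, the class name is hypothetical, and the partition numbers are illustrative.

```java
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

public class OwnedPartitionTracker {
    private final Set<Integer> owned = new TreeSet<>();

    // Cooperative rebalance: only the partitions actually being revoked are passed in.
    public void onPartitionsRevoked(Collection<Integer> revoked) {
        owned.removeAll(revoked);
    }

    // Cooperative rebalance: only the newly added partitions are passed in,
    // so they are merged into the existing set instead of replacing it.
    public void onPartitionsAssigned(Collection<Integer> added) {
        owned.addAll(added);
    }

    public Set<Integer> owned() {
        return new TreeSet<>(owned);
    }

    public static void main(String[] args) {
        OwnedPartitionTracker tracker = new OwnedPartitionTracker();
        tracker.onPartitionsAssigned(List.of(9, 8, 7)); // initial assignment
        tracker.onPartitionsRevoked(List.of(9));        // one partition moves away
        tracker.onPartitionsAssigned(List.of(4, 2, 0)); // three partitions arrive later
        System.out.println(tracker.owned()); // prints [0, 2, 4, 7, 8]
    }
}
```

Code that instead treats each assigned callback as the complete assignment would end up with only the latest additions and stop reading the partitions it kept.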

So I basically wanted to make sure I wasn't missing any settings beyond the
partition assignment strategy and the group instance id.

-Richard

Re: Setting up the CooperativeStickyAssignor in Java

Posted by Luke Chen <sh...@gmail.com>.
Hi Richard,

To use `CooperativeStickyAssignor`, no other special configuration is
required.
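
For reference, a minimal sketch of that configuration, built with `java.util.Properties` and the plain string keys (`partition.assignment.strategy` is the key behind `ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG`). The bootstrap address, group id, deserializer choice, and the class name `CooperativeConsumerProps` are placeholders, not values from this thread; the resulting object would be passed to a `KafkaConsumer` constructor.

```java
import java.util.Properties;

public class CooperativeConsumerProps {
    // Builds consumer properties that select the cooperative sticky assignor.
    public static Properties build(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // The one setting that opts the consumer in to cooperative rebalancing.
        props.setProperty("partition.assignment.strategy",
                "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("localhost:9092", "example-group");
        System.out.println(props.getProperty("partition.assignment.strategy"));
    }
}
```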

I'm not sure what `make the rebalance happen cleanly` means.
Did you find any problem during group rebalance?

Thank you.
Luke
