Posted to users@kafka.apache.org by Sebastian Proksch <se...@proks.ch> on 2022/01/25 04:11:29 UTC

Is there an equivalent of a "synchronous subscribe"?

Hey all! I am actually not sure whether this is a bug report or a
feature request; maybe you can help me understand whether I am missing
the obvious.

I am using Apache Kafka in one of my Java projects (using
kafka-clients 3.1.0). In an integration test, I would like to
check that (after running some producers) a bunch of topics have the
expected contents. At that point, the whole pipeline is filled and "standing
still", and I am connecting a single new consumer that alone forms a
new consumer group. I would like to iterate over all topics
(`KafkaConsumer.listTopics()`) and consume all messages of every
topic, one topic at a time.

As far as I understand, subscribing to a new topic initializes
a new consumer group, and it takes a bit of time to join the group and
rebalance the partitions. However, I would expect this logic to be
blocking, so I find the following behavior unexpected:

    var con = ... // get consumer instance
    con.subscribe(Set.of(SOME_TOPIC));
    var records = con.poll(Duration.ZERO);

The poll returns an empty `ConsumerRecords` object, even though there *IS*
data in said topic. Even when I register a `ConsumerRebalanceListener`
in the subscribe call, I never see any `TopicPartition` assigned to
the consumer, not even with a delay.

On the other hand, when I change the code to

    var con = ... // get consumer instance
    con.subscribe(Set.of(SOME_TOPIC));
    var records = con.poll(Duration.ofMillis(100));

I now get actual records. Also, when I register a
`ConsumerRebalanceListener`, I receive an `onPartitionsAssigned`
notification.

I really dislike the idea of introducing magic numbers like the 100 ms
into my tests: if the number is too small and the assignment takes
longer for larger topics, my tests will break; if it is too large, the
tests slow down unnecessarily. The functionality that I am
actually looking for is a synchronous version of the `subscribe`
method, or some other way to block execution until my client has
finished joining the group and the partitions are rebalanced. It feels
like this should be the default behavior.

Am I completely off with my expectations for the behavior of the
`subscribe` method, or am I missing something? Is there a way to
achieve this behavior with the current clients? Maybe my code just
lacks the right config parameter...

Thanks for your help, any pointer is appreciated!

best
Sebastian

Re: Is there an equivalent of a "synchronous subscribe"?

Posted by Richard Bosch <ri...@axual.com>.
Hi Sebastian,

I've encountered this as well during integration tests and have taken a
different approach. When I need to verify the contents of topics with a
single consumer, I use the `assign` method instead of `subscribe`.
You do need to know the number of partitions of the topic, because
`assign` takes a collection of `TopicPartition`s as its argument.
It took a bit of effort to rewrite the tests, but it was worth it
because it saves several seconds of rebalancing per test.

I hope this helps you a bit.

Kind regards,

Richard Bosch

On Tue, Jan 25, 2022 at 5:12 AM Sebastian Proksch <se...@proks.ch> wrote:
> [...]

Re: Is there an equivalent of a "synchronous subscribe"?

Posted by Sebastian Proksch <se...@proks.ch>.
Inspired by the `TestUtils` that Luke referred to, I wrote a
similar utility function and put together a minimal example [1] that,
IMHO, shows that the poll method behaves rather
nondeterministically... or at least in a way I cannot explain.

In the example, I always wait a total of 200 ms between two
subsequent polls (e.g., 150 ms inside poll and 50 ms in the loop). I get the
correct result when I use a poll timeout of at least 100 ms. However,
when I poll with Duration.ZERO (and therefore sleep in the loop for the
full 200 ms), I get zero results. Shouldn't it be completely irrelevant
where I wait?

I also cannot get the solution suggested by Paul to work: requesting
endOffsets requires a Collection<TopicPartition>, which I try to obtain
from the assignment of my consumer. Unfortunately, con.assignment() is
not available right away; it is only populated during the first poll...

The whole interaction with the consumer would be so much easier if
the API gave us either a) a blocking "wait for
subscription/assignment/rebalancing/<whatever else needs to be done>"
method, or b) not just a listener for when these tasks are completed,
but also a notification when they start or need to be performed, so
that we can block ourselves until we get the completion signal.

best
Sebastian

[1] https://pastebin.com/kmE0FJMy

On Tue, Jan 25, 2022 at 8:27 AM Luke Chen <sh...@gmail.com> wrote:
> [...]

Re: Is there an equivalent of a "synchronous subscribe"?

Posted by Luke Chen <sh...@gmail.com>.
Yes, there is no such "synchronous subscribe" method in Kafka.
The consumer group only starts working after `poll` is called.

Inside Kafka, there are utility methods like `awaitAssignment` and
`awaitRebalance` that tests use to wait until initialization has
completed; they check for it by calling `poll` in a loop.
You can refer to this test to learn how Kafka writes such tests internally:
https://github.com/apache/kafka/blob/trunk/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
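A minimal sketch of such an await loop in Java. `AwaitUtil.pollUntil` is a hypothetical helper, not part of kafka-clients, and it is written against plain `Runnable`/`BooleanSupplier` so it runs without a broker. Instead of a single poll with a magic timeout, it keeps polling until a condition holds and uses an overall deadline only as a safety net:

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

// Hypothetical helper, not part of kafka-clients: keeps polling until a condition
// holds, using an overall deadline instead of a single "magic" poll timeout.
final class AwaitUtil {

    /**
     * Runs pollOnce repeatedly until condition is true or timeout elapses.
     *
     * @return true if the condition became true, false if the deadline passed first
     */
    static boolean pollUntil(Runnable pollOnce, BooleanSupplier condition, Duration timeout) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (!condition.getAsBoolean()) {
            // Overflow-safe comparison of nanoTime values.
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            // With a real consumer, each poll is what drives the group join/rebalance.
            pollOnce.run();
        }
        return true;
    }

    public static void main(String[] args) {
        // Simulated consumer whose "assignment" shows up after the third poll.
        int[] polls = {0};
        boolean assigned = pollUntil(() -> polls[0]++, () -> polls[0] >= 3, Duration.ofSeconds(5));
        System.out.println("assigned=" + assigned + " after " + polls[0] + " polls");
    }
}
```

Against a real consumer, the poll action could be `() -> con.poll(Duration.ofMillis(100)).forEach(buffer::add)` (with `buffer` a hypothetical list) and the condition `() -> !con.assignment().isEmpty()`. Keeping the records from every poll matters: the poll that completes the assignment may already return data, and discarding it would lose messages.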

As for the feature request, I think it would be a big change to
start the rebalance and other metadata updates during `subscribe`,
so it would need a more solid motivation.
You are welcome to propose this feature through the KIP process:
https://cwiki.apache.org/confluence/display/kafka/kafka+improvement+proposals

Thank you.
Luke

On Tue, Jan 25, 2022 at 12:27 PM Paul Whalen <pg...@gmail.com> wrote:
> [...]

Re: Is there an equivalent of a "synchronous subscribe"?

Posted by Paul Whalen <pg...@gmail.com>.
Faced with the same problem, we've done the following:
 - Find and save the latest offset(s) using consumer.endOffsets()
 - Call consumer.poll() and process records in a while loop until you've
   processed everything up to the saved offsets

Notice that it doesn't matter how long the poll duration is: you've set
your watermark and will read until you reach it, no matter how long it
takes. And you know you're going to get there eventually (assuming the
cluster is healthy), because you know the offsets exist.
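A minimal sketch of this watermark loop in Java. `WatermarkReader.readUpTo` is a hypothetical helper, not kafka-clients API, and it takes plain `java.util.function` types so it runs without a broker. It assumes the saved end offsets follow Kafka's convention of "offset of the next record", i.e. last offset + 1:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical helper, not part of kafka-clients: reads until every partition's
// position has reached the end offset captured before reading started.
final class WatermarkReader {

    /**
     * @param endOffsets partition -> end offset (last offset + 1), captured up front
     * @param position   partition -> offset of the next record the consumer will read
     * @param poll       one poll call returning a (possibly empty) batch of records
     */
    static <P, R> List<R> readUpTo(Map<P, Long> endOffsets,
                                   Function<P, Long> position,
                                   Supplier<? extends Iterable<R>> poll) {
        List<R> records = new ArrayList<>();
        // The poll duration is irrelevant here: we simply loop until the watermark.
        while (!reachedAll(endOffsets, position)) {
            for (R record : poll.get()) {
                records.add(record);
            }
        }
        return records;
    }

    private static <P> boolean reachedAll(Map<P, Long> endOffsets, Function<P, Long> position) {
        for (Map.Entry<P, Long> e : endOffsets.entrySet()) {
            if (position.apply(e.getKey()) < e.getValue()) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Simulated single partition "p0" holding offsets 0..4 (end offset 5),
        // returned in batches of at most two records per poll.
        long[] pos = {0};
        List<Long> read = readUpTo(Map.of("p0", 5L), p -> pos[0], () -> {
            List<Long> batch = new ArrayList<>();
            for (int i = 0; i < 2 && pos[0] < 5; i++) {
                batch.add(pos[0]++);
            }
            return batch;
        });
        System.out.println(read);
    }
}
```

With kafka-clients, `endOffsets` could come from `consumer.endOffsets(partitions)`, `position` from `consumer::position`, and `poll` from `() -> consumer.poll(Duration.ofMillis(100))`. `KafkaConsumer.position` throws `IllegalStateException` for partitions not assigned to the consumer, so with `subscribe` this only works once the assignment exists; pairing it with `assign`, as Richard suggests, avoids that wait (the consumer still has to start from the beginning, e.g. via `seekToBeginning` or `auto.offset.reset=earliest`). If producers are still writing, the last batch may contain records beyond the watermark.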

Hope that helps!

Paul

On Mon, Jan 24, 2022 at 10:12 PM Sebastian Proksch <se...@proks.ch> wrote:
> [...]