You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Gabriel Giussi <ga...@gmail.com> on 2018/02/19 12:39:03 UTC

Testing with MockConsumer

Hi,

I'm trying to use MockConsumer to test my application code but I've faced a
couple of limitations and I want to know if there are workarounds or
something that I'm overlooking.
Note: I'm using kafka-clients v 0.11.0.2


   1. Why the addRecord
   <https://github.com/apache/kafka/blob/73be1e1168f91ee2a9d68e1d1c75c14018cf7d3a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java#L179>
   requires that the consumer has assigned partitions? Given that this is just
   simulating records being produced or existing records.
   2. Why the poll
   <https://github.com/apache/kafka/blob/73be1e1168f91ee2a9d68e1d1c75c14018cf7d3a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java#L132>
   clear the map of records? It should not be cleared after commit?
   3. Why the commitAsync
   <https://github.com/apache/kafka/blob/73be1e1168f91ee2a9d68e1d1c75c14018cf7d3a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java#L198>
   doesn't check for an exception and always succeed?

Due to items (2) and (3) I'm not be able to test scenarios where the
commits fails and the consumer should poll again the same elements.

If someone knows about other scenarios that can't be tested with
MockConsumer, please let me know.

Thanks.

Re: Testing with MockConsumer

Posted by Ted Yu <yu...@gmail.com>.
For #3, a better example would be in ConsumerCoordinator (around line 632).

        commitOffsetsAsync(allConsumedOffsets, new OffsetCommitCallback() {
            @Override
            public void onComplete(Map<TopicPartition, OffsetAndMetadata>
offsets, Exception exception) {

FYI

On Mon, Feb 19, 2018 at 10:56 AM, Gabriel Giussi <ga...@gmail.com>
wrote:

> Hi Ted,
> my mistake was believe that commited offsets are used on the next poll, but
> is not the case
> <https://github.com/apache/kafka/blob/73be1e1168f91ee2a9d68e1d1c75c1
> 4018cf7d3a/clients/src/main/java/org/apache/kafka/clients/
> consumer/KafkaConsumer.java#L1202>
> .
>
> > The offsets committed using this API will be used on the first fetch
> after
> > every rebalance and also on startup
> >
>
> So, what to do after a failed commit depends on the nature of the exception
> I guess.
>
>    - WakeupException: retry
>    - Others: close consumer
>
> Thanks for your help to solve #2. I'm wondering about 1# and 3# yet.
>
> 2018-02-19 11:46 GMT-03:00 Ted Yu <yu...@gmail.com>:
>
> > For #2, I think the assumption is that the records are processed by the
> > loop:
> >
> > https://github.com/apache/kafka/blob/73be1e1168f91ee2a9d68e1d1c75c1
> > 4018cf7d3a/clients/src/main/java/org/apache/kafka/clients/
> > consumer/MockConsumer.java#L164
> >
> >
> >
> > On Mon, Feb 19, 2018 at 4:39 AM, Gabriel Giussi <gabrielgiussi@gmail.com
> >
> > wrote:
> >
> > > Hi,
> > >
> > > I'm trying to use MockConsumer to test my application code but I've
> > faced a
> > > couple of limitations and I want to know if there are workarounds or
> > > something that I'm overlooking.
> > > Note: I'm using kafka-clients v 0.11.0.2
> > >
> > >
> > >    1. Why the addRecord
> > >    <https://github.com/apache/kafka/blob/
> 73be1e1168f91ee2a9d68e1d1c75c1
> > > 4018cf7d3a/clients/src/main/java/org/apache/kafka/clients/
> > > consumer/MockConsumer.java#L179>
> > >    requires that the consumer has assigned partitions? Given that this
> is
> > > just
> > >    simulating records being produced or existing records.
> > >    2. Why the poll
> > >    <https://github.com/apache/kafka/blob/
> 73be1e1168f91ee2a9d68e1d1c75c1
> > > 4018cf7d3a/clients/src/main/java/org/apache/kafka/clients/
> > > consumer/MockConsumer.java#L132>
> > >    clear the map of records? It should not be cleared after commit?
> > >    3. Why the commitAsync
> > >    <https://github.com/apache/kafka/blob/
> 73be1e1168f91ee2a9d68e1d1c75c1
> > > 4018cf7d3a/clients/src/main/java/org/apache/kafka/clients/
> > > consumer/MockConsumer.java#L198>
> > >    doesn't check for an exception and always succeed?
> > >
> > > Due to items (2) and (3) I'm not be able to test scenarios where the
> > > commits fails and the consumer should poll again the same elements.
> > >
> > > If someone knows about other scenarios that can't be tested with
> > > MockConsumer, please let me know.
> > >
> > > Thanks.
> > >
> >
>

Re: Testing with MockConsumer

Posted by Gabriel Giussi <ga...@gmail.com>.
Hi Ted,
my mistake was believe that commited offsets are used on the next poll, but
is not the case
<https://github.com/apache/kafka/blob/73be1e1168f91ee2a9d68e1d1c75c14018cf7d3a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L1202>
.

> The offsets committed using this API will be used on the first fetch after
> every rebalance and also on startup
>

So, what to do after a failed commit depends on the nature of the exception
I guess.

   - WakeupException: retry
   - Others: close consumer

Thanks for your help to solve #2. I'm wondering about 1# and 3# yet.

2018-02-19 11:46 GMT-03:00 Ted Yu <yu...@gmail.com>:

> For #2, I think the assumption is that the records are processed by the
> loop:
>
> https://github.com/apache/kafka/blob/73be1e1168f91ee2a9d68e1d1c75c1
> 4018cf7d3a/clients/src/main/java/org/apache/kafka/clients/
> consumer/MockConsumer.java#L164
>
>
>
> On Mon, Feb 19, 2018 at 4:39 AM, Gabriel Giussi <ga...@gmail.com>
> wrote:
>
> > Hi,
> >
> > I'm trying to use MockConsumer to test my application code but I've
> faced a
> > couple of limitations and I want to know if there are workarounds or
> > something that I'm overlooking.
> > Note: I'm using kafka-clients v 0.11.0.2
> >
> >
> >    1. Why the addRecord
> >    <https://github.com/apache/kafka/blob/73be1e1168f91ee2a9d68e1d1c75c1
> > 4018cf7d3a/clients/src/main/java/org/apache/kafka/clients/
> > consumer/MockConsumer.java#L179>
> >    requires that the consumer has assigned partitions? Given that this is
> > just
> >    simulating records being produced or existing records.
> >    2. Why the poll
> >    <https://github.com/apache/kafka/blob/73be1e1168f91ee2a9d68e1d1c75c1
> > 4018cf7d3a/clients/src/main/java/org/apache/kafka/clients/
> > consumer/MockConsumer.java#L132>
> >    clear the map of records? It should not be cleared after commit?
> >    3. Why the commitAsync
> >    <https://github.com/apache/kafka/blob/73be1e1168f91ee2a9d68e1d1c75c1
> > 4018cf7d3a/clients/src/main/java/org/apache/kafka/clients/
> > consumer/MockConsumer.java#L198>
> >    doesn't check for an exception and always succeed?
> >
> > Due to items (2) and (3) I'm not be able to test scenarios where the
> > commits fails and the consumer should poll again the same elements.
> >
> > If someone knows about other scenarios that can't be tested with
> > MockConsumer, please let me know.
> >
> > Thanks.
> >
>

Re: Testing with MockConsumer

Posted by Ted Yu <yu...@gmail.com>.
For #2, I think the assumption is that the records are processed by the
loop:

https://github.com/apache/kafka/blob/73be1e1168f91ee2a9d68e1d1c75c14018cf7d3a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java#L164



On Mon, Feb 19, 2018 at 4:39 AM, Gabriel Giussi <ga...@gmail.com>
wrote:

> Hi,
>
> I'm trying to use MockConsumer to test my application code but I've faced a
> couple of limitations and I want to know if there are workarounds or
> something that I'm overlooking.
> Note: I'm using kafka-clients v 0.11.0.2
>
>
>    1. Why the addRecord
>    <https://github.com/apache/kafka/blob/73be1e1168f91ee2a9d68e1d1c75c1
> 4018cf7d3a/clients/src/main/java/org/apache/kafka/clients/
> consumer/MockConsumer.java#L179>
>    requires that the consumer has assigned partitions? Given that this is
> just
>    simulating records being produced or existing records.
>    2. Why the poll
>    <https://github.com/apache/kafka/blob/73be1e1168f91ee2a9d68e1d1c75c1
> 4018cf7d3a/clients/src/main/java/org/apache/kafka/clients/
> consumer/MockConsumer.java#L132>
>    clear the map of records? It should not be cleared after commit?
>    3. Why the commitAsync
>    <https://github.com/apache/kafka/blob/73be1e1168f91ee2a9d68e1d1c75c1
> 4018cf7d3a/clients/src/main/java/org/apache/kafka/clients/
> consumer/MockConsumer.java#L198>
>    doesn't check for an exception and always succeed?
>
> Due to items (2) and (3) I'm not be able to test scenarios where the
> commits fails and the consumer should poll again the same elements.
>
> If someone knows about other scenarios that can't be tested with
> MockConsumer, please let me know.
>
> Thanks.
>