You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Laurynas Katkus <la...@mambu.com> on 2021/10/01 08:33:37 UTC

MockProducer and MockConsumer

Hello,

First of all, I just want to express admiration on the job you did guys.
Thank you very much for providing such tool to the world.

To the topic at hand. In our company, we'd like to use Kafka. Currently
we're in testing phase. I'm a big believer in blackbox testing or real unit
testing instead of mocking everything out. That is why I was very happy to
find your provided mock implementation of Consumer and Producer -
MockConsumer and MockProducer.

However, there is lacking documentation about how to use them. I tried
resorting to books like "Kafka The Definitive Guide 2nd edition" and "Kafka
In Action", but both contain very small sections about them. Furhermore, I
haven't found any official documentation from Confluent side as well.
Despite a small blog post in Baeldung[0][1], there isn't anything else.
It's a shame that so much work went into making these implementations, but
no documentation provided. One could argue, that just use the API as it's
self-documenting. Unfortunetally I've encountered weird behavour when
trying to use them. For example, using MockProducer is pretty
straightfoward:

```java
@Test
void givenKeyValue_whenSend_thenVerifyHistory() {

MockProducer mockProducer = new MockProducer<>(true, new StringSerializer(),
new StringSerializer());

kafkaProducer = new KafkaProducer(mockProducer);
Future<RecordMetadata> recordMetadataFuture = kafkaProducer.send("soccer",
"{\"site\" : \"baeldung\"}");

assertTrue(mockProducer.history().size() == 1);
}
```

```java
@Test
void givenKeyValue_whenSend_thenReturnException() {
MockProducer<String, String> mockProducer = new MockProducer<>(false, new
StringSerializer(), new StringSerializer())

kafkaProducer = new KafkaProducer(mockProducer);
Future<RecordMetadata> record = kafkaProducer.send("site", "{\"site\" : \"
baeldung\"}");
mockProducer.errorNext(new RuntimeException());

try {
record.get();
} catch (ExecutionException | InterruptedException ex) {
assertEquals(e, ex.getCause());
}
assertTrue(record.isDone());
}
```

When it comes to MockConsumer, it's unclear how to use it:

```java
private static final MockConsumer<String, String> mockConsumer = new
MockConsumer<>(OffsetResetStrategy.EARLIEST);
...
@Test
void test() {
String hello = "hello";
mockConsumer.schedulePollTask(() -> {
mockConsumer.addRecord(new ConsumerRecord<>(hello, 0, 0, "key", "value"));
});
mockConsumer.subscribe(Collections.singleton(hello));

ConsumerRecords<String, String> poll = mockConsumer.poll(Duration.ofMillis(
100));
}
```

Throws "Cannot add records for a partition that is not assigned to the
consumer". To make it work, I need this boilerplate:

```java
private static final MockConsumer<String, String> mockConsumer = new
MockConsumer<>(OffsetResetStrategy.EARLIEST);
...
@Test
void name3() {
String hello = "hello";
mockConsumer.schedulePollTask(() -> {
mockConsumer.rebalance(Collections.singletonList(new TopicPartition(hello, 0
)));
mockConsumer.addRecord(new ConsumerRecord<>(hello, 0, 0, "key", "value"));
});
mockConsumer.subscribe(Collections.singleton(hello));

HashMap<TopicPartition, Long> startOffsets = new HashMap<>();
TopicPartition tp = new TopicPartition(hello, 0);
startOffsets.put(tp, 0L);
mockConsumer.updateBeginningOffsets(startOffsets);

ConsumerRecords<String, String> poll = mockConsumer.poll(Duration.ofMillis(
100));
}
```

There are parts like making consumer to rebalance. It feels like I need to
do "broker's work". Why it cannot rebalance itself depending on how many
consumers are subscribe to the topic? Another question, I need to manually
update offset. Otherwise it doesn't work either. This is not intuitive.

My question is:
- Will you expand on MockConsumer and MockProducer? Or is this abandoned
initiative/good enough for internal usage and others are best using tools
like Mockito?
- If you're still looking to develop it, will you add documentation? Do you
see this as requirement?
- If you're going to add documentation, maybe you have some drafts or
timelines?

Thank you for reading and reply.


Links:
[0] https://www.baeldung.com/kafka-mockconsumer
[1] https://www.baeldung.com/kafka-mockproducer
Laurynas Katkus
Senior Developer