Posted to dev@storm.apache.org by Stig Døssing <ge...@gmail.com> on 2017/05/31 19:12:21 UTC

Use of the KafkaConsumer.subscribe API in the storm-kafka-client spout

Hi,

We've recently seen some issues raised by users using the default
subscription API in the new KafkaSpout (
https://issues.apache.org/jira/browse/STORM-2514,
https://issues.apache.org/jira/browse/STORM-2538).

A while ago an alternative subscription implementation was added (
https://github.com/apache/storm/pull/1835), which uses the
KafkaConsumer.assign API instead.

The subscribe API used by default causes Kafka to assign partitions to
available consumers automatically. It allows a consumer group to keep
processing even in the presence of crashes because partitions are
reassigned when a consumer becomes unavailable.

The assign API used in the alternative subscription implementation leaves
it up to the consuming code to figure out a reasonable partition
distribution among a consumer group. This is basically how the storm-kafka
code worked.
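
As a rough illustration of what "figuring out a partition distribution"
can look like on the consuming side, here is a minimal sketch of a
deterministic round-robin split. The class and method names are
hypothetical and are not taken from storm-kafka-client; the only inputs
assumed are the spout task's index, the total number of spout tasks, and
the number of partitions.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of storm-kafka-client: shows one way the
// consuming code could split partitions deterministically among spout tasks.
class RoundRobinPartitionSketch {

    /**
     * Returns the partition ids that the task with index taskIndex should
     * read, given totalTasks spout tasks and numPartitions partitions.
     */
    static List<Integer> partitionsForTask(int taskIndex, int totalTasks, int numPartitions) {
        if (totalTasks <= 0 || taskIndex < 0 || taskIndex >= totalTasks) {
            throw new IllegalArgumentException(
                "invalid task index " + taskIndex + " of " + totalTasks);
        }
        List<Integer> mine = new ArrayList<>();
        // Task i takes partitions i, i + totalTasks, i + 2 * totalTasks, ...
        for (int p = taskIndex; p < numPartitions; p += totalTasks) {
            mine.add(p);
        }
        return mine;
    }

    public static void main(String[] args) {
        int totalTasks = 3;
        int numPartitions = 8;
        for (int task = 0; task < totalTasks; task++) {
            System.out.println("task " + task + " -> "
                + partitionsForTask(task, totalTasks, numPartitions));
        }
    }
}
```

In a real spout the resulting ids would be wrapped in TopicPartition
objects and handed to KafkaConsumer.assign. Because the result depends
only on the task index and the task count, a restarted task ends up with
the same partitions it had before.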

Storm already ensures that all spout instances are running, and restarts
them if they crash, so we're not really gaining much by using the subscribe
API.

The disadvantages to using the subscribe API are:

* Whenever an executor crashes, the Kafka cluster reassigns all partitions.
This causes all KafkaSpout instances in that consumer group to pause until
reassignment is complete.

* The partition assignment is random, so it is difficult for users to
predict which partitions are assigned to which spout task.

* The subscribe API is extremely likely to cause hangs and other weird
behavior if the KafkaSpout is configured to run multiple tasks in an
executor. When KafkaConsumer.poll is called during partition reassignment,
it will block until the reassignment is complete. If there are multiple
consumers in a thread, the first consumer to get called will block, and the
other consumer will get ejected from the list of active consumers after a
timeout, because it didn't manage to call poll during the rebalance. See
the example code in https://issues.apache.org/jira/browse/STORM-2514, which
runs two KafkaConsumers in one thread. The result is that they flip-flop
between being active, and most polls take ~30 seconds (the Kafka session
timeout).

* The random assignment of partitions causes more message duplication than
is necessary. When an executor crashes, all the other executors have their
partitions reassigned. This makes it likely that some of them will lose a
partition they had in-flight tuples on, which they will then be unable to
commit to Kafka. The message is then reemitted by whichever KafkaSpout
instance was assigned the partition. See
https://issues.apache.org/jira/browse/STORM-2538
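
To make the last point more concrete, here is a small simulation of how
surviving consumers can lose partitions when one group member drops out.
It is a simplified model only: it assumes a round-robin style spread over
the currently live members, which is not necessarily what Kafka does (the
real behavior depends on the configured partition.assignment.strategy).
The class, method, and member names are made up for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model of a group rebalance; not Kafka's actual assignor code.
class RebalanceChurnSketch {

    /** Spreads partitions 0..numPartitions-1 round-robin over the live members. */
    static Map<String, List<Integer>> distribute(List<String> members, int numPartitions) {
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        for (String member : members) {
            result.put(member, new ArrayList<>());
        }
        for (int p = 0; p < numPartitions; p++) {
            result.get(members.get(p % members.size())).add(p);
        }
        return result;
    }

    /** Counts partitions a surviving member owned before but no longer owns after. */
    static int partitionsLostBySurvivors(Map<String, List<Integer>> before,
                                         Map<String, List<Integer>> after) {
        int lost = 0;
        for (Map.Entry<String, List<Integer>> entry : after.entrySet()) {
            for (Integer p : before.get(entry.getKey())) {
                if (!entry.getValue().contains(p)) {
                    lost++;
                }
            }
        }
        return lost;
    }

    public static void main(String[] args) {
        int numPartitions = 6;
        // Three spouts are live, then "spout-1" crashes and the group rebalances.
        Map<String, List<Integer>> before =
            distribute(Arrays.asList("spout-0", "spout-1", "spout-2"), numPartitions);
        Map<String, List<Integer>> after =
            distribute(Arrays.asList("spout-0", "spout-2"), numPartitions);
        System.out.println("before: " + before);
        System.out.println("after:  " + after);
        System.out.println("lost by survivors: " + partitionsLostBySurvivors(before, after));
    }
}
```

In this model the two surviving spouts each give up a partition they were
already reading, and any in-flight tuples on those partitions can no
longer be committed by their original owner. With a fixed assignment
based on task index, the survivors would keep exactly what they had.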

I'd like to drop support for the subscribe API and switch to using the
assign API by default. Does anyone have a use case for the subscribe API
based subscription implementation?