Posted to user@storm.apache.org by "Mitchell Rathbun (BLOOMBERG/ 731 LEX)" <mr...@bloomberg.net> on 2017/10/13 15:29:00 UTC

Waiting for KafkaSpout initialization

In our topology we have a KafkaSpout. The KafkaSpout's FirstPollOffsetStrategy is set to LATEST, since we only want the updates that occur after our topology is up and running. We can do this because we notify the server that we need a "recap" of certain information, so we don't need any messages from before that recap. The problem is determining when exactly we should notify the server that we are "listening". Currently, we submit the topology by creating a LocalCluster object and calling submitTopology. However, according to the documentation, the KafkaSpout isn't really initialized until "after the first call to KafkaSpoutConsumerRebalanceListener.onPartitionsAssigned()". So my concern is that after we call submitTopology, we ask for a recap before the KafkaSpout is actually ready to receive one, creating a race condition. Is there a way to wait for all of the spouts/bolts to be submitted and ready? Or, more specifically, to wait for the KafkaSpout to be "initialized"? Or is there a different recommended design approach besides setting FirstPollOffsetStrategy to LATEST and asking for a recap?
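To make the race concrete, here is a minimal in-memory model of one partition and a consumer that starts at the log end (LATEST). Everything here (LatestStartRace, publish, assignPartitions, poll) is a hypothetical stand-in, not the storm-kafka-client API; it only shows that a recap published before the consumer position is set gets skipped.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of one Kafka partition plus a LATEST consumer.
// Not the storm-kafka-client API; names are illustrative only.
class LatestStartRace {
    // The partition's log: list index == offset.
    private final List<String> log = new ArrayList<>();
    // Consumer position; -1 means "partitions not assigned yet".
    private int position = -1;

    void publish(String message) {
        log.add(message);
    }

    // Under LATEST, the position is set to the log end at assignment time.
    void assignPartitions() {
        position = log.size();
    }

    // Returns everything from the current position onward and advances the position.
    List<String> poll() {
        if (position < 0) {
            return new ArrayList<>(); // not initialized: nothing is consumed
        }
        List<String> batch = new ArrayList<>(log.subList(position, log.size()));
        position = log.size();
        return batch;
    }

    // Recap requested right after submitTopology, before assignment happens.
    static List<String> recapBeforeAssignment() {
        LatestStartRace p = new LatestStartRace();
        p.publish("recap");
        p.assignPartitions(); // position jumps past the recap
        p.publish("update-1");
        return p.poll();
    }

    // Recap requested only once the consumer position has been set.
    static List<String> recapAfterAssignment() {
        LatestStartRace p = new LatestStartRace();
        p.assignPartitions();
        p.publish("recap");
        p.publish("update-1");
        return p.poll();
    }

    public static void main(String[] args) {
        System.out.println(recapBeforeAssignment()); // [update-1]
        System.out.println(recapAfterAssignment());  // [recap, update-1]
    }
}
```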

Re: Waiting for KafkaSpout initialization

Posted by Stig Rohde Døssing <sr...@apache.org>.
I don't know if there's a recommended design approach for this. It's my
impression that most people run topologies non-stop, so recaps often aren't
necessary. For short downtimes, e.g. a redeploy, using UNCOMMITTED_LATEST is
sufficient, since the topology should catch up quickly.
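A simplified model of how the four FirstPollOffsetStrategy values pick a starting offset, based on my reading of the storm-kafka-client javadoc. The enum and method are hypothetical stand-ins, not the library's code, and they ignore version-specific details about when EARLIEST/LATEST are applied (e.g. only on first deployment vs. on every restart).

```java
// Hypothetical stand-in for FirstPollOffsetStrategy; not the library enum.
enum StartStrategy { EARLIEST, LATEST, UNCOMMITTED_EARLIEST, UNCOMMITTED_LATEST }

class StartOffset {
    /**
     * @param committed last committed offset (next offset to read), or null if none
     * @param beginning first offset still retained in the partition
     * @param end       offset the next produced message will get (log end)
     */
    static long choose(StartStrategy s, Long committed, long beginning, long end) {
        switch (s) {
            case EARLIEST:
                return beginning;                              // ignores any commit
            case LATEST:
                return end;                                    // ignores any commit
            case UNCOMMITTED_EARLIEST:
                return committed != null ? committed : beginning;
            case UNCOMMITTED_LATEST:
                return committed != null ? committed : end;
            default:
                throw new IllegalArgumentException("unknown strategy " + s);
        }
    }
}
```

With a commit at offset 90 and a log end at 100, UNCOMMITTED_LATEST resumes at 90 and replays the 10 messages produced during the downtime, while LATEST jumps straight to 100.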

Your problem, as I understand it:
* You don't want to process all messages from the previous committed offset
up to the latest offset (because of performance, outdated data or some
other reason?), but you do need some of the information from those messages
so you'd like to receive a recap as a Kafka message.
* You need the recap request to happen once the KafkaConsumer position is
set, so the recap is not missed.

I don't think there's a good way to do what you want to do in current
releases. As you note, the consumer position isn't set until
KafkaSpoutConsumerRebalanceListener.initialize has run. Even if you could
hook in and run code when the topology is ready, the spout might crash and
restart, and you'd be missing the recap for the messages sent to Kafka in
the interim.
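A hypothetical model of the crash/restart gap, assuming LATEST is re-applied when the spout restarts: the recap is requested once, the spout goes down while the producer keeps writing, and on restart the position jumps to the log end again, so the interim messages are never consumed. RestartGap and its method are illustrative names only.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model; assumes the restarted spout seeks to the log end again.
class RestartGap {
    static List<String> consumedAcrossRestart() {
        List<String> log = new ArrayList<>();
        List<String> consumed = new ArrayList<>();

        int position = log.size();    // first start: LATEST -> log end
        log.add("recap");             // recap requested once, after start
        log.add("update-1");
        consumed.addAll(log.subList(position, log.size()));

        // Spout crashes; the producer keeps writing.
        log.add("update-2");
        log.add("update-3");

        position = log.size();        // restart: LATEST -> log end again
        log.add("update-4");
        consumed.addAll(log.subList(position, log.size()));
        return consumed;
    }

    public static void main(String[] args) {
        // update-2 and update-3 are skipped and not covered by any recap
        System.out.println(consumedAcrossRestart()); // [recap, update-1, update-4]
    }
}
```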

In 1.2.0 there's a KafkaTupleListener.onPartitionsAssigned hook (
https://github.com/apache/storm/blob/1.x-branch/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java#L168)
that would allow you to run code after the consumer assignment happens.
We'll need to swap L168 and 169 to ensure that the hook is called after the
consumer position is set to avoid races.
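Why the order of those two lines matters, as a hypothetical sketch: an assignment hook that triggers the recap request, invoked either before or after the consumer seeks to the log end. AssignmentHook and assignAndPoll are illustrative names, not the real KafkaTupleListener interface (whose exact signature in 1.2.0 may differ), and the simulated recap service writes the recap synchronously to make the race deterministic.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the hook-vs-seek ordering; not the KafkaSpout source.
class HookOrdering {
    interface AssignmentHook {
        void onPartitionsAssigned(List<String> log);
    }

    // Simulated recap service: appends the recap to the partition immediately.
    static final AssignmentHook REQUEST_RECAP = log -> log.add("recap");

    static List<String> assignAndPoll(boolean hookBeforeSeek) {
        List<String> log = new ArrayList<>();
        log.add("old-1");                    // pre-existing message LATEST should skip
        int position;
        if (hookBeforeSeek) {
            REQUEST_RECAP.onPartitionsAssigned(log);
            position = log.size();           // seek to end lands after the recap
        } else {
            position = log.size();           // seek to end first
            REQUEST_RECAP.onPartitionsAssigned(log);
        }
        log.add("update-1");
        return new ArrayList<>(log.subList(position, log.size()));
    }

    public static void main(String[] args) {
        System.out.println(assignAndPoll(true));  // [update-1]
        System.out.println(assignAndPoll(false)); // [recap, update-1]
    }
}
```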

I'm assuming that it's okay to receive a few messages prior to the recap.
If not, you'll probably need to modify the spout yourself, since I think
you'll need the recap service to both insert the recap in Kafka and provide
the offset to the spout so it can start there.
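A hypothetical sketch of that modified-spout approach: the recap service writes the recap into the partition and returns the offset it landed at, and the spout starts reading exactly there instead of using LATEST. RecapService and insertRecap are made-up names; in a real spout the final step would correspond to calling KafkaConsumer.seek(partition, offset) once partitions are assigned, and a multi-partition topic would need one offset per partition.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model: recap service returns the recap's offset; spout seeks to it.
class SeekToRecap {
    interface RecapService {
        /** Appends a recap to the partition and returns its offset. */
        int insertRecap(List<String> partition);
    }

    static final RecapService SERVICE = partition -> {
        partition.add("recap");
        return partition.size() - 1;
    };

    static List<String> consume() {
        List<String> partition = new ArrayList<>();
        partition.add("old-1");              // produced before the recap: skipped
        partition.add("old-2");
        int start = SERVICE.insertRecap(partition);
        partition.add("update-1");           // produced after the recap: consumed
        // Start at the recap's offset: nothing older, nothing newer missed.
        return new ArrayList<>(partition.subList(start, partition.size()));
    }

    public static void main(String[] args) {
        System.out.println(consume()); // [recap, update-1]
    }
}
```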
