Posted to users@kafka.apache.org by Virgil Palanciuc <vi...@adobe.com.INVALID> on 2018/06/04 14:58:11 UTC

Enriching stream of events

Hi,

I’m trying to “enrich” a stream of events (i.e., roughly speaking: read messages from one topic, query an external system, and write the results to another topic). The problem is that the external system can handle lots of calls, but it has high latency.
I can easily do the enrichment if I use one thread per message, but that's kind of wasteful (I'd need a lot of partitions/tasks to get reasonable throughput). So I'm thinking about using multiplexed IO: have a processor that just registers the "input message" in a local state store, without committing the task; then the punctuator can look at the registered "input messages", start the IO for all of them, forward the results of the completed IO calls to the downstream processors, and commit progress. The state store can be non-replicated, since I can reprocess messages in case of failure (and I don't really mind duplicate messages on the output topic).
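A minimal sketch of the register-then-punctuate idea in plain Java, assuming the external system offers an asynchronous call that returns a CompletableFuture. It deliberately avoids the Kafka Streams API: PendingEnrichments, Enriched and the lookup function are hypothetical names, register() stands in for the work done in process(), and punctuate() for the body of the punctuator. No thread is parked per message; the punctuator only checks isDone() and never waits on a pending call.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical, Kafka-free sketch: register() plays the role of process(),
// punctuate() the role of the punctuator body.
final class PendingEnrichments {
    // One enriched result, tagged with the input offset it came from.
    static final class Enriched {
        final long offset;
        final String value;

        Enriched(long offset, String value) {
            this.offset = offset;
            this.value = value;
        }
    }

    private final Function<String, CompletableFuture<String>> lookup;
    // Registered by process() but no IO started yet, kept in arrival order.
    private final Map<Long, String> registered = new LinkedHashMap<>();
    // IO started, waiting for the external system to answer.
    private final Map<Long, CompletableFuture<String>> inFlight = new LinkedHashMap<>();

    PendingEnrichments(Function<String, CompletableFuture<String>> lookup) {
        this.lookup = lookup;
    }

    // Cheap: only remembers the message, no IO here.
    void register(long offset, String message) {
        registered.put(offset, message);
    }

    // Starts IO for everything newly registered, then hands back (and forgets)
    // every lookup that has already completed. Never blocks on a pending call.
    List<Enriched> punctuate() {
        for (Map.Entry<Long, String> e : registered.entrySet()) {
            inFlight.put(e.getKey(), lookup.apply(e.getValue()));
        }
        registered.clear();

        List<Enriched> ready = new ArrayList<>();
        Iterator<Map.Entry<Long, CompletableFuture<String>>> it = inFlight.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, CompletableFuture<String>> e = it.next();
            if (e.getValue().isDone()) {
                // join() does not wait here because the future is done; a failed
                // lookup would throw, and retry handling is left out of this sketch.
                ready.add(new Enriched(e.getKey(), e.getValue().join()));
                it.remove();
            }
        }
        return ready;
    }

    int inFlightCount() {
        return inFlight.size();
    }
}
```

In a real processor each returned entry would be forwarded downstream; where the registered and in-flight entries live across a rebalance is exactly the open question in this thread.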

The question is: would that work? I'm concerned about rebalancing (when a worker is added or removed), specifically:
- If I understand the code correctly, during a partition rebalance the tasks may be suspended, and the suspend() method will actually commit the offsets (up to the last record for which process() completed, regardless of whether it invoked commit() or not). That would be bad, since it means that on rebalance I might end up skipping records (I'm not concerned about duplication, within reasonable bounds, but I am concerned about skipping records).
- I'm not sure what happens to the state during rebalancing if I disable the changelog (i.e., make all the state local). Is all state lost?
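The skipping concern comes down to one invariant: the committed position (in Kafka, the offset of the next record to read) must never move past the oldest record that is still waiting for enrichment. The hypothetical CommitTracker below, independent of Kafka Streams, makes that arithmetic explicit. It only computes the safe position; it does not control when Kafka Streams actually commits, which is what the question above is about.

```java
import java.util.TreeSet;

// Hypothetical helper: tracks which input offsets are still waiting for
// enrichment and reports the highest position that is safe to commit.
final class CommitTracker {
    private final TreeSet<Long> pending = new TreeSet<>();
    private long nextOffset = 0L; // one past the highest offset received so far

    void received(long offset) {
        pending.add(offset);
        nextOffset = Math.max(nextOffset, offset + 1);
    }

    void completed(long offset) {
        pending.remove(offset);
    }

    // Oldest unfinished record, or one past the last received record if nothing
    // is pending. Committing anything higher could skip records after a
    // rebalance; committing lower only causes duplicates.
    long safeCommitOffset() {
        return pending.isEmpty() ? nextOffset : pending.first();
    }
}
```

Out-of-order completions are the interesting case: if offset 11 finishes before offset 10, the safe position stays at 10 until 10 is done as well.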

Thanks,
Virgil.


Re: Enriching stream of events

Posted by Guozhang Wang <wa...@gmail.com>.
Hello Virgil,

Your use case is a common scenario for multi-threaded async processing,
and your concerns about state going missing when offsets are committed are
valid: if the changelog is disabled and the task gets migrated to another
instance during a rebalance, that task will resume with an empty state; if
the task happens to stay on the same instance, it will resume with whatever
data is in the state store.

As of today, I think one workaround would be to keep the changelog topic
enabled for your state store, so that after the rebalance any records that
have not been processed will still be in the store; once they have finished
processing, we can remove them from the store.
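A minimal sketch of that workaround's lifecycle, assuming the store's changelog is enabled (in the Processor API, logging is configured on the StoreBuilder via withLoggingEnabled(...) or withLoggingDisabled()). A TreeMap stands in for the restored KeyValueStore, and ResumableEnricher, drain() and the synchronous enrich function are hypothetical simplifications; the asynchronous IO is left out so that only the put / forward / delete ordering is shown.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch: pending records live in a changelog-backed store, so a
// task that migrates (or restarts) finds them again after restoration.
final class ResumableEnricher {
    private final Map<Long, String> pendingStore;   // stand-in for the restored KeyValueStore
    private final Function<String, String> enrich;  // stand-in for the external call
    private final List<String> output = new ArrayList<>(); // stand-in for forwarding downstream

    ResumableEnricher(Map<Long, String> restoredStore, Function<String, String> enrich) {
        this.pendingStore = restoredStore;
        this.enrich = enrich;
    }

    // process(): persist the record before doing any IO.
    void process(long offset, String message) {
        pendingStore.put(offset, message);
    }

    // Punctuator or task resume: finish whatever is still in the store and
    // delete each entry only after its result has been forwarded.
    void drain() {
        for (Long offset : new ArrayList<>(pendingStore.keySet())) {
            output.add(enrich.apply(pendingStore.get(offset)));
            pendingStore.remove(offset);
        }
    }

    List<String> output() {
        return output;
    }
}
```

Because an entry is deleted only after its result has been forwarded, a crash between those two steps re-enriches that record after restoration: a duplicate on the output topic, never a skipped record, which matches the trade-off described in the original question.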

I've also filed https://issues.apache.org/jira/browse/KAFKA-6989 as a
general feature request; please feel free to share your feedback on it.

Guozhang
