Posted to users@kafka.apache.org by Divya Goel <DG...@walmartlabs.com.INVALID> on 2019/06/13 20:52:43 UTC

How to process KStream windowed message parallelly

Hi,

I have a requirement to dedup messages within a window and then take a bunch of actions on each filtered message. I understand that parallelism comes from the number of KStream threads, with the maximum parallelism equal to the number of partitions. But the actions I take on the filtered messages are various IO operations. As my application is IO bound, I want to be able to process multiple messages in parallel once the window is done, not sequentially.

I am using the following code to start with. The foreach() processes the messages sequentially. If I submit tasks to a thread pool inside foreach(), I don't see how I can block committing a message until it has been processed successfully. Some of the messages processed in the window can fail as well. Please let me know if you have any suggestions for processing the windowed messages in parallel with KStream.

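builder.<String, String>stream(topic)
        .groupByKey()
        // advanceBy == window size, so these are tumbling (non-overlapping) windows
        .windowedBy(TimeWindows.of(Duration.ofMillis(windowedTime)).advanceBy(Duration.ofMillis(windowedTime)))
        // keep only the last-arriving value per key and window; Materialized.as() takes
        // a state store name (the changelog topic name is derived from it)
        .reduce((value1, value2) -> value2, Materialized.as(reducerTopic))
        .toStream()
        .foreach((key, value) -> System.out.println(key + " => " + value));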
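For readers less familiar with the windowed reduce(): with advanceBy equal to the window size these are tumbling windows, and reduce((value1, value2) -> value2) keeps the most recent value per key and window. The plain-Java sketch below models only that computation, not the Kafka Streams API; it assumes non-negative millisecond timestamps and records processed in arrival order, and the names Event, WindowedKey and dedup are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class TumblingDedupSketch {

    // One input record: key, value and event timestamp in milliseconds (hypothetical type).
    record Event(String key, String value, long timestampMs) {}

    // The record key plus the start of the window it falls into (hypothetical type).
    record WindowedKey(String key, long windowStartMs) {}

    // Start of the tumbling window (advance == size) containing the timestamp.
    // Assumes non-negative timestamps.
    static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - (timestampMs % windowSizeMs);
    }

    // Models reduce((value1, value2) -> value2): a later record replaces the stored
    // value for the same key and window, so only the last-arriving value survives.
    static Map<WindowedKey, String> dedup(List<Event> events, long windowSizeMs) {
        Map<WindowedKey, String> latest = new LinkedHashMap<>();
        for (Event e : events) {
            WindowedKey wk = new WindowedKey(e.key(), windowStart(e.timestampMs(), windowSizeMs));
            latest.merge(wk, e.value(), (value1, value2) -> value2);
        }
        return latest;
    }

    public static void main(String[] args) {
        List<Event> events = List.of(
                new Event("a", "a1", 1_000),
                new Event("a", "a2", 4_000),  // same 5 s window as a1, replaces it
                new Event("b", "b1", 2_000),
                new Event("a", "a3", 6_000)); // falls into the next window
        dedup(events, 5_000).forEach((k, v) ->
                System.out.println(k.key() + "@" + k.windowStartMs() + " => " + v));
        // prints a@0 => a2, b@0 => b1, a@5000 => a3 (one per line)
    }
}
```

One difference from the real topology: toStream() forwards every update of the windowed KTable (record caching may merge some of them), so foreach() can still see several values for the same key and window. suppress(Suppressed.untilWindowCloses(...)) is the operator for emitting only the final result per window.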

Thanks,
Divya

Re: How to process KStream windowed message parallelly

Posted by "Matthias J. Sax" <ma...@confluent.io>.
There is no built-in support for this atm.

Async processing support, as proposed in KIP-408, might help in the
future. But there is not much activity on this KIP atm.
(https://cwiki.apache.org/confluence/display/KAFKA/KIP-408%3A+Add+Asynchronous+Processing+To+Kafka+Streams)

> If I submit tasks to a thread pool inside foreach(), I don't see how I can block committing a message until it has been processed successfully

Well. You can't block committing.

What you could try is to use `transform()` instead of `foreach()`
and attach a state store to the transformer. Each time you receive
an input message, you first put it into the store. If the message is
processed successfully, you remove it from the store. If you crash and
restart, you check the store for pending messages and retry processing them.
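The store-then-process pattern described above can be sketched without a Kafka dependency as follows. This is a plain-Java model under stated assumptions, not Kafka Streams code: the Map stands in for the state store attached to transform() (in a real application a KeyValueStore obtained with context.getStateStore(...) in init() and backed by a changelog topic), ioAction is a hypothetical placeholder for the IO call, and punctuate() stands in for a punctuator registered with context.schedule(...). The worker threads only report completions through a concurrent queue; the store itself is read and written only from the calling ("stream") thread, because Kafka Streams state stores are not meant to be written from other threads.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;

public class PendingStoreSketch {

    // Stand-in for the state store attached to transform() (assumption); only the
    // calling ("stream") thread reads or writes it.
    private final Map<String, String> pendingStore;

    // Successfully processed (key, value) pairs, reported by the IO worker threads.
    private final ConcurrentLinkedQueue<Map.Entry<String, String>> completed =
            new ConcurrentLinkedQueue<>();

    private final ExecutorService ioPool;

    // Hypothetical IO action: returns true on success, false (or throws) on failure.
    private final Predicate<String> ioAction;

    PendingStoreSketch(Map<String, String> pendingStore, int ioThreads, Predicate<String> ioAction) {
        this.pendingStore = pendingStore;
        this.ioPool = Executors.newFixedThreadPool(ioThreads);
        this.ioAction = ioAction;
    }

    // Per input record (the body of transform()): persist first, then process asynchronously.
    void onRecord(String key, String value) {
        pendingStore.put(key, value);
        submit(key, value);
    }

    private void submit(String key, String value) {
        ioPool.submit(() -> {
            boolean ok;
            try {
                ok = ioAction.test(value);
            } catch (RuntimeException e) {
                ok = false;
            }
            if (ok) {
                completed.add(Map.entry(key, value));
            }
            // On failure nothing is reported, so the entry stays pending for a retry.
        });
    }

    // Periodic step on the stream thread (a punctuator in Kafka Streams): delete entries
    // whose IO succeeded. remove(key, value) leaves a newer value for the same key in place.
    void punctuate() {
        Map.Entry<String, String> done;
        while ((done = completed.poll()) != null) {
            pendingStore.remove(done.getKey(), done.getValue());
        }
    }

    // On startup (in a transformer, from init()): retry everything still in the store.
    void recoverAfterRestart() {
        pendingStore.forEach(this::submit);
    }

    // Stops the pool and waits for in-flight IO; used here to make the demo deterministic.
    void shutdownAndWait() {
        ioPool.shutdown();
        try {
            ioPool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        Map<String, String> store = new HashMap<>(); // outlives the simulated crash
        PendingStoreSketch first = new PendingStoreSketch(store, 4, v -> !v.startsWith("bad"));
        first.onRecord("k1", "ok-1");
        first.onRecord("k2", "bad-2"); // this IO action fails
        first.shutdownAndWait();
        first.punctuate();
        System.out.println("pending after first run: " + store); // {k2=bad-2}

        // Simulated restart: the downstream system is healthy again, so the retry succeeds.
        PendingStoreSketch second = new PendingStoreSketch(store, 4, v -> true);
        second.recoverAfterRestart();
        second.shutdownAndWait();
        second.punctuate();
        System.out.println("pending after restart: " + store); // {}
    }
}
```

Failed actions simply stay in the store. The sketch only retries them in recoverAfterRestart(), but the same loop could also run from a punctuator to retry periodically. Processing is at-least-once: an action that succeeded but whose deletion was not yet applied before a crash runs again, so the IO actions should be idempotent.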


-Matthias

