Posted to users@kafka.apache.org by Vasily Sulatskov <Va...@cfm.fr> on 2019/12/03 12:32:49 UTC

Ordering of messages in the same kafka streams sub-topology with multiple sinks for the same topic

Hello,

I wonder whether message ordering is preserved by Kafka Streams when the messages are processed by the same sub-topology without redistribution, and in the end there are multiple sinks for the same topic.

I couldn't find the answer to this question in the docs/mailing list/stack overflow.

You can arrive at this situation with code like this:
 
val source = builder.stream[Key, Value]("input")
source
  .filter(...)
  .mapValues(...)
  .transform(...)
  .to("output")

source
  .filter(...)
  .mapValues(...)
  .transform(...)
  .to("output")

Basically, there are two processing branches that process each input value slightly differently. That is, if one branch produces a message in response to an input message, the other branch will produce a message as well. So preserving the order in this case means that all messages produced for earlier source messages on one branch should precede the messages produced by the other branch for later source messages.
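A small Scala sketch can make this ordering requirement concrete. It assumes, hypothetically, that every output record is tagged with the offset of the input record it was produced from and with the branch that produced it; `Emitted` and `preservesInputOrder` are illustrative names, not Kafka Streams API. The property holds when input offsets never go backwards in the output, whichever branch emitted each record; records produced for the same input may appear in either branch order.

```scala
// Hypothetical output record: which input offset it came from and which
// branch produced it (not a Kafka Streams type).
final case class Emitted(inputOffset: Long, branch: String, value: String)

// True when the input offsets never decrease along the output sequence,
// i.e. everything produced for an earlier input precedes everything
// produced (by either branch) for a later input.
def preservesInputOrder(output: Seq[Emitted]): Boolean =
  output.zip(output.drop(1)).forall { case (prev, next) =>
    prev.inputOffset <= next.inputOffset
  }

val inOrder = Seq(
  Emitted(1L, "first", "a1"), Emitted(1L, "second", "b1"), Emitted(2L, "first", "a2"))
val outOfOrder = Seq(
  Emitted(1L, "first", "a1"), Emitted(2L, "first", "a2"), Emitted(1L, "second", "b1"))

println(preservesInputOrder(inOrder))    // true
println(preservesInputOrder(outOfOrder)) // false: b1 (input 1) arrives after a2 (input 2)
```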

Here's my topology:

  Sub-topology: 2
    Source: KSTREAM-SOURCE-0000000019 (topics: [input])
      --> KSTREAM-MAPVALUES-0000000020
    Processor: KSTREAM-MAPVALUES-0000000020 (stores: [])
      --> KSTREAM-MAPVALUES-0000000022, KSTREAM-TRANSFORM-0000000021
      <-- KSTREAM-SOURCE-0000000019
    Processor: KSTREAM-MAPVALUES-0000000022 (stores: [])
      --> KSTREAM-TRANSFORM-0000000023
      <-- KSTREAM-MAPVALUES-0000000020
    Processor: KSTREAM-TRANSFORM-0000000021 (stores: [store1])
      --> KSTREAM-MAP-0000000027
      <-- KSTREAM-MAPVALUES-0000000020
    Processor: KSTREAM-TRANSFORM-0000000023 (stores: [store2])
      --> KSTREAM-MAP-0000000024
      <-- KSTREAM-MAPVALUES-0000000022
    Processor: KSTREAM-MAP-0000000024 (stores: [])
      --> KSTREAM-FILTER-0000000025
      <-- KSTREAM-TRANSFORM-0000000023
    Processor: KSTREAM-MAP-0000000027 (stores: [])
      --> KSTREAM-FILTER-0000000028
      <-- KSTREAM-TRANSFORM-0000000021
    Processor: KSTREAM-FILTER-0000000025 (stores: [])
      --> KSTREAM-SINK-0000000026
      <-- KSTREAM-MAP-0000000024
    Processor: KSTREAM-FILTER-0000000028 (stores: [])
      --> KSTREAM-SINK-0000000029
      <-- KSTREAM-MAP-0000000027
    Sink: KSTREAM-SINK-0000000026 (topic: output)
      <-- KSTREAM-FILTER-0000000025
    Sink: KSTREAM-SINK-0000000029 (topic: output)
      <-- KSTREAM-FILTER-0000000028

On one hand, I guess that all data coming from one partition will be processed by one thread, so it can keep the order of the messages. On the other hand, I see two independent sinks in the topology, presumably with independent buffers etc. So in the end I am not sure what's going to happen.

I would guess that it can work because the sinks probably have the same buffer size, but that's not guaranteed. I can imagine the following failure scenario: a write by one sink succeeds while the write by the other sink fails, so a batch of messages gets delivered to the output partition out of order.

Can someone please clarify what happens in this case? Is there an ordering guarantee? Can these streams be merged while preserving order?

I know that the regular Source.merge() doesn't preserve ordering, but in this case I know that there's no repartitioning etc., and messages basically appear on the same "tick", so it feels like there should be a way to do this. Can I keep the ordering if I replace my transformers with processors and manually connect them to the same sink?


--
Best regards,
Vasily Sulatskov


Re: Ordering of messages in the same kafka streams sub-topology with multiple sinks for the same topic

Posted by John Roesler <vv...@apache.org>.
Hi Vasily,

Probably in this case, with the constraints you’re providing, the first branch would output first, but I wouldn’t depend on it. Any small change in your program could mess this up, and any change in Streams could also alter the exact execution order.

The right way to think about these programs is as “data flows”. You’re taking a stream of data and defining two separate branches into smaller streams, and then later on merging those back into one stream. In general, there would be no defined ordering, just like if you imagine doing the same thing with literal water streams. 

If you want a guarantee about the relative ordering, you’d have to use a specific operator that does what you want. If nothing else comes to mind, a custom transformer or processor that receives records from both branches and buffers records from the second, so that it can emit the record from the first branch first, would do the trick.
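A minimal, Kafka-free sketch of that buffering idea, in Scala to match the code in the question. It assumes, hypothetically, that each record reaching the merge step is tagged with the input offset it came from (in a Transformer this could come from `ProcessorContext.offset()`), and that each branch reports every input offset exactly once and in increasing order, with an empty result when its filter drops the record. `OrderedMerge`, `onFirst` and `onSecond` are illustrative names, not Kafka Streams API.

```scala
import scala.collection.mutable

// Hypothetical model of an ordered merge of two branches (not Kafka Streams API).
// Assumes each branch reports every input offset once, in increasing order,
// passing an empty Seq when it produced nothing for that input.
final class OrderedMerge(emit: (Long, String) => Unit) {
  // Outputs reported so far for one input offset; None = branch not there yet.
  private final class Slot {
    var first: Option[Seq[String]] = None
    var second: Option[Seq[String]] = None
  }

  // Input offsets that still have buffered output, smallest first.
  private val pending = mutable.TreeMap.empty[Long, Slot]

  def onFirst(offset: Long, values: Seq[String]): Unit = {
    val slot = pending.getOrElseUpdate(offset, new Slot)
    slot.first = Some(values)
    drain()
  }

  def onSecond(offset: Long, values: Seq[String]): Unit = {
    val slot = pending.getOrElseUpdate(offset, new Slot)
    slot.second = Some(values)
    drain()
  }

  // Emit completed offsets from the smallest upward, first branch before
  // second; stop at the first offset still waiting for one of the branches.
  private def drain(): Unit = {
    var blocked = false
    while (!blocked && pending.nonEmpty) {
      val (offset, slot) = pending.head
      (slot.first, slot.second) match {
        case (Some(a), Some(b)) =>
          (a ++ b).foreach(v => emit(offset, v))
          pending.remove(offset)
        case _ =>
          blocked = true
      }
    }
  }
}

val out = mutable.ListBuffer.empty[(Long, String)]
val merge = new OrderedMerge((offset, value) => out += ((offset, value)))
merge.onSecond(1L, Seq("b1")) // held: first branch has not reported offset 1 yet
merge.onFirst(1L, Seq("a1"))  // offset 1 complete: emits a1, then b1
merge.onFirst(2L, Seq.empty)  // first branch filtered out input 2
merge.onFirst(3L, Seq("a3"))  // held: second branch has not reported offset 2 yet
merge.onSecond(2L, Seq("b2")) // offset 2 complete: emits b2; offset 3 still waits
merge.onSecond(3L, Seq("b3")) // offset 3 complete: emits a3, then b3
```

The buffer here lives only in memory; a real transformer would presumably need to keep it in a state store so that held records survive a restart, and it still relies on one task processing its input partition in order.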

Thanks,
John
