Posted to users@kafka.apache.org by Boris Sukhinin <b....@gmail.com> on 2020/12/04 13:48:23 UTC

Re: Event-time skew when processing messages from different partitions

Matthias, thanks a lot for your reply and for the blog post. It took me some time to wrap my head around the Kafka Streams "continuous refinement" approach and to stop trying to mimic what I had in Flink.

Regards,
Boris.

On 2020/11/16 19:28:58, "Matthias J. Sax" <mj...@apache.org> wrote: 
> I guess you mean `map()` not `mapValues()` in your example, because a
> `mapValues()` would not result in data repartitioning.
> 
> > Should it be a real concern, especially when processing large amounts of historical data? 
> 
> It could be.
> 
> > Increasing the grace period doesn't seem like a valid option because it would delay emitting the final aggregation result.
> 
> Well, this is the trade-off you face, and there is no way around it.
> Also, if you process historic data, latency is usually not the biggest
> concern. And for the "live" use case when you process from the tail of
> the input topics, the disorder should be bounded.
> 
> > Apache Flink handles this by emitting the lowest watermark on partitions merge.
> 
> Yes, but Flink would also emit the final result with larger latency,
> because the watermark would arrive later if an upstream task lags. I
> guess it's fair to say that Flink would be more "adaptive" here and if
> there is no lag it would close earlier than Kafka Streams. -- Thus, for
> Kafka Streams you might want to have a larger grace period if you
> process historical data and a shorter grace period for the "live" case.
> 
> > Does Kafka Streams offer something to deal with this scenario?
> 
> Not at the moment. As said above, you could set different grace periods
> for different scenarios.
> 
> 
> Shameless plug: I recently discussed this topic in a podcast:
> https://developer.confluent.io/podcast/why-kafka-streams-does-not-use-watermarks-ft-matthias-j-sax
> 
> 
> Hope this helps.
> 
> -Matthias
> 
> 
> On 11/16/20 9:38 AM, Boris Sukhinin wrote:
> > Let me clarify the question a little bit.
> > 
> > Consider the following code:
> > stream
> >   .mapValues(...)
> >   .groupBy(...)
> >   .windowedBy(TimeWindows.of(Duration.ofSeconds(60)).grace(Duration.ofSeconds(10)))
> >   .aggregate(...)
> > 
> > I assume the mapValues() operation could be slow for some tasks for whatever reason, and because of that the tasks process messages at different paces. When a shuffle happens at the aggregate() operator, task 0 could have processed messages up to time T while task 1 is still at (T-skew), but messages from both tasks end up interleaved in a single partition of the internal topic (corresponding to the grouping key).
> > 
> > My concern is that when skew is large enough (more than 10 seconds in my example), messages from the lagging task 1 will be dropped.
> > 
> > Regards,
> > Boris
> > 
> > On 2020/11/16 10:33:10, Boris Sukhinin <b....@gmail.com> wrote: 
> >> Hi All,
> >>
> >> Let's consider a topic with multiple partitions and messages written in event-time order without any particular partitioning scheme. A Kafka Streams application does some transformations on these messages, then groups by some key, and then aggregates messages by an event-time window with the given grace period.
> >>
> >> Each task could process incoming messages at a different speed (e.g., because they run on servers with different performance characteristics). This means that after the groupBy shuffle, event-time ordering will not be preserved between messages in the same partition of the internal topic when they originate from different tasks. After a while, this event-time skew could become larger than the aggregation window size + grace period, which would lead to dropping messages originating from the lagging task.
> >>
> >> Should it be a real concern, especially when processing large amounts of historical data? Increasing the grace period doesn't seem like a valid option because it would delay emitting the final aggregation result. Apache Flink handles this by emitting the lowest watermark on partitions merge. Does Kafka Streams offer something to deal with this scenario?
> >>
> >> Regards,
> >> Boris
> >>
>
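
The drop condition discussed in this thread can be illustrated with a small stand-alone Java sketch. It does not use the Kafka Streams API. The class GraceSkewSketch and the method isDropped are hypothetical names, and the rule it applies (a record is dropped once stream time has reached its window's end plus the grace period, with tumbling windows aligned to the epoch) is a simplified model of the behaviour, not the library's implementation. The 60 s window and 10 s grace period are taken from the example in the thread; the stream time of 130 s is an arbitrary value chosen for illustration.

```java
class GraceSkewSketch {
    // Simplified model (an assumption): tumbling windows aligned to epoch 0;
    // a record is dropped once stream time >= windowEnd + grace.
    static boolean isDropped(long recordTsMs, long streamTimeMs,
                             long windowSizeMs, long graceMs) {
        long windowStart = recordTsMs - (recordTsMs % windowSizeMs);
        long windowEnd = windowStart + windowSizeMs;
        return windowEnd + graceMs <= streamTimeMs;
    }

    public static void main(String[] args) {
        long windowSizeMs = 60_000L; // TimeWindows.of(Duration.ofSeconds(60))
        long graceMs = 10_000L;      // .grace(Duration.ofSeconds(10))
        // Stream time T already reached in the repartition topic partition,
        // driven by records from the fast task 0 (illustrative value).
        long streamTimeMs = 130_000L;

        // Records arriving from the lagging task 1 with various skews.
        for (long skewSec : new long[] {5, 15, 65, 75}) {
            long ts = streamTimeMs - skewSec * 1_000L;
            boolean dropped = isDropped(ts, streamTimeMs, windowSizeMs, graceMs);
            System.out.printf("skew=%2ds ts=%6d dropped=%b%n", skewSec, ts, dropped);
        }
    }
}
```

Under this model the skew at which a record is lost depends on where its timestamp falls inside the window: a record near the end of its window is dropped once the skew reaches about the grace period, while a record at the start of its window survives until the skew reaches the window size plus the grace period.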