Posted to jira@kafka.apache.org by "Luke Chen (Jira)" <ji...@apache.org> on 2021/06/29 06:48:00 UTC
[jira] [Updated] (KAFKA-13008) Stream will stop processing data for a long time while waiting for the partition lag
[ https://issues.apache.org/jira/browse/KAFKA-13008?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Luke Chen updated KAFKA-13008:
------------------------------
Description:
In KIP-695, we improved the task idling mechanism by checking partition lag. It's a good improvement for timestamp synchronization. However, I found that it can cause a stream to stop processing data for a long time while it waits for partition metadata.
I've been investigating this case for a while and found that the issue happens in the following scenario (or similar ones):
# Start 2 Streams instances (each with 1 thread) consuming from topicA (3 partitions: A-0, A-1, A-2).
# After both instances start, the partition assignment is as follows (other processing-related partitions are omitted for simplicity):
stream1-thread1: A-0, A-1
stream2-thread1: A-2
# Process some data; assume the positions and high watermarks are now:
A-0: offset: 2, highWM: 2
A-1: offset: 2, highWM: 2
A-2: offset: 2, highWM: 2
# stream3 joins, triggering a rebalance with this assignment:
stream1-thread1: A-0
stream2-thread1: A-2
stream3-thread1: A-1
# stream3 then suddenly leaves, triggering another rebalance that restores the step 2 assignment:
stream1-thread1: A-0, *A-1*
stream2-thread1: A-2
# Note that partition A-1 was previously assigned to stream1-thread1, and now it's back. Also assume A-1 has slow input (e.g. 1 record per 30 mins) and A-0 has fast input (e.g. 10K records/sec). Now stream1-thread1 won't process any data until it gets input from A-1, even if a lot of A-0 data is buffered and {{max.task.idle.ms}} is set to 0.
stream1-thread1 won't process any data because it can't get the metadata (the lag) of partition A-1. We can't get that metadata because:
# In KIP-695, we use the consumer's cache to get the partition lag, to avoid a remote call.
# The cached lag for a partition is cleared if the current round's assignment doesn't include that partition (check here). So in the example above, the metadata cache for A-1 is cleared in step 4 and re-initialized (to null) in step 5.
# KIP-227 introduced fetch sessions for incremental fetch requests/responses. That is, once a session exists, the client (consumer) only gets an update for a fetched partition when that partition has changed (e.g. new data). So in the case above, since A-1 has slow input (e.g. 1 record per 30 mins), it won't get an update for up to 30 mins, or until the fetch session has been inactive long enough (2 mins by default) to be evicted. Either way, the metadata won't be updated for a while.
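To make the two cache behaviors above concrete, here is a minimal, hypothetical model of a consumer-side lag cache. LagCache, assign, onFetchResponse and lag are names invented for illustration (this is not the KafkaConsumer or SubscriptionState API), and the model simply assumes the behavior described in this report: cached lags are dropped for partitions that leave the assignment, newly assigned partitions start with an unknown lag, and a lag only changes when a fetch response includes that partition.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;
import java.util.Set;

// Hypothetical model of a consumer-side lag cache; not the real consumer API.
class LagCache {
    // partition -> last known lag; a null value means "unknown"
    private final Map<String, Long> lagByPartition = new HashMap<>();

    // Rebalance: drop cached lags for revoked partitions; newly assigned ones start unknown.
    void assign(Set<String> assignment) {
        lagByPartition.keySet().retainAll(assignment);
        for (String p : assignment) {
            if (!lagByPartition.containsKey(p)) {
                lagByPartition.put(p, null);
            }
        }
    }

    // Incremental fetch response: only partitions that changed are included.
    void onFetchResponse(Map<String, Long> changedLags) {
        for (Map.Entry<String, Long> e : changedLags.entrySet()) {
            if (lagByPartition.containsKey(e.getKey())) {
                lagByPartition.put(e.getKey(), e.getValue());
            }
        }
    }

    // Empty result = lag unknown (never fetched since the partition was assigned).
    OptionalLong lag(String partition) {
        Long lag = lagByPartition.get(partition);
        return lag == null ? OptionalLong.empty() : OptionalLong.of(lag);
    }
}
```

Replaying steps 2 to 5 (assign A-0 and A-1, fetch both with zero lag, assign only A-0, then A-0 and A-1 again) and then delivering an incremental response that only mentions A-0 leaves lag("A-1") empty, which is the state stream1-thread1 ends up in.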
With KIP-695, if we can't get the partition lag, we can't determine the partition's data status for timestamp synchronization, so we keep waiting and don't process any data. That's why this issue happens.
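The decision described above can be sketched as a per-task readiness check. This is a simplified, hypothetical model of the KIP-695 idling rules as this issue describes them (IdlingCheck and readyToProcess are illustrative names, and other details of the real implementation are left out); it is not the actual Kafka Streams code. The important branch is the first one: an unknown lag returns false without any deadline.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;
import java.util.function.Function;

// Hypothetical sketch of KIP-695-style task idling; not Kafka Streams code.
class IdlingCheck {
    private final long maxTaskIdleMs;
    // partition -> wall-clock time after which we stop waiting for new data
    private final Map<String, Long> idleDeadlines = new HashMap<>();

    IdlingCheck(long maxTaskIdleMs) {
        this.maxTaskIdleMs = maxTaskIdleMs;
    }

    // emptyPartitions: the task's partitions whose record buffer is empty.
    // lagProvider: cached lag per partition (empty = unknown).
    boolean readyToProcess(Iterable<String> emptyPartitions,
                           Function<String, OptionalLong> lagProvider,
                           long nowMs) {
        for (String p : emptyPartitions) {
            OptionalLong lag = lagProvider.apply(p);
            if (!lag.isPresent()) {
                // Unknown lag: wait with no deadline at all (the stall in this issue).
                idleDeadlines.remove(p);
                return false;
            }
            if (lag.getAsLong() > 0) {
                // Data is known to be on the broker: wait until it is fetched.
                idleDeadlines.remove(p);
                return false;
            }
            // Caught up: wait at most maxTaskIdleMs for new data to be produced.
            long deadline = idleDeadlines.computeIfAbsent(p, k -> nowMs + maxTaskIdleMs);
            if (nowMs < deadline) {
                return false;
            }
        }
        return true;
    }
}
```

With {{max.task.idle.ms}} = 0, a partition with a known lag of 0 is ready immediately, but a partition whose lag is unknown keeps the whole task waiting until a fetch response for it arrives, no matter how much time passes.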
*Proposed solution:*
# If we can't get the current lag for a partition, or the current lag is > 0, start waiting for {{max.task.idle.ms}}, and reset the deadline once we get the partition lag, as we did previously in KIP-353.
# Introduce a config for the maximum waiting time when the partition lag is unknown or stays > 0 (requires a KIP).
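A minimal sketch of proposed solution 1, under two assumptions: every empty partition (unknown, positive, or zero lag) waits at most {{max.task.idle.ms}}, and "reset the deadline" means restarting the timer when a partition's lag goes from unknown to known. BoundedIdlingCheck and its methods are hypothetical; for proposed solution 2, the maxTaskIdleMs constructor argument would instead come from the new config, whose name is not decided here.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;
import java.util.Set;
import java.util.function.Function;

// Hypothetical sketch of proposed solution 1; not Kafka Streams code.
class BoundedIdlingCheck {
    private final long maxTaskIdleMs;
    // partition -> wall-clock deadline after which we stop waiting for it
    private final Map<String, Long> idleDeadlines = new HashMap<>();
    // partition -> whether its lag was known the last time we checked
    private final Map<String, Boolean> lagWasKnown = new HashMap<>();

    BoundedIdlingCheck(long maxTaskIdleMs) {
        this.maxTaskIdleMs = maxTaskIdleMs;
    }

    boolean readyToProcess(Set<String> emptyPartitions,
                           Function<String, OptionalLong> lagProvider,
                           long nowMs) {
        // Partitions that have buffered records again no longer need a deadline.
        idleDeadlines.keySet().retainAll(emptyPartitions);
        lagWasKnown.keySet().retainAll(emptyPartitions);

        boolean ready = true;
        for (String p : emptyPartitions) {
            boolean lagKnown = lagProvider.apply(p).isPresent();
            Boolean previouslyKnown = lagWasKnown.put(p, lagKnown);
            if (lagKnown && Boolean.FALSE.equals(previouslyKnown)) {
                // The lag just became available: restart the idle timer.
                idleDeadlines.remove(p);
            }
            // Unknown, positive, and zero lag all wait, but only until the deadline.
            long deadline = idleDeadlines.computeIfAbsent(p, k -> nowMs + maxTaskIdleMs);
            if (nowMs < deadline) {
                ready = false;
            }
        }
        return ready;
    }
}
```

For example, with maxTaskIdleMs = 100 and an unknown lag from t=0, the task becomes ready at t=100 instead of stalling; if the lag becomes known at t=50, the deadline restarts and the task becomes ready at t=150.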
[~vvcephei] [~guozhang], any suggestions?
cc [~ableegoldman] [~mjsax], FYI: this is the root cause of what we discussed in [https://github.com/apache/kafka/pull/10736] and thought was a data loss situation.
> Stream will stop processing data for a long time while waiting for the partition lag
> ------------------------------------------------------------------------------------
>
> Key: KAFKA-13008
> URL: https://issues.apache.org/jira/browse/KAFKA-13008
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 3.0.0
> Reporter: Luke Chen
> Priority: Major
--
This message was sent by Atlassian Jira
(v8.3.4#803005)