Posted to jira@kafka.apache.org by "Luke Chen (Jira)" <ji...@apache.org> on 2021/09/17 12:34:00 UTC
[jira] [Created] (KAFKA-13309)
InMemorySessionStore#fetch/backwardFetch doesn't return in correct order
Luke Chen created KAFKA-13309:
---------------------------------
Summary: InMemorySessionStore#fetch/backwardFetch doesn't return in correct order
Key: KAFKA-13309
URL: https://issues.apache.org/jira/browse/KAFKA-13309
Project: Kafka
Issue Type: Bug
Components: streams
Affects Versions: 3.0.0
Reporter: Luke Chen
Assignee: Luke Chen
KAFKA-9929 added backward iterators for SessionStore. However, fetch/backwardFetch over a key range does not return records in the correct order when multiple records share the same session window.
For example:
Suppose the session window inactivity gap is 10 ms and we have these records:
key: "A", value: "AA", timestamp: 0 --> with SessionWindow(0, 0)
key: "B", value: "BB", timestamp: 0 --> with SessionWindow(0, 0)
key: "C", value: "CC", timestamp: 0 --> with SessionWindow(0, 0)
key: "D", value: "DD", timestamp: 100 --> with SessionWindow(100, 100)
So, when calling fetch("A" /*key from*/, "D" /*key to*/), we expect [A, B, C, D], but we get [C, B, A, D].
And the reason is here:
[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java#L276-L295]
{code:java}
public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes keyFrom, final Bytes keyTo) {
    return registerNewIterator(keyFrom, keyTo, Long.MAX_VALUE, endTimeMap.entrySet().iterator(), false); // <-- the final parameter is the "is forward" flag, which should be true for "fetch" and false for "backwardFetch"
}
{code}
We pass "false" as the "is forward" parameter in the `fetch` method and "true" in the `backwardFetch` method, which is the wrong way around.
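To illustrate the effect, here is a minimal sketch using a simplified, hypothetical model of the store rather than the real InMemorySessionStore code. It assumes that end times are always walked in ascending order and that the flag only controls the key order within each end time, which is consistent with the output seen above. The class name `SessionOrderSketch` and its methods are made up for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Simplified, hypothetical model: session end time -> (key -> value).
// The real store nests a further map of session start times.
final class SessionOrderSketch {

    // Walks end times in ascending order (like endTimeMap.entrySet().iterator())
    // and, within each end time, walks keys ascending or descending per the flag.
    static List<String> fetch(final NavigableMap<Long, NavigableMap<String, String>> endTimeMap,
                              final String keyFrom,
                              final String keyTo,
                              final boolean forward) {
        final List<String> result = new ArrayList<>();
        for (final Map.Entry<Long, NavigableMap<String, String>> entry : endTimeMap.entrySet()) {
            final NavigableMap<String, String> inRange =
                entry.getValue().subMap(keyFrom, true, keyTo, true);
            final NavigableMap<String, String> ordered =
                forward ? inRange : inRange.descendingMap();
            result.addAll(ordered.keySet());
        }
        return result;
    }

    // The four records from the example: A, B, C end at 0 and D ends at 100.
    static NavigableMap<Long, NavigableMap<String, String>> exampleStore() {
        final NavigableMap<Long, NavigableMap<String, String>> endTimeMap = new TreeMap<>();
        endTimeMap.computeIfAbsent(0L, t -> new TreeMap<>()).put("A", "AA");
        endTimeMap.computeIfAbsent(0L, t -> new TreeMap<>()).put("B", "BB");
        endTimeMap.computeIfAbsent(0L, t -> new TreeMap<>()).put("C", "CC");
        endTimeMap.computeIfAbsent(100L, t -> new TreeMap<>()).put("D", "DD");
        return endTimeMap;
    }

    public static void main(final String[] args) {
        final NavigableMap<Long, NavigableMap<String, String>> store = exampleStore();
        // Flag passed as false on a forward end-time walk: keys within a window come out reversed.
        System.out.println(fetch(store, "A", "D", false)); // [C, B, A, D]
        // Flag passed as true: the expected forward order.
        System.out.println(fetch(store, "A", "D", true));  // [A, B, C, D]
    }
}
```

In this model, the mismatch only shows up when several keys share the same end time, which would explain why D still lands in the right position.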
--
This message was sent by Atlassian Jira
(v8.3.4#803005)