Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/09/18 02:46:45 UTC

[GitHub] [kafka] showuon opened a new pull request #11337: KAFKA-13309: fix InMemorySessionStore#fetch/backwardFetch order issue

showuon opened a new pull request #11337:
URL: https://github.com/apache/kafka/pull/11337


   JIRA is [here](https://issues.apache.org/jira/browse/KAFKA-13309).
   In https://github.com/apache/kafka/pull/9139, we added a backward iterator to SessionStore. However, there is a bug: when `fetch`/`backwardFetch` is called on a key range and multiple records fall in the same session window, the records are not returned in the correct order.
   
   For example:
   Suppose the session window inactivity gap is 10 ms and we have these records:
   
   key: "A", value: "AA", timestamp: 0 --> with SessionWindow(0, 0)
   key: "B", value: "BB", timestamp: 0 --> with SessionWindow(0, 0)
   key: "C", value: "CC", timestamp: 0 --> with SessionWindow(0, 0)
   key: "D", value: "DD", timestamp: 100 --> with SessionWindow(100, 100)
   
   Then, for fetch("A" /*key from*/, "D" /*key to*/), we expect [A, B, C, D], but we get [C, B, A, D].
   
   The reason is that we pass "false" for the "is forward" parameter in the `fetch` method and "true" in the `backwardFetch` method, which is the wrong way around.
   
   So why didn't the tests catch this issue?
   Because the test data we provided never put multiple records in the same session window.
   
   In this PR, I fix the issue and add tests to improve the test coverage.
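
The ordering problem described above can be reproduced with a small stand-alone model. This is only a sketch under stated assumptions: the nested map (session end time, then record key) is a simplification rather than the store's real layout, and `SessionOrderSketch`, `sampleStore`, `forwardFetchKeys`, and the `keysForward` flag are hypothetical names used for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

final class SessionOrderSketch {
    // Simplified model (an assumption, not the real store layout):
    // session end time -> record key -> value.
    static NavigableMap<Long, NavigableMap<String, String>> sampleStore() {
        NavigableMap<Long, NavigableMap<String, String>> store = new TreeMap<>();
        store.computeIfAbsent(0L, t -> new TreeMap<>()).put("A", "AA");
        store.computeIfAbsent(0L, t -> new TreeMap<>()).put("B", "BB");
        store.computeIfAbsent(0L, t -> new TreeMap<>()).put("C", "CC");
        store.computeIfAbsent(100L, t -> new TreeMap<>()).put("D", "DD");
        return store;
    }

    // Walks the end times in ascending order, as a forward fetch would, and uses
    // the hypothetical keysForward flag to choose the key order inside each window.
    static List<String> forwardFetchKeys(NavigableMap<Long, NavigableMap<String, String>> store,
                                         boolean keysForward) {
        List<String> result = new ArrayList<>();
        for (NavigableMap<String, String> sameWindow : store.values()) {
            NavigableMap<String, String> ordered =
                keysForward ? sameWindow : sameWindow.descendingMap();
            result.addAll(ordered.keySet());
        }
        return result;
    }

    public static void main(String[] args) {
        NavigableMap<Long, NavigableMap<String, String>> store = sampleStore();
        // Wrong flag: keys sharing SessionWindow(0, 0) come back reversed.
        System.out.println(forwardFetchKeys(store, false)); // [C, B, A, D]
        // Correct flag: keys come back in ascending order.
        System.out.println(forwardFetchKeys(store, true));  // [A, B, C, D]
    }
}
```

With only one record per window, both flag values produce the same output in this model, which is consistent with the old test data not exposing the bug.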
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman commented on pull request #11337: KAFKA-13309: fix InMemorySessionStore#fetch/backwardFetch order issue

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on pull request #11337:
URL: https://github.com/apache/kafka/pull/11337#issuecomment-929733880


   Merged to trunk and cherry-picked to 3.0





[GitHub] [kafka] showuon commented on a change in pull request #11337: KAFKA-13309: fix InMemorySessionStore#fetch/backwardFetch order issue

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #11337:
URL: https://github.com/apache/kafka/pull/11337#discussion_r711430994



##########
File path: streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java
##########
@@ -195,18 +195,18 @@ public void shouldBackwardFetchAllSessionsWithSameRecordKey() {
         sessionStore.put(new Windowed<>("aa", new SessionWindow(0, 0)), 5L);
 
         try (final KeyValueIterator<Windowed<String>, Long> values = sessionStore.backwardFetch("a")) {
-            assertEquals(new HashSet<>(expected), toSet(values));
+            assertEquals(toList(expected.descendingIterator()), toList(values));
         }
     }
 
     @Test
     public void shouldFetchAllSessionsWithinKeyRange() {
-        final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(
-            KeyValue.pair(new Windowed<>("aa", new SessionWindow(10, 10)), 2L),
-            KeyValue.pair(new Windowed<>("b", new SessionWindow(1000, 1000)), 4L),
-
-            KeyValue.pair(new Windowed<>("aaa", new SessionWindow(100, 100)), 3L),
-            KeyValue.pair(new Windowed<>("bb", new SessionWindow(1500, 2000)), 5L));
+        final List<KeyValue<Windowed<String>, Long>> expected = new LinkedList<>();
+        expected.add(KeyValue.pair(new Windowed<>("aa", new SessionWindow(10, 10)), 2L));
+        expected.add(KeyValue.pair(new Windowed<>("aaa", new SessionWindow(100, 100)), 3L));
+        expected.add(KeyValue.pair(new Windowed<>("aaaa", new SessionWindow(100, 100)), 6L));

Review comment:
       Add a record with the same `SessionWindow(100, 100)`, so that we can test whether fetch/backwardFetch return data in the correct order for both `InMemorySessionStore` and `RocksDBSessionStore`.







[GitHub] [kafka] ableegoldman merged pull request #11337: KAFKA-13309: fix InMemorySessionStore#fetch/backwardFetch order issue

Posted by GitBox <gi...@apache.org>.
ableegoldman merged pull request #11337:
URL: https://github.com/apache/kafka/pull/11337


   





[GitHub] [kafka] showuon commented on a change in pull request #11337: KAFKA-13309: fix InMemorySessionStore#fetch/backwardFetch order issue

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #11337:
URL: https://github.com/apache/kafka/pull/11337#discussion_r711430409



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java
##########
@@ -292,7 +292,7 @@ public void remove(final Windowed<Bytes> sessionKey) {
         removeExpiredSegments();
 
         return registerNewIterator(
-            keyFrom, keyTo, Long.MAX_VALUE, endTimeMap.descendingMap().entrySet().iterator(), true);
+            keyFrom, keyTo, Long.MAX_VALUE, endTimeMap.descendingMap().entrySet().iterator(), false);

Review comment:
       Fix: make the `backwardFetch` method pass `false` for the "is forward" parameter.







[GitHub] [kafka] showuon commented on pull request #11337: KAFKA-13309: fix InMemorySessionStore#fetch/backwardFetch order issue

Posted by GitBox <gi...@apache.org>.
showuon commented on pull request #11337:
URL: https://github.com/apache/kafka/pull/11337#issuecomment-927599696


   @jeqo @ableegoldman @vvcephei @guozhangwang, please help review this PR when available. Thank you.





[GitHub] [kafka] showuon commented on pull request #11337: KAFKA-13309: fix InMemorySessionStore#fetch/backwardFetch order issue

Posted by GitBox <gi...@apache.org>.
showuon commented on pull request #11337:
URL: https://github.com/apache/kafka/pull/11337#issuecomment-922168687


   @jeqo @ableegoldman @vvcephei, please help review this PR. Thank you.





[GitHub] [kafka] showuon commented on a change in pull request #11337: KAFKA-13309: fix InMemorySessionStore#fetch/backwardFetch order issue

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #11337:
URL: https://github.com/apache/kafka/pull/11337#discussion_r711431126



##########
File path: streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java
##########
@@ -463,12 +469,20 @@ public void shouldFetchAndIterateOverExactBinaryKeys() {
         sessionStore.put(new Windowed<>(key2, new SessionWindow(8, 100)), "8");
         sessionStore.put(new Windowed<>(key3, new SessionWindow(9, 100)), "9");
 
-        final Set<String> expectedKey1 = new HashSet<>(asList("1", "4", "7"));
-        assertThat(valuesToSet(sessionStore.findSessions(key1, 0L, Long.MAX_VALUE)), equalTo(expectedKey1));
-        final Set<String> expectedKey2 = new HashSet<>(asList("2", "5", "8"));
-        assertThat(valuesToSet(sessionStore.findSessions(key2, 0L, Long.MAX_VALUE)), equalTo(expectedKey2));
-        final Set<String> expectedKey3 = new HashSet<>(asList("3", "6", "9"));
-        assertThat(valuesToSet(sessionStore.findSessions(key3, 0L, Long.MAX_VALUE)), equalTo(expectedKey3));
+        final List<String> expectedKey1 = asList("1", "4", "7");
+        try (KeyValueIterator<Windowed<Bytes>, String> iterator = sessionStore.findSessions(key1, 0L, Long.MAX_VALUE)) {
+            assertThat(valuesToSet(iterator), equalTo(new HashSet<>(expectedKey1)));
+        }

Review comment:
       We forgot to close the `KeyValueIterator` instance returned by `findSessions`. This change closes it with try-with-resources.
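
The effect of the try-with-resources change can be illustrated without Kafka on the classpath. The sketch below is an assumption-laden stand-in: `TrackingIterator` and `drain` are hypothetical names, and the iterator merely records whether `close()` was called instead of releasing real store resources.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

final class CloseIteratorSketch {
    // Hypothetical stand-in for a store iterator that must be closed after use.
    static final class TrackingIterator implements Iterator<String>, AutoCloseable {
        private final Iterator<String> delegate;
        private boolean closed = false;

        TrackingIterator(List<String> values) {
            this.delegate = values.iterator();
        }

        @Override public boolean hasNext() { return delegate.hasNext(); }
        @Override public String next() { return delegate.next(); }
        @Override public void close() { closed = true; }

        boolean isClosed() { return closed; }
    }

    // Drains the iterator into a set; it does not close the iterator itself.
    static Set<String> drain(Iterator<String> it) {
        Set<String> out = new HashSet<>();
        while (it.hasNext()) {
            out.add(it.next());
        }
        return out;
    }

    public static void main(String[] args) {
        // Passing the iterator straight to the helper leaves it open.
        TrackingIterator leaked = new TrackingIterator(Arrays.asList("1", "4", "7"));
        drain(leaked);
        System.out.println(leaked.isClosed()); // false

        // try-with-resources closes it even if an assertion inside the block fails.
        TrackingIterator managed = new TrackingIterator(Arrays.asList("1", "4", "7"));
        try (TrackingIterator it = managed) {
            drain(it);
        }
        System.out.println(managed.isClosed()); // true
    }
}
```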







[GitHub] [kafka] showuon commented on a change in pull request #11337: KAFKA-13309: fix InMemorySessionStore#fetch/backwardFetch order issue

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #11337:
URL: https://github.com/apache/kafka/pull/11337#discussion_r711430770



##########
File path: streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java
##########
@@ -121,15 +121,15 @@ public void shouldPutAndFindSessionsInRange() {
 
         try (final KeyValueIterator<Windowed<String>, Long> values = sessionStore.findSessions(key, 0, 1000L)
         ) {
-            assertEquals(new HashSet<>(expected), toSet(values));
+            assertEquals(expected, toList(values));

Review comment:
       Before this PR, we only verified that the returned data matched the expected data as a **Set**. Now we compare them as a **List**, which means that not only the data but also its order must be correct.
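
The difference between the two assertion styles can be seen in a few lines of plain Java. This is a minimal sketch with hypothetical helper names (`equalAsSets`, `equalAsLists`) and string values standing in for the store's key-value pairs.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;

final class OrderAssertionSketch {
    // Set comparison ignores iteration order, so a reordered result still matches.
    static boolean equalAsSets(List<String> expected, List<String> actual) {
        return new HashSet<>(expected).equals(new HashSet<>(actual));
    }

    // List comparison is positional, so a reordered result does not match.
    static boolean equalAsLists(List<String> expected, List<String> actual) {
        return expected.equals(actual);
    }

    public static void main(String[] args) {
        List<String> expected = Arrays.asList("A", "B", "C", "D");
        List<String> misordered = Arrays.asList("C", "B", "A", "D");

        System.out.println(equalAsSets(expected, misordered));  // true
        System.out.println(equalAsLists(expected, misordered)); // false
    }
}
```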







[GitHub] [kafka] showuon commented on a change in pull request #11337: KAFKA-13309: fix InMemorySessionStore#fetch/backwardFetch order issue

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #11337:
URL: https://github.com/apache/kafka/pull/11337#discussion_r711430400



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java
##########
@@ -281,7 +281,7 @@ public void remove(final Windowed<Bytes> sessionKey) {
         removeExpiredSegments();
 
 
-        return registerNewIterator(keyFrom, keyTo, Long.MAX_VALUE, endTimeMap.entrySet().iterator(), false);
+        return registerNewIterator(keyFrom, keyTo, Long.MAX_VALUE, endTimeMap.entrySet().iterator(), true);

Review comment:
       Fix: make the `fetch` method pass `true` for the "is forward" parameter.



