Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/06/30 21:24:48 UTC

[GitHub] [kafka] lihaosky opened a new pull request, #12370: KAFKA-13785: [10/N][emit final] more unit test for session store and disable cache for emit final sliding window

lihaosky opened a new pull request, #12370:
URL: https://github.com/apache/kafka/pull/12370

   1. Added more unit tests for `RocksDBTimeOrderedSessionStore` and `RocksDBTimeOrderedSessionSegmentedBytesStore`
   2. Disabled the cache for sliding windows if the emit strategy is `ON_WINDOW_CLOSE`
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] guozhangwang merged pull request #12370: KAFKA-13785: [10/N][emit final] more unit test for session store and disable cache for emit final sliding window

Posted by GitBox <gi...@apache.org>.
guozhangwang merged PR #12370:
URL: https://github.com/apache/kafka/pull/12370




[GitHub] [kafka] guozhangwang commented on a diff in pull request #12370: KAFKA-13785: [10/N][emit final] more unit test for session store and disable cache for emit final sliding window

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on code in PR #12370:
URL: https://github.com/apache/kafka/pull/12370#discussion_r916291502


##########
streams/src/test/java/org/apache/kafka/streams/state/internals/InMemorySessionStoreTest.java:
##########
@@ -55,6 +55,10 @@ <K, V> SessionStore<K, V> buildSessionStore(final long retentionPeriod,
             valueSerde).build();
     }
 
+    StoreType getStoreType() {

Review Comment:
   Just wondering: since we are leveraging the inherited abstract session bytes store test, are there any test cases below that could be moved there, or that are duplicates?



##########
streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java:
##########
@@ -827,6 +827,139 @@ public void shouldPutAndBackwardFetchWithPrefix() {
         }
     }
 
+    @Test
+    public void shouldFetchSessionForSingleKey() {
+        // Only for TimeFirstSessionKeySchema schema
+        if (!(getBaseSchema() instanceof TimeFirstSessionKeySchema)) {
+            return;
+        }
+
+        final String keyA = "a";
+        final String keyB = "b";
+        final String keyC = "c";
+
+        final StateSerdes<String, Long> stateSerdes = StateSerdes.withBuiltinTypes("dummy", String.class, Long.class);
+        final Bytes key1 = Bytes.wrap(stateSerdes.keySerializer().serialize("dummy", keyA));
+        final Bytes key2 = Bytes.wrap(stateSerdes.keySerializer().serialize("dummy", keyB));
+        final Bytes key3 = Bytes.wrap(stateSerdes.keySerializer().serialize("dummy", keyC));
+
+        final byte[] expectedValue1 = serializeValue(10);
+        final byte[] expectedValue2 = serializeValue(50);
+        final byte[] expectedValue3 = serializeValue(100);
+        final byte[] expectedValue4 = serializeValue(200);
+
+        bytesStore.put(serializeKey(new Windowed<>(keyA, windows[0])), expectedValue1);
+        bytesStore.put(serializeKey(new Windowed<>(keyA, windows[1])), expectedValue2);
+        bytesStore.put(serializeKey(new Windowed<>(keyB, windows[2])), expectedValue3);
+        bytesStore.put(serializeKey(new Windowed<>(keyC, windows[3])), expectedValue4);
+
+        final byte[] value1 = ((RocksDBTimeOrderedSessionSegmentedBytesStore) bytesStore).fetchSession(

Review Comment:
   Could we also add a "miss" case for `fetchSession` here?
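
A "miss" case of the kind requested here could look roughly like the sketch below. It is self-contained and does not use the Kafka classes: `ToySessionStore` and its methods are hypothetical stand-ins, and it assumes that a lookup with no exactly matching session returns `null`, as the public `ReadOnlySessionStore#fetchSession` contract describes.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class FetchSessionMissSketch {

    // Hypothetical toy store, NOT the Kafka implementation: sessions are keyed by
    // (record key, session start, session end) and looked up by exact match.
    static final class ToySessionStore {
        private final Map<List<Object>, Long> sessions = new HashMap<>();

        void put(final String key, final long start, final long end, final long value) {
            sessions.put(Arrays.<Object>asList(key, start, end), value);
        }

        // Returns null when no session with exactly this key and window exists
        // (assumed miss behavior).
        Long fetchSession(final String key, final long start, final long end) {
            return sessions.get(Arrays.<Object>asList(key, start, end));
        }
    }

    static void check(final boolean condition, final String message) {
        if (!condition) {
            throw new AssertionError(message);
        }
    }

    public static void main(final String[] args) {
        final ToySessionStore store = new ToySessionStore();
        store.put("a", 0L, 10L, 10L);

        // Hit: exact key and window.
        check(Long.valueOf(10L).equals(store.fetchSession("a", 0L, 10L)), "expected a hit");
        // Miss: a key that was never written.
        check(store.fetchSession("b", 0L, 10L) == null, "expected a miss for an unknown key");
        // Miss: known key, but the window bounds do not match any stored session.
        check(store.fetchSession("a", 0L, 11L) == null, "expected a miss for a non-matching window");
        System.out.println("all fetchSession checks passed");
    }
}
```

Covering both an unknown key and a known key with a non-matching window keeps the two miss paths distinct.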



##########
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java:
##########
@@ -62,6 +57,11 @@ public static Collection<Object[]> getKeySchema() {
         });
     }
 
+    @Override
+    StoreType getStoreType() {

Review Comment:
   This is for line 52 above: should we rename that function to something more meaningful? Maybe `getParamStoreType`?



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java:
##########
@@ -289,7 +289,10 @@ private <VR> StoreBuilder<SessionStore<K, VR>> materialize(final MaterializedInt
         // do not enable cache if the emit final strategy is used
         if (materialized.cachingEnabled() && emitStrategy.type() != EmitStrategy.StrategyType.ON_WINDOW_CLOSE) {
             builder.withCachingEnabled();
+        } else {
+            builder.withCachingDisabled();

Review Comment:
   We only apply the default at the `Materialized` level, so the `if` condition should be sufficient even if caching is turned on by default; nevertheless, making it explicit seems fine.
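
The caching decision discussed in this comment can be illustrated with a small standalone sketch. It only mirrors the boolean condition from the diff; `CachingDecisionSketch`, its local `StrategyType` enum, and the `ON_WINDOW_UPDATE` value are assumptions made for illustration and are not the Kafka classes.

```java
public class CachingDecisionSketch {

    // Hypothetical mirror of EmitStrategy.StrategyType; ON_WINDOW_UPDATE is assumed here.
    enum StrategyType { ON_WINDOW_UPDATE, ON_WINDOW_CLOSE }

    // Same predicate as the `if` condition in the diff: caching is enabled only when it
    // was requested AND the emit strategy is not ON_WINDOW_CLOSE.
    static boolean cachingEnabled(final boolean materializedCachingEnabled, final StrategyType type) {
        return materializedCachingEnabled && type != StrategyType.ON_WINDOW_CLOSE;
    }

    public static void main(final String[] args) {
        // Print the outcome for every combination of the two inputs.
        for (final boolean requested : new boolean[] {true, false}) {
            for (final StrategyType type : StrategyType.values()) {
                System.out.println("materialized.cachingEnabled()=" + requested
                    + ", strategy=" + type
                    + " -> caching " + (cachingEnabled(requested, type) ? "enabled" : "disabled"));
            }
        }
    }
}
```

Only one of the four combinations enables caching; the explicit `else` branch in the diff covers the other three regardless of the builder's default.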





[GitHub] [kafka] guozhangwang commented on a diff in pull request #12370: KAFKA-13785: [10/N][emit final] more unit test for session store and disable cache for emit final sliding window

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on code in PR #12370:
URL: https://github.com/apache/kafka/pull/12370#discussion_r916385802


##########
streams/src/test/java/org/apache/kafka/streams/state/internals/InMemorySessionStoreTest.java:
##########
@@ -55,6 +55,10 @@ <K, V> SessionStore<K, V> buildSessionStore(final long retentionPeriod,
             valueSerde).build();
     }
 
+    StoreType getStoreType() {

Review Comment:
   Sounds great!





[GitHub] [kafka] lihaosky commented on a diff in pull request #12370: KAFKA-13785: [10/N][emit final] more unit test for session store and disable cache for emit final sliding window

Posted by GitBox <gi...@apache.org>.
lihaosky commented on code in PR #12370:
URL: https://github.com/apache/kafka/pull/12370#discussion_r916160977


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java:
##########
@@ -289,7 +289,10 @@ private <VR> StoreBuilder<SessionStore<K, VR>> materialize(final MaterializedInt
         // do not enable cache if the emit final strategy is used
         if (materialized.cachingEnabled() && emitStrategy.type() != EmitStrategy.StrategyType.ON_WINDOW_CLOSE) {
             builder.withCachingEnabled();
+        } else {
+            builder.withCachingDisabled();

Review Comment:
   I want to make it explicit in case the default is caching enabled. I can remove it if you think it's unnecessary.





[GitHub] [kafka] lihaosky commented on a diff in pull request #12370: KAFKA-13785: [10/N][emit final] more unit test for session store and disable cache for emit final sliding window

Posted by GitBox <gi...@apache.org>.
lihaosky commented on code in PR #12370:
URL: https://github.com/apache/kafka/pull/12370#discussion_r916381888


##########
streams/src/test/java/org/apache/kafka/streams/state/internals/InMemorySessionStoreTest.java:
##########
@@ -55,6 +55,10 @@ <K, V> SessionStore<K, V> buildSessionStore(final long retentionPeriod,
             valueSerde).build();
     }
 
+    StoreType getStoreType() {

Review Comment:
   For `shouldRemoveExpired`, the RocksDB version uses different parameters for the session window.
   
   For `shouldNotExpireFromOpenIterator`, it's not supported in `RocksDBSessionStore`.
   
   For `shouldMatchPositionAfterPut`, the RocksDB version uses a different cast than `InMemorySessionStore`.
   
   So we could put `shouldRemoveExpired` and `shouldMatchPositionAfterPut` in the parent class and add different logic depending on the store type. We can also leave them here. wdyt?
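
The "parent class with store-type-dependent logic" option could be sketched as follows. Everything here is hypothetical: the `StoreType` values, the class names, and especially the numbers, which are placeholders and not the parameters used in the Kafka tests.

```java
public class StoreTypeBranchSketch {

    // Hypothetical store types; the real enum in the test hierarchy may differ.
    enum StoreType { IN_MEMORY, ROCKS_DB }

    // Stand-in for the shared abstract test class.
    abstract static class AbstractSessionStoreChecks {
        abstract StoreType getStoreType();

        // A shared check picks store-specific parameters instead of being duplicated
        // in each subclass. The values are placeholders chosen for illustration only.
        long sessionGapForExpiryCheck() {
            switch (getStoreType()) {
                case ROCKS_DB:
                    return 100L;
                case IN_MEMORY:
                    return 10L;
                default:
                    throw new IllegalStateException("unknown store type: " + getStoreType());
            }
        }
    }

    static final class InMemoryChecks extends AbstractSessionStoreChecks {
        @Override
        StoreType getStoreType() {
            return StoreType.IN_MEMORY;
        }
    }

    static final class RocksDbChecks extends AbstractSessionStoreChecks {
        @Override
        StoreType getStoreType() {
            return StoreType.ROCKS_DB;
        }
    }

    public static void main(final String[] args) {
        // Each subclass only declares its store type; the branching lives in the parent.
        System.out.println("in-memory gap: " + new InMemoryChecks().sessionGapForExpiryCheck());
        System.out.println("RocksDB gap: " + new RocksDbChecks().sessionGapForExpiryCheck());
    }
}
```

The trade-off is that the parent class gains a branch per store type, while the subclasses shrink to a single override.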





[GitHub] [kafka] mjsax commented on a diff in pull request #12370: KAFKA-13785: [10/N][emit final] more unit test for session store and disable cache for emit final sliding window

Posted by GitBox <gi...@apache.org>.
mjsax commented on code in PR #12370:
URL: https://github.com/apache/kafka/pull/12370#discussion_r915364409


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java:
##########
@@ -289,7 +289,10 @@ private <VR> StoreBuilder<SessionStore<K, VR>> materialize(final MaterializedInt
         // do not enable cache if the emit final strategy is used
         if (materialized.cachingEnabled() && emitStrategy.type() != EmitStrategy.StrategyType.ON_WINDOW_CLOSE) {
             builder.withCachingEnabled();
+        } else {
+            builder.withCachingDisabled();

Review Comment:
   Why do we need this call? I thought we only add tests in this PR. Is the feature not complete yet, so that we need this?





[GitHub] [kafka] guozhangwang commented on pull request #12370: KAFKA-13785: [10/N][emit final] more unit test for session store and disable cache for emit final sliding window

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on PR #12370:
URL: https://github.com/apache/kafka/pull/12370#issuecomment-1182129925

   LGTM! I'm merging to trunk now.




[GitHub] [kafka] mjsax commented on a diff in pull request #12370: KAFKA-13785: [10/N][emit final] more unit test for session store and disable cache for emit final sliding window

Posted by GitBox <gi...@apache.org>.
mjsax commented on code in PR #12370:
URL: https://github.com/apache/kafka/pull/12370#discussion_r916256345


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java:
##########
@@ -289,7 +289,10 @@ private <VR> StoreBuilder<SessionStore<K, VR>> materialize(final MaterializedInt
         // do not enable cache if the emit final strategy is used
         if (materialized.cachingEnabled() && emitStrategy.type() != EmitStrategy.StrategyType.ON_WINDOW_CLOSE) {
             builder.withCachingEnabled();
+        } else {
+            builder.withCachingDisabled();

Review Comment:
   Maybe @guozhangwang can chime in?
   
   I am happy either way, I guess, but we should keep it consistent if possible.





[GitHub] [kafka] guozhangwang commented on pull request #12370: KAFKA-13785: [10/N][emit final] more unit test for session store and disable cache for emit final sliding window

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on PR #12370:
URL: https://github.com/apache/kafka/pull/12370#issuecomment-1178363643

   @lihaosky Thanks for your PR, I made a pass on it.

