You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/07/07 23:23:24 UTC

[GitHub] [kafka] guozhangwang commented on a diff in pull request #12370: KAFKA-13785: [10/N][emit final] more unit test for session store and disable cache for emit final sliding window

guozhangwang commented on code in PR #12370:
URL: https://github.com/apache/kafka/pull/12370#discussion_r916291502


##########
streams/src/test/java/org/apache/kafka/streams/state/internals/InMemorySessionStoreTest.java:
##########
@@ -55,6 +55,10 @@ <K, V> SessionStore<K, V> buildSessionStore(final long retentionPeriod,
             valueSerde).build();
     }
 
+    StoreType getStoreType() {

Review Comment:
   Just wondering, since we are leveraging on the inherited abstract session bytes store test, are there any test cases below that can be moved there or even are duplicates?



##########
streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.java:
##########
@@ -827,6 +827,139 @@ public void shouldPutAndBackwardFetchWithPrefix() {
         }
     }
 
+    @Test
+    public void shouldFetchSessionForSingleKey() {
+        // Only for TimeFirstSessionKeySchema schema
+        if (!(getBaseSchema() instanceof TimeFirstSessionKeySchema)) {
+            return;
+        }
+
+        final String keyA = "a";
+        final String keyB = "b";
+        final String keyC = "c";
+
+        final StateSerdes<String, Long> stateSerdes = StateSerdes.withBuiltinTypes("dummy", String.class, Long.class);
+        final Bytes key1 = Bytes.wrap(stateSerdes.keySerializer().serialize("dummy", keyA));
+        final Bytes key2 = Bytes.wrap(stateSerdes.keySerializer().serialize("dummy", keyB));
+        final Bytes key3 = Bytes.wrap(stateSerdes.keySerializer().serialize("dummy", keyC));
+
+        final byte[] expectedValue1 = serializeValue(10);
+        final byte[] expectedValue2 = serializeValue(50);
+        final byte[] expectedValue3 = serializeValue(100);
+        final byte[] expectedValue4 = serializeValue(200);
+
+        bytesStore.put(serializeKey(new Windowed<>(keyA, windows[0])), expectedValue1);
+        bytesStore.put(serializeKey(new Windowed<>(keyA, windows[1])), expectedValue2);
+        bytesStore.put(serializeKey(new Windowed<>(keyB, windows[2])), expectedValue3);
+        bytesStore.put(serializeKey(new Windowed<>(keyC, windows[3])), expectedValue4);
+
+        final byte[] value1 = ((RocksDBTimeOrderedSessionSegmentedBytesStore) bytesStore).fetchSession(

Review Comment:
   Could we also add a "miss" case for `fetchSession` here?



##########
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.java:
##########
@@ -62,6 +57,11 @@ public static Collection<Object[]> getKeySchema() {
         });
     }
 
+    @Override
+    StoreType getStoreType() {

Review Comment:
   This is for line 52 above: should we rename that function to a more meaningful one? maybe `getParamStoreType`?



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java:
##########
@@ -289,7 +289,10 @@ private <VR> StoreBuilder<SessionStore<K, VR>> materialize(final MaterializedInt
         // do not enable cache if the emit final strategy is used
         if (materialized.cachingEnabled() && emitStrategy.type() != EmitStrategy.StrategyType.ON_WINDOW_CLOSE) {
             builder.withCachingEnabled();
+        } else {
+            builder.withCachingDisabled();

Review Comment:
   We would only default at the Materialized level, so the if condition should be sufficient even if it's default to turn on caching, but nevertheless making it explicit seems fine.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org