Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/09/30 03:23:55 UTC

[GitHub] [kafka] showuon commented on a change in pull request #11367: MINOR: Do not copy on range for in-memory shared store in stream stream left/out joins

showuon commented on a change in pull request #11367:
URL: https://github.com/apache/kafka/pull/11367#discussion_r719028387



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
##########
@@ -311,7 +314,25 @@ private void assertUniqueStoreNames(final WindowBytesStoreSupplier supplier,
 
         final StoreBuilder<KeyValueStore<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<V1, V2>>> builder =
             new ListValueStoreBuilder<>(
-                persistent ? Stores.persistentKeyValueStore(storeName) : Stores.inMemoryKeyValueStore(storeName),
+                persistent ?
+                    Stores.persistentKeyValueStore(storeName) :
+                    new KeyValueBytesStoreSupplier() {
+                    @Override
+                    public String name() {
+                        return storeName;
+                    }
+
+                    @Override
+                    public KeyValueStore<Bytes, byte[]> get() {
+                        // do not copy of range since it would not be used for IQ

Review comment:
       Did you mean to say: copy **on** range?

##########
File path: streams/src/test/java/org/apache/kafka/streams/state/internals/ListValueStoreTest.java
##########
@@ -178,26 +181,46 @@ public void shouldGetAllReturnTimestampOrderedRecords() {
 
     @Test
     public void shouldAllowDeleteWhileIterateRecords() {
-        listStore.put(0, "zero1");
-        listStore.put(0, "zero2");
-        listStore.put(1, "one");
+        final Random rand = new Random();
+        int count = 0;
+        for (int i = 0; i < 10000; i++) {
+            listStore.put(i, "zero" + i);
+            count++;
+
+            while (rand.nextBoolean()) {
+                listStore.put(i, "zero" + i);
+                count++;
+            }
+        }
 
-        final KeyValue<Integer, String> zero1 = KeyValue.pair(0, "zero1");
-        final KeyValue<Integer, String> zero2 = KeyValue.pair(0, "zero2");
-        final KeyValue<Integer, String> one = KeyValue.pair(1, "one");
+        final int size = toList(listStore.all()).size();

Review comment:
       `listStore.all()` returns a `KeyValueIterator`, which should be closed explicitly. I think we can reuse the iterator created below this line, which is closed at the end.
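
To illustrate the point about closing, here is a minimal, self-contained sketch of counting entries with try-with-resources so the iterator is always released. `ClosableIterator` and `TrackingStore` are hypothetical stand-ins (not Kafka classes) that only mimic an iterator that is both `Iterator` and `Closeable`; the real test would use `listStore.all()` and `KeyValueIterator` instead.

```java
import java.io.Closeable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

final class IteratorCloseSketch {

    // Hypothetical stand-in for an iterator that must be closed (Iterator + Closeable).
    interface ClosableIterator<T> extends Iterator<T>, Closeable {
        @Override
        void close(); // narrowed: no checked exception, so callers need no catch block
    }

    // Hypothetical store that tracks how many of its iterators are still open.
    static final class TrackingStore {
        private final List<String> values = new ArrayList<>();
        private int openIterators = 0;

        void put(final String value) {
            values.add(value);
        }

        int openIterators() {
            return openIterators;
        }

        ClosableIterator<String> all() {
            openIterators++;
            // Iterate over a snapshot so later puts do not affect this iterator.
            final Iterator<String> delegate = new ArrayList<>(values).iterator();
            return new ClosableIterator<String>() {
                private boolean closed = false;

                @Override
                public boolean hasNext() {
                    return delegate.hasNext();
                }

                @Override
                public String next() {
                    return delegate.next();
                }

                @Override
                public void close() {
                    if (!closed) {
                        closed = true;
                        openIterators--;
                    }
                }
            };
        }
    }

    // Counts all entries; try-with-resources closes the iterator even if iteration throws.
    static int countAll(final TrackingStore store) {
        int size = 0;
        try (ClosableIterator<String> it = store.all()) {
            while (it.hasNext()) {
                it.next();
                size++;
            }
        }
        return size;
    }

    public static void main(final String[] args) {
        final TrackingStore store = new TrackingStore();
        store.put("zero0");
        store.put("zero1");
        System.out.println("size=" + countAll(store) + ", open=" + store.openIterators());
    }
}
```

The same shape would apply to the size check in the test: either wrap the extra `all()` call like this, or derive the count from the iterator that is already closed at the end of the test.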

##########
File path: streams/src/test/java/org/apache/kafka/streams/state/internals/ListValueStoreTest.java
##########
@@ -178,26 +181,46 @@ public void shouldGetAllReturnTimestampOrderedRecords() {
 
     @Test
     public void shouldAllowDeleteWhileIterateRecords() {
-        listStore.put(0, "zero1");
-        listStore.put(0, "zero2");
-        listStore.put(1, "one");
+        final Random rand = new Random();
+        int count = 0;
+        for (int i = 0; i < 10000; i++) {
+            listStore.put(i, "zero" + i);
+            count++;
+
+            while (rand.nextBoolean()) {
+                listStore.put(i, "zero" + i);
+                count++;
+            }
+        }
 
-        final KeyValue<Integer, String> zero1 = KeyValue.pair(0, "zero1");
-        final KeyValue<Integer, String> zero2 = KeyValue.pair(0, "zero2");
-        final KeyValue<Integer, String> one = KeyValue.pair(1, "one");
+        final int size = toList(listStore.all()).size();
+        assertEquals(count, size);
 
         final KeyValueIterator<Integer, String> it = listStore.all();
-        assertEquals(zero1, it.next());
 
-        listStore.put(0, null);
+        int prev = -1;
+        int deleted = 0;
+        int dupCount = 0;
+        while (it.hasNext()) {
+            final KeyValue<Integer, String> entry = it.next();
+
+            if (prev != -1 && prev != entry.key) {
+                if (rand.nextBoolean()) {
+                    listStore.put(prev, null);
+                    deleted += dupCount;
+                }
 
-        // zero2 should still be returned from the iterator after the delete call
-        assertEquals(zero2, it.next());
+                dupCount = 0;
+            }
+
+            dupCount++;
+            prev = entry.key;
+        }
 
         it.close();
 
         // A new all() iterator after a previous all() iterator was closed should not return deleted records.
-        assertEquals(Collections.singletonList(one), toList(listStore.all()));
+        assertEquals(size - deleted, toList(listStore.all()).size());

Review comment:
       As mentioned above, the iterator returned by `listStore.all()` is not closed here.

##########
File path: streams/src/test/java/org/apache/kafka/streams/state/internals/ListValueStoreTest.java
##########
@@ -136,8 +138,9 @@ public void shouldGetAllNonDeletedRecords() {
         listStore.put(4, "four");
 
         // Delete some records
-        listStore.put(1, null);
-        listStore.put(3, null);
+        listStore.putIfAbsent(1, null);
+        listStore.putIfAbsent(3, null);
+        listStore.putIfAbsent(5, null);

Review comment:
       I guess the newly added call is trying to delete a non-existent key. But I agree that a comment above it would help.



