You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ab...@apache.org on 2022/12/07 03:41:34 UTC

[kafka] branch 3.4 updated: KAFKA-14260: add `synchronized` to `prefixScan` method (#12893)

This is an automated email from the ASF dual-hosted git repository.

ableegoldman pushed a commit to branch 3.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.4 by this push:
     new f292e1960b9 KAFKA-14260: add `synchronized` to `prefixScan` method (#12893)
f292e1960b9 is described below

commit f292e1960b931922a7a4be19424fd1e4580d9264
Author: Lucia Cerchie <lu...@gmail.com>
AuthorDate: Tue Dec 6 20:39:32 2022 -0700

    KAFKA-14260: add `synchronized` to `prefixScan` method (#12893)
    
    As a follow-up to "KAFKA-14260: InMemoryKeyValueStore iterator still throws ConcurrentModificationException", this change adds `synchronized` to `prefixScan` as an alternative to going back to a ConcurrentSkipListMap.
    
    Reviewers: Anna Sophie Blee-Goldman <ab...@apache.org>
---
 .../state/internals/InMemoryKeyValueStore.java     |  2 +-
 .../state/internals/AbstractKeyValueStoreTest.java | 23 +++++++++++++++++++++-
 2 files changed, 23 insertions(+), 2 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
index 98f377d0b24..7599bff82b3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
@@ -169,7 +169,7 @@ public class InMemoryKeyValueStore implements KeyValueStore<Bytes, byte[]> {
     }
 
     @Override
-    public <PS extends Serializer<P>, P> KeyValueIterator<Bytes, byte[]> prefixScan(final P prefix, final PS prefixKeySerializer) {
+    public synchronized <PS extends Serializer<P>, P> KeyValueIterator<Bytes, byte[]> prefixScan(final P prefix, final PS prefixKeySerializer) {
 
         final Bytes from = Bytes.wrap(prefixKeySerializer.serialize(null, prefix));
         final Bytes to = Bytes.increment(from);
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
index 9e0fb306f28..4989b6cefb4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
@@ -55,7 +56,6 @@ import static org.junit.Assert.fail;
 public abstract class AbstractKeyValueStoreTest {
 
     protected abstract <K, V> KeyValueStore<K, V> createKeyValueStore(final StateStoreContext context);
-
     protected InternalMockProcessorContext context;
     protected KeyValueStore<Integer, String> store;
     protected KeyValueStoreTestDriver<Integer, String> driver;
@@ -648,4 +648,25 @@ public abstract class AbstractKeyValueStoreTest {
             );
         }
     }
+
+    @Test
+    public void prefixScanShouldNotThrowConcurrentModificationException() { // regression test: mutating the store while a prefixScan iterator is open must not throw
+
+        store.put(0, "zero"); // seed the store with several entries before opening the iterator
+        store.put(1, "one");
+        store.put(222, "two-hundred-twenty-two");
+        store.put(2, "two");
+        store.put(22, "twenty-two");
+        store.put(3, "three");
+
+        try (final KeyValueIterator<Integer, String> iter = store.prefixScan(2, new IntegerSerializer())) { // try-with-resources closes the iterator when the block exits
+
+            store.delete(22); // modify the store while the prefix-scan iterator is still open
+
+            while (iter.hasNext()) { // drain the iterator; there are no assertions, so the test passes if nothing is thrown
+                iter.next();
+            }
+        }
+    }                  
 }
+