You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ab...@apache.org on 2022/12/07 03:41:34 UTC
[kafka] branch 3.4 updated: KAFKA-14260: add `synchronized` to `prefixScan` method (#12893)
This is an automated email from the ASF dual-hosted git repository.
ableegoldman pushed a commit to branch 3.4
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.4 by this push:
new f292e1960b9 KAFKA-14260: add `synchronized` to `prefixScan` method (#12893)
f292e1960b9 is described below
commit f292e1960b931922a7a4be19424fd1e4580d9264
Author: Lucia Cerchie <lu...@gmail.com>
AuthorDate: Tue Dec 6 20:39:32 2022 -0700
KAFKA-14260: add `synchronized` to `prefixScan` method (#12893)
As a result of "14260: InMemoryKeyValueStore iterator still throws ConcurrentModificationException", I'm adding synchronized to prefixScan as an alternative to going back to the ConcurrentSkipList.
Reviewers: Anna Sophie Blee-Goldman <ab...@apache.org>
---
.../state/internals/InMemoryKeyValueStore.java | 2 +-
.../state/internals/AbstractKeyValueStoreTest.java | 23 +++++++++++++++++++++-
2 files changed, 23 insertions(+), 2 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
index 98f377d0b24..7599bff82b3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
@@ -169,7 +169,7 @@ public class InMemoryKeyValueStore implements KeyValueStore<Bytes, byte[]> {
}
@Override
- public <PS extends Serializer<P>, P> KeyValueIterator<Bytes, byte[]> prefixScan(final P prefix, final PS prefixKeySerializer) {
+ public synchronized <PS extends Serializer<P>, P> KeyValueIterator<Bytes, byte[]> prefixScan(final P prefix, final PS prefixKeySerializer) {
final Bytes from = Bytes.wrap(prefixKeySerializer.serialize(null, prefix));
final Bytes to = Bytes.increment(from);
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
index 9e0fb306f28..4989b6cefb4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.state.internals;
+import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
@@ -55,7 +56,6 @@ import static org.junit.Assert.fail;
public abstract class AbstractKeyValueStoreTest {
protected abstract <K, V> KeyValueStore<K, V> createKeyValueStore(final StateStoreContext context);
-
protected InternalMockProcessorContext context;
protected KeyValueStore<Integer, String> store;
protected KeyValueStoreTestDriver<Integer, String> driver;
@@ -648,4 +648,25 @@ public abstract class AbstractKeyValueStoreTest {
);
}
}
+
+ @Test
+ public void prefixScanShouldNotThrowConcurrentModificationException() {
+
+ store.put(0, "zero");
+ store.put(1, "one");
+ store.put(222, "two-hundred-twenty-two");
+ store.put(2, "two");
+ store.put(22, "twenty-two");
+ store.put(3, "three");
+
+ try (final KeyValueIterator<Integer, String> iter = store.prefixScan(2, new IntegerSerializer())) {
+
+ store.delete(22);
+
+ while (iter.hasNext()) {
+ iter.next();
+ }
+ }
+ }
}
+