You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/05/13 00:01:29 UTC

kafka git commit: KAFKA-5198: Synchronize on RocksDBStore#openIterators

Repository: kafka
Updated Branches:
  refs/heads/trunk 7258a5fdd -> b1cd3afc1


KAFKA-5198: Synchronize on RocksDBStore#openIterators

…because it is accessed from multiple threads.

Author: Colin P. Mccabe <cm...@confluent.io>

Reviewers: Guozhang Wang <wa...@gmail.com>

Closes #3000 from cmccabe/KAFKA-5198


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b1cd3afc
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b1cd3afc
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b1cd3afc

Branch: refs/heads/trunk
Commit: b1cd3afc13070c4c50ba963520e7266f97ac52b4
Parents: 7258a5f
Author: Colin P. Mccabe <cm...@confluent.io>
Authored: Fri May 12 17:01:26 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Fri May 12 17:01:26 2017 -0700

----------------------------------------------------------------------
 .../kafka/streams/state/internals/RocksDBStore.java       | 10 +++++++---
 1 file changed, 7 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/b1cd3afc/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index da582b4..a01de77 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -47,6 +47,7 @@ import org.rocksdb.WriteOptions;
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashSet;
 import java.util.List;
@@ -82,7 +83,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
 
     private final String name;
     private final String parentDir;
-    private final Set<KeyValueIterator> openIterators = new HashSet<>();
+    private final Set<KeyValueIterator> openIterators = Collections.synchronizedSet(new HashSet<KeyValueIterator>());
 
     File dbDir;
     private StateSerdes<K, V> serdes;
@@ -386,10 +387,13 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
     }
 
     private void closeOpenIterators() {
-        for (KeyValueIterator iterator : new HashSet<>(openIterators)) {
+        HashSet<KeyValueIterator> iterators = null;
+        synchronized (openIterators) {
+            iterators = new HashSet<>(openIterators);
+        }
+        for (KeyValueIterator iterator : iterators) {
             iterator.close();
         }
-        openIterators.clear();
     }
 
     private class RocksDbIterator implements KeyValueIterator<K, V> {