You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/05/13 00:04:01 UTC

kafka git commit: KAFKA-5198: Synchronize on RocksDBStore#openIterators since it is accessed from multiple threads

Repository: kafka
Updated Branches:
  refs/heads/0.10.2 767cc31a8 -> b9fc41f04


KAFKA-5198: Synchronize on RocksDBStore#openIterators since it is accessed from multiple threads

Author: Colin P. Mccabe <cm...@confluent.io>

Reviewers: Guozhang Wang <wa...@gmail.com>

Closes #3000 from cmccabe/KAFKA-5198

(cherry picked from commit b1cd3afc13070c4c50ba963520e7266f97ac52b4)
Signed-off-by: Guozhang Wang <wa...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b9fc41f0
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b9fc41f0
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b9fc41f0

Branch: refs/heads/0.10.2
Commit: b9fc41f04b0dc964896f95e41fcf882d734e0326
Parents: 767cc31
Author: Colin P. Mccabe <cm...@confluent.io>
Authored: Fri May 12 17:01:26 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Fri May 12 17:03:11 2017 -0700

----------------------------------------------------------------------
 .../kafka/streams/state/internals/RocksDBStore.java       | 10 +++++++---
 1 file changed, 7 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/b9fc41f0/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index a778cd8..71b5cdf 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -45,6 +45,7 @@ import org.rocksdb.WriteBatch;
 import org.rocksdb.WriteOptions;
 
 import java.io.File;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashSet;
 import java.util.List;
@@ -80,7 +81,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
 
     private final String name;
     private final String parentDir;
-    private final Set<KeyValueIterator> openIterators = new HashSet<>();
+    private final Set<KeyValueIterator> openIterators = Collections.synchronizedSet(new HashSet<KeyValueIterator>());
 
     File dbDir;
     private StateSerdes<K, V> serdes;
@@ -379,10 +380,13 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
     }
 
     private void closeOpenIterators() {
-        for (KeyValueIterator iterator : new HashSet<>(openIterators)) {
+        HashSet<KeyValueIterator> iterators = null;
+        synchronized (openIterators) {
+            iterators = new HashSet<>(openIterators);
+        }
+        for (KeyValueIterator iterator : iterators) {
             iterator.close();
         }
-        openIterators.clear();
     }