You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/05/13 00:04:01 UTC
kafka git commit: KAFKA-5198: Synchronize on
RocksDBStore#openIterators since it is accessed from multiple threads
Repository: kafka
Updated Branches:
refs/heads/0.10.2 767cc31a8 -> b9fc41f04
KAFKA-5198: Synchronize on RocksDBStore#openIterators since it is accessed from multiple threads
Author: Colin P. Mccabe <cm...@confluent.io>
Reviewers: Guozhang Wang <wa...@gmail.com>
Closes #3000 from cmccabe/KAFKA-5198
(cherry picked from commit b1cd3afc13070c4c50ba963520e7266f97ac52b4)
Signed-off-by: Guozhang Wang <wa...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b9fc41f0
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b9fc41f0
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b9fc41f0
Branch: refs/heads/0.10.2
Commit: b9fc41f04b0dc964896f95e41fcf882d734e0326
Parents: 767cc31
Author: Colin P. Mccabe <cm...@confluent.io>
Authored: Fri May 12 17:01:26 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Fri May 12 17:03:11 2017 -0700
----------------------------------------------------------------------
.../kafka/streams/state/internals/RocksDBStore.java | 10 +++++++---
1 file changed, 7 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/b9fc41f0/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index a778cd8..71b5cdf 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -45,6 +45,7 @@ import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;
import java.io.File;
+import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
@@ -80,7 +81,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
private final String name;
private final String parentDir;
- private final Set<KeyValueIterator> openIterators = new HashSet<>();
+ private final Set<KeyValueIterator> openIterators = Collections.synchronizedSet(new HashSet<KeyValueIterator>());
File dbDir;
private StateSerdes<K, V> serdes;
@@ -379,10 +380,13 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
}
private void closeOpenIterators() {
- for (KeyValueIterator iterator : new HashSet<>(openIterators)) {
+ HashSet<KeyValueIterator> iterators = null;
+ synchronized (openIterators) {
+ iterators = new HashSet<>(openIterators);
+ }
+ for (KeyValueIterator iterator : iterators) {
iterator.close();
}
- openIterators.clear();
}