You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/05/13 00:01:29 UTC
kafka git commit: KAFKA-5198: Synchronize on RocksDbStore#openIterators
Repository: kafka
Updated Branches:
refs/heads/trunk 7258a5fdd -> b1cd3afc1
KAFKA-5198: Synchronize on RocksDbStore#openIterators,
since it is accessed from multiple threads
Author: Colin P. Mccabe <cm...@confluent.io>
Reviewers: Guozhang Wang <wa...@gmail.com>
Closes #3000 from cmccabe/KAFKA-5198
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b1cd3afc
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b1cd3afc
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b1cd3afc
Branch: refs/heads/trunk
Commit: b1cd3afc13070c4c50ba963520e7266f97ac52b4
Parents: 7258a5f
Author: Colin P. Mccabe <cm...@confluent.io>
Authored: Fri May 12 17:01:26 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Fri May 12 17:01:26 2017 -0700
----------------------------------------------------------------------
.../kafka/streams/state/internals/RocksDBStore.java | 10 +++++++---
1 file changed, 7 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/b1cd3afc/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index da582b4..a01de77 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -47,6 +47,7 @@ import org.rocksdb.WriteOptions;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
+import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
@@ -82,7 +83,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
private final String name;
private final String parentDir;
- private final Set<KeyValueIterator> openIterators = new HashSet<>();
+ private final Set<KeyValueIterator> openIterators = Collections.synchronizedSet(new HashSet<KeyValueIterator>());
File dbDir;
private StateSerdes<K, V> serdes;
@@ -386,10 +387,13 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
}
private void closeOpenIterators() {
- for (KeyValueIterator iterator : new HashSet<>(openIterators)) {
+ HashSet<KeyValueIterator> iterators = null;
+ synchronized (openIterators) {
+ iterators = new HashSet<>(openIterators);
+ }
+ for (KeyValueIterator iterator : iterators) {
iterator.close();
}
- openIterators.clear();
}
private class RocksDbIterator implements KeyValueIterator<K, V> {