You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/02/02 19:14:17 UTC
kafka git commit: MINOR: fix the logic of RocksDBWindowStore using RocksDBStore Segments
Repository: kafka
Updated Branches:
refs/heads/trunk c97a75d98 -> 86a9036a7
MINOR: fix the logic of RocksDBWindowStore using RocksDBStore Segments
Author: Guozhang Wang <wa...@gmail.com>
Reviewers: Yasuhiro Matsuda
Closes #849 from guozhangwang/KRemoveInitializedCheck
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/86a9036a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/86a9036a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/86a9036a
Branch: refs/heads/trunk
Commit: 86a9036a7b03c8ae07d014c25a5eedc315544139
Parents: c97a75d
Author: Guozhang Wang <wa...@gmail.com>
Authored: Tue Feb 2 10:14:13 2016 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Tue Feb 2 10:14:13 2016 -0800
----------------------------------------------------------------------
.../apache/kafka/streams/state/internals/RocksDBStore.java | 9 +++++++--
.../kafka/streams/state/internals/RocksDBWindowStore.java | 2 +-
2 files changed, 8 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/86a9036a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index d7e229d..556e7cd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -129,11 +129,16 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
}
}
- @SuppressWarnings("unchecked")
- public void init(ProcessorContext context) {
+ public void openDB(ProcessorContext context) {
this.context = context;
this.dbDir = new File(new File(this.context.stateDir(), DB_FILE_DIR), this.name);
this.db = openDB(this.dbDir, this.options, TTL_SECONDS);
+ }
+
+ @SuppressWarnings("unchecked")
+ public void init(ProcessorContext context) {
+ // first open the DB dir
+ openDB(context);
this.changeLogger = this.loggingEnabled ? new RawStoreChangeLogger(name, context) : null;
http://git-wip-us.apache.org/repos/asf/kafka/blob/86a9036a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
index afb0f09..d6baf30 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
@@ -294,7 +294,7 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
if (segments[index] == null) {
segments[index] = new Segment(name + "-" + directorySuffix(segmentId), segmentId);
- segments[index].init(context);
+ segments[index].openDB(context);
}
return segments[index];