You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/02/02 19:14:17 UTC

kafka git commit: MINOR: fix the logic of RocksDBWindowStore using RocksDBStore Segments

Repository: kafka
Updated Branches:
  refs/heads/trunk c97a75d98 -> 86a9036a7


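MINOR: fix the logic of RocksDBWindowStore using RocksDBStore Segments

RocksDBStore.init() is split so that opening the database lives in a new
openDB(context) method, which init() calls first. RocksDBWindowStore now calls
openDB(context) instead of init(context) when it creates a segment, so a
segment only opens its RocksDB directory and skips the remainder of init()
(for example, creating the change logger).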

Author: Guozhang Wang <wa...@gmail.com>

Reviewers: Yasuhiro Matsuda

Closes #849 from guozhangwang/KRemoveInitializedCheck


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/86a9036a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/86a9036a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/86a9036a

Branch: refs/heads/trunk
Commit: 86a9036a7b03c8ae07d014c25a5eedc315544139
Parents: c97a75d
Author: Guozhang Wang <wa...@gmail.com>
Authored: Tue Feb 2 10:14:13 2016 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Tue Feb 2 10:14:13 2016 -0800

----------------------------------------------------------------------
 .../apache/kafka/streams/state/internals/RocksDBStore.java  | 9 +++++++--
 .../kafka/streams/state/internals/RocksDBWindowStore.java   | 2 +-
 2 files changed, 8 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/86a9036a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index d7e229d..556e7cd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -129,11 +129,16 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
         }
     }
 
-    @SuppressWarnings("unchecked")
-    public void init(ProcessorContext context) {
+    public void openDB(ProcessorContext context) {
         this.context = context;
         this.dbDir = new File(new File(this.context.stateDir(), DB_FILE_DIR), this.name);
         this.db = openDB(this.dbDir, this.options, TTL_SECONDS);
+    }
+
+    @SuppressWarnings("unchecked")
+    public void init(ProcessorContext context) {
+        // first open the DB dir
+        openDB(context);
 
         this.changeLogger = this.loggingEnabled ? new RawStoreChangeLogger(name, context) : null;
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/86a9036a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
index afb0f09..d6baf30 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
@@ -294,7 +294,7 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
 
         if (segments[index] == null) {
             segments[index] = new Segment(name + "-" + directorySuffix(segmentId), segmentId);
-            segments[index].init(context);
+            segments[index].openDB(context);
         }
 
         return segments[index];