You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2016/02/11 19:36:11 UTC

kafka git commit: KAFKA-3229 ensure that root statestore is registered with ProcessorStateManager

Repository: kafka
Updated Branches:
  refs/heads/trunk 67a7ea9d6 -> 330274ed1


KAFKA-3229 ensure that root statestore is registered with ProcessorStateManager

Pass the root (outermost) StateStore through the init method so that an inner StateStore registers the root object, rather than itself, when it calls register on the ProcessorContext.

Author: tomdearman <to...@gmail.com>

Reviewers: Yasuhiro Matsuda

Closes #904 from tomdearman/KAFKA-3229


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/330274ed
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/330274ed
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/330274ed

Branch: refs/heads/trunk
Commit: 330274ed1c8efd2b1aa9907860429d9d20f72c3c
Parents: 67a7ea9
Author: tomdearman <to...@gmail.com>
Authored: Thu Feb 11 11:35:55 2016 -0700
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Thu Feb 11 11:35:55 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/kafka/streams/processor/StateStore.java     | 2 +-
 .../apache/kafka/streams/processor/internals/AbstractTask.java  | 2 +-
 .../streams/state/internals/InMemoryKeyValueLoggedStore.java    | 5 +++--
 .../streams/state/internals/InMemoryKeyValueStoreSupplier.java  | 4 ++--
 .../apache/kafka/streams/state/internals/MemoryLRUCache.java    | 5 +++--
 .../kafka/streams/state/internals/MeteredKeyValueStore.java     | 5 +++--
 .../kafka/streams/state/internals/MeteredWindowStore.java       | 5 +++--
 .../org/apache/kafka/streams/state/internals/RocksDBStore.java  | 5 +++--
 .../kafka/streams/state/internals/RocksDBWindowStore.java       | 5 +++--
 .../streams/state/internals/InMemoryKeyValueStoreTest.java      | 2 +-
 .../streams/state/internals/InMemoryLRUCacheStoreTest.java      | 2 +-
 .../kafka/streams/state/internals/RocksDBKeyValueStoreTest.java | 2 +-
 .../kafka/streams/state/internals/RocksDBWindowStoreTest.java   | 2 +-
 .../src/test/java/org/apache/kafka/test/KStreamTestDriver.java  | 2 +-
 .../test/java/org/apache/kafka/test/MockStateStoreSupplier.java | 4 ++--
 15 files changed, 29 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/330274ed/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java b/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
index 9c085a5..b07e510 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
@@ -38,7 +38,7 @@ public interface StateStore {
     /**
      * Initializes this state store
      */
-    void init(ProcessorContext context);
+    void init(ProcessorContext context, StateStore root);
 
     /**
      * Flush any cached data

http://git-wip-us.apache.org/repos/asf/kafka/blob/330274ed/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index 162a926..3f7140a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -71,7 +71,7 @@ public abstract class AbstractTask {
     protected void initializeStateStores() {
         for (StateStoreSupplier stateStoreSupplier : this.topology.stateStoreSuppliers()) {
             StateStore store = stateStoreSupplier.get();
-            store.init(this.processorContext);
+            store.init(this.processorContext, store);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/330274ed/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java
index 94349bf..596cc2b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.Serdes;
@@ -46,10 +47,10 @@ public class InMemoryKeyValueLoggedStore<K, V> implements KeyValueStore<K, V> {
     }
 
     @Override
-    public void init(ProcessorContext context) {
+    public void init(ProcessorContext context, StateStore root) {
         this.changeLogger = new StoreChangeLogger<>(storeName, context, serdes);
 
-        inner.init(context);
+        inner.init(context, root);
 
         this.getter = new StoreChangeLogger.ValueGetter<K, V>() {
             @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/330274ed/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
index 03290c1..0665af2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
@@ -88,9 +88,9 @@ public class InMemoryKeyValueStoreSupplier<K, V> implements StateStoreSupplier {
         }
 
         @Override
-        public void init(ProcessorContext context) {
+        public void init(ProcessorContext context, StateStore root) {
             if (loggingEnabled) {
-                context.register(this, true, new StateRestoreCallback() {
+                context.register(root, true, new StateRestoreCallback() {
 
                     @Override
                     public void restore(byte[] key, byte[] value) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/330274ed/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
index aaa1efd..2a8be8c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.state.internals;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
+import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.Serdes;
@@ -87,9 +88,9 @@ public class MemoryLRUCache<K, V> implements KeyValueStore<K, V> {
     }
 
     @Override
-    public void init(ProcessorContext context) {
+    public void init(ProcessorContext context, StateStore root) {
         if (loggingEnabled) {
-            context.register(this, true, new StateRestoreCallback() {
+            context.register(root, true, new StateRestoreCallback() {
 
                 @Override
                 public void restore(byte[] key, byte[] value) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/330274ed/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
index fd308c3..46feb58 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 
@@ -64,7 +65,7 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
     }
 
     @Override
-    public void init(ProcessorContext context) {
+    public void init(ProcessorContext context, StateStore root) {
         final String name = name();
         this.metrics = context.metrics();
         this.putTime = this.metrics.addLatencySensor(metricScope, name, "put");
@@ -79,7 +80,7 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
         // register and possibly restore the state from the logs
         long startNs = time.nanoseconds();
         try {
-            inner.init(context);
+            inner.init(context, root);
         } finally {
             this.metrics.recordLatency(this.restoreTime, startNs, time.nanoseconds());
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/330274ed/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
index 33f4c88..37ae499 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.state.WindowStoreIterator;
 
@@ -51,7 +52,7 @@ public class MeteredWindowStore<K, V> implements WindowStore<K, V> {
     }
 
     @Override
-    public void init(ProcessorContext context) {
+    public void init(ProcessorContext context, StateStore root) {
         final String name = name();
         this.metrics = context.metrics();
         this.putTime = this.metrics.addLatencySensor(metricScope, name, "put");
@@ -62,7 +63,7 @@ public class MeteredWindowStore<K, V> implements WindowStore<K, V> {
         // register and possibly restore the state from the logs
         long startNs = time.nanoseconds();
         try {
-            inner.init(context);
+            inner.init(context, root);
         } finally {
             this.metrics.recordLatency(this.restoreTime, startNs, time.nanoseconds());
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/330274ed/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index 11bf96e..999c9ec 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -21,6 +21,7 @@ import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
+import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.Serdes;
@@ -142,7 +143,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
     }
 
     @SuppressWarnings("unchecked")
-    public void init(ProcessorContext context) {
+    public void init(ProcessorContext context, StateStore root) {
         // first open the DB dir
         openDB(context);
 
@@ -176,7 +177,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
             }
         };
 
-        context.register(this, loggingEnabled, new StateRestoreCallback() {
+        context.register(root, loggingEnabled, new StateRestoreCallback() {
 
             @Override
             public void restore(byte[] key, byte[] value) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/330274ed/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
index 2758e6e..b1605a3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
+import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.Serdes;
 import org.apache.kafka.streams.state.WindowStore;
@@ -158,7 +159,7 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
     }
 
     @Override
-    public void init(ProcessorContext context) {
+    public void init(ProcessorContext context, StateStore root) {
         this.context = context;
 
         openExistingSegments();
@@ -167,7 +168,7 @@ public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
                 new RawStoreChangeLogger(name, context) : null;
 
         // register and possibly restore the state from the logs
-        context.register(this, loggingEnabled, new StateRestoreCallback() {
+        context.register(root, loggingEnabled, new StateRestoreCallback() {
             @Override
             public void restore(byte[] key, byte[] value) {
                 putInternal(key, value);

http://git-wip-us.apache.org/repos/asf/kafka/blob/330274ed/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java
index 2b0927e..46948bd 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreTest.java
@@ -44,7 +44,7 @@ public class InMemoryKeyValueStoreTest extends AbstractKeyValueStoreTest {
         }
 
         KeyValueStore<K, V> store = (KeyValueStore<K, V>) supplier.get();
-        store.init(context);
+        store.init(context, store);
         return store;
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/330274ed/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java
index 10f31d6..a2b79e5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreTest.java
@@ -50,7 +50,7 @@ public class InMemoryLRUCacheStoreTest extends AbstractKeyValueStoreTest {
         }
 
         KeyValueStore<K, V> store = (KeyValueStore<K, V>) supplier.get();
-        store.init(context);
+        store.init(context, store);
         return store;
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/330274ed/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java
index b9703db..8e8f69c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreTest.java
@@ -45,7 +45,7 @@ public class RocksDBKeyValueStoreTest extends AbstractKeyValueStoreTest {
         }
 
         KeyValueStore<K, V> store = (KeyValueStore<K, V>) supplier.get();
-        store.init(context);
+        store.init(context, store);
         return store;
 
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/330274ed/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
index fd55944..5a196ec 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
@@ -65,7 +65,7 @@ public class RocksDBWindowStoreTest {
         StateStoreSupplier supplier = new RocksDBWindowStoreSupplier<>(windowName, retentionPeriod, numSegments, true, serdes, null);
 
         WindowStore<K, V> store = (WindowStore<K, V>) supplier.get();
-        store.init(context);
+        store.init(context, store);
         return store;
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/330274ed/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
index 2dc567e..c0c5c39 100644
--- a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
@@ -59,7 +59,7 @@ public class KStreamTestDriver {
 
         for (StateStoreSupplier stateStoreSupplier : topology.stateStoreSuppliers()) {
             StateStore store = stateStoreSupplier.get();
-            store.init(context);
+            store.init(context, store);
         }
 
         for (ProcessorNode node : topology.processors()) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/330274ed/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java b/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java
index 73d446f..7b31477 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockStateStoreSupplier.java
@@ -81,8 +81,8 @@ public class MockStateStoreSupplier implements StateStoreSupplier {
         }
 
         @Override
-        public void init(ProcessorContext context) {
-            context.register(this, loggingEnabled, stateRestoreCallback);
+        public void init(ProcessorContext context, StateStore root) {
+            context.register(root, loggingEnabled, stateRestoreCallback);
             initialized = true;
         }