Posted to commits@kafka.apache.org by ew...@apache.org on 2016/02/02 01:11:28 UTC

[2/2] kafka git commit: KAFKA-3060: Refactor MeteredStore and RocksDBStore Impl

KAFKA-3060: Refactor MeteredStore and RocksDBStore Impl

Changes include:

1) Move logging logic from MeteredXXXStore to the internal stores, and leave the WindowStore API clean by removing all internalPut/Get functions.

2) Wrap the common change-logging behavior of the InMemory and LRUCache stores into one class.

3) Fix a bug in StoreChangeLogger where byte arrays cannot serve as HashSet elements (arrays use identity-based equals/hashCode) by introducing a specialized RawStoreChangeLogger; the first sketch below illustrates the pitfall.

4) Add an object-caching layer on top of RocksDBStore; it relies on the cached objects' equals and hashCode implementations being consistent with the serdes (see the second sketch below).
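
For context on change 3: Java arrays use identity-based equals/hashCode, so a HashSet<byte[]> never treats two arrays with the same content as the same element. A minimal, self-contained illustration of the pitfall and the usual content-based workaround (this is not the actual RawStoreChangeLogger code):

    import java.nio.ByteBuffer;
    import java.util.HashSet;
    import java.util.Set;

    public class ByteArrayKeyDemo {
        public static void main(String[] args) {
            Set<byte[]> raw = new HashSet<>();
            raw.add("key".getBytes());
            raw.add("key".getBytes());
            System.out.println(raw.size()); // 2 -- identity hashing keeps both

            // ByteBuffer has content-based equals/hashCode, so duplicates collapse
            Set<ByteBuffer> wrapped = new HashSet<>();
            wrapped.add(ByteBuffer.wrap("key".getBytes()));
            wrapped.add(ByteBuffer.wrap("key".getBytes()));
            System.out.println(wrapped.size()); // 1
        }
    }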
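
For change 4: the object cache keeps deserialized keys and values in front of RocksDB's byte-oriented storage, so object equality must agree with byte equality after a serde round trip. A hedged sketch of that invariant with an illustrative value type (UserId and the serialize/deserialize helpers are stand-ins, not names from this commit):

    import java.util.Objects;

    public class SerdeConsistencyDemo {
        static final class UserId {
            final String id;
            UserId(String id) { this.id = id; }
            @Override public boolean equals(Object o) {
                return o instanceof UserId && ((UserId) o).id.equals(id);
            }
            @Override public int hashCode() { return Objects.hash(id); }
        }

        static byte[] serialize(UserId key) { return key.id.getBytes(); }
        static UserId deserialize(byte[] bytes) { return new UserId(new String(bytes)); }

        public static void main(String[] args) {
            UserId key = new UserId("alice");
            UserId roundTrip = deserialize(serialize(key));
            // the cache looks up by object, RocksDB by raw bytes; the two views only
            // agree if a serde round trip preserves equals() and hashCode()
            System.out.println(key.equals(roundTrip)
                    && key.hashCode() == roundTrip.hashCode()); // true
        }
    }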

Author: Guozhang Wang <wa...@gmail.com>

Reviewers: Yasuhiro Matsuda <ya...@confluent.io>, Ewen Cheslack-Postava <ew...@confluent.io>

Closes #826 from guozhangwang/K3060


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/57da044a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/57da044a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/57da044a

Branch: refs/heads/trunk
Commit: 57da044a991ebf8913d44dfcfa6a27729f54a4d5
Parents: 66ecf3f
Author: Guozhang Wang <wa...@gmail.com>
Authored: Mon Feb 1 16:11:13 2016 -0800
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Mon Feb 1 16:11:13 2016 -0800

----------------------------------------------------------------------
 .../examples/pageview/PageViewTypedJob.java     |   6 +-
 .../examples/pageview/PageViewUnTypedJob.java   |   6 +-
 .../kafka/streams/examples/pipe/PipeJob.java    |   4 +-
 .../examples/wordcount/WordCountJob.java        |   4 +-
 .../org/apache/kafka/streams/StreamsConfig.java |   1 +
 .../streams/kstream/internals/KStreamImpl.java  |  76 +++----
 .../streams/kstream/internals/KTableImpl.java   |   4 +-
 .../kstream/internals/KTableStoreSupplier.java  |   2 +-
 .../org/apache/kafka/streams/state/Stores.java  |  36 ++-
 .../apache/kafka/streams/state/WindowStore.java |   7 -
 .../kafka/streams/state/WindowStoreUtil.java    |  55 -----
 .../kafka/streams/state/WindowStoreUtils.java   |  54 +++++
 .../internals/InMemoryKeyValueLoggedStore.java  | 132 +++++++++++
 .../InMemoryKeyValueStoreSupplier.java          |  28 ++-
 .../InMemoryLRUCacheStoreSupplier.java          | 151 +------------
 .../streams/state/internals/MemoryLRUCache.java | 151 +++++++++++++
 .../internals/MemoryNavigableLRUCache.java      | 103 +++++++++
 .../state/internals/MeteredKeyValueStore.java   |  79 +------
 .../state/internals/MeteredWindowStore.java     |  68 +-----
 .../state/internals/OffsetCheckpoint.java       |   8 +-
 .../state/internals/RawStoreChangeLogger.java   |  56 +++++
 .../internals/RocksDBKeyValueStoreSupplier.java |   5 +-
 .../streams/state/internals/RocksDBStore.java   | 221 +++++++++++++++++--
 .../state/internals/RocksDBWindowStore.java     |  80 +++++--
 .../internals/RocksDBWindowStoreSupplier.java   |   4 +-
 .../state/internals/StoreChangeLogger.java      |  47 +++-
 .../internals/ProcessorTopologyTest.java        |   4 +-
 .../streams/state/KeyValueStoreTestDriver.java  |  54 +++--
 .../kafka/streams/state/StateTestUtils.java     |  79 +++++++
 .../apache/kafka/streams/state/StateUtils.java  |  79 -------
 .../internals/AbstractKeyValueStoreTest.java    |   2 +-
 .../internals/InMemoryLRUCacheStoreTest.java    | 197 +++++++----------
 .../state/internals/OffsetCheckpointTest.java   |  61 +++++
 .../internals/RocksDBKeyValueStoreTest.java     |   4 +-
 .../state/internals/RocksDBWindowStoreTest.java |   8 +-
 .../state/internals/StoreChangeLoggerTest.java  | 146 ++++++++++++
 .../apache/kafka/test/MockProcessorContext.java |  11 +
 .../kafka/test/MockStateStoreSupplier.java      |  10 +-
 38 files changed, 1358 insertions(+), 685 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/57da044a/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedJob.java
----------------------------------------------------------------------
diff --git a/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedJob.java b/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedJob.java
index c064848..7f11512 100644
--- a/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedJob.java
+++ b/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedJob.java
@@ -25,7 +25,6 @@ import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.kstream.Count;
 import org.apache.kafka.streams.kstream.HoppingWindows;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
@@ -33,6 +32,7 @@ import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.StreamsConfig;
 
 import java.util.Properties;
 
@@ -121,7 +121,7 @@ public class PageViewTypedJob {
         // write to the result topic
         regionCount.to("streams-pageviewstats-output", new JsonPOJOSerializer<>(), new JsonPOJOSerializer<>());
 
-        KafkaStreams kstream = new KafkaStreams(builder, props);
-        kstream.start();
+        KafkaStreams streams = new KafkaStreams(builder, props);
+        streams.start();
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/57da044a/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewUnTypedJob.java
----------------------------------------------------------------------
diff --git a/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewUnTypedJob.java b/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewUnTypedJob.java
index 1ae02c9..013332e 100644
--- a/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewUnTypedJob.java
+++ b/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewUnTypedJob.java
@@ -53,8 +53,6 @@ public class PageViewUnTypedJob {
         props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
         props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
 
-        StreamsConfig config = new StreamsConfig(props);
-
         KStreamBuilder builder = new KStreamBuilder();
 
         final Serializer<String> stringSerializer = new StringSerializer();
@@ -101,7 +99,7 @@ public class PageViewUnTypedJob {
         // write to the result topic
         regionCount.to("streams-pageviewstats-output");
 
-        KafkaStreams kstream = new KafkaStreams(builder, config);
-        kstream.start();
+        KafkaStreams streams = new KafkaStreams(builder, props);
+        streams.start();
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/57da044a/streams/examples/main/java/org/apache/kafka/streams/examples/pipe/PipeJob.java
----------------------------------------------------------------------
diff --git a/streams/examples/main/java/org/apache/kafka/streams/examples/pipe/PipeJob.java b/streams/examples/main/java/org/apache/kafka/streams/examples/pipe/PipeJob.java
index 4a4f97f..841f37b 100644
--- a/streams/examples/main/java/org/apache/kafka/streams/examples/pipe/PipeJob.java
+++ b/streams/examples/main/java/org/apache/kafka/streams/examples/pipe/PipeJob.java
@@ -44,7 +44,7 @@ public class PipeJob {
 
         builder.stream("streams-file-input").to("streams-pipe-output");
 
-        KafkaStreams kstream = new KafkaStreams(builder, props);
-        kstream.start();
+        KafkaStreams streams = new KafkaStreams(builder, props);
+        streams.start();
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/57da044a/streams/examples/main/java/org/apache/kafka/streams/examples/wordcount/WordCountJob.java
----------------------------------------------------------------------
diff --git a/streams/examples/main/java/org/apache/kafka/streams/examples/wordcount/WordCountJob.java b/streams/examples/main/java/org/apache/kafka/streams/examples/wordcount/WordCountJob.java
index 8aa15a4..c66e965 100644
--- a/streams/examples/main/java/org/apache/kafka/streams/examples/wordcount/WordCountJob.java
+++ b/streams/examples/main/java/org/apache/kafka/streams/examples/wordcount/WordCountJob.java
@@ -97,7 +97,7 @@ public class WordCountJob {
 
         counts.to("streams-wordcount-output", stringSerializer, JsonSerializer);
 
-        KafkaStreams kstream = new KafkaStreams(builder, props);
-        kstream.start();
+        KafkaStreams streams = new KafkaStreams(builder, props);
+        streams.start();
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/57da044a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 16bb06a..041d0e9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -157,6 +157,7 @@ public class StreamsConfig extends AbstractConfig {
                                         ConsumerConfig.VALUE_DESERIALIZER_CLASS_DOC)
                                 .define(TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
                                         Type.CLASS,
+                                        WALLCLOCK_TIMESTAMP_EXTRACTOR,
                                         Importance.MEDIUM,
                                         TIMESTAMP_EXTRACTOR_CLASS_DOC)
                                 .define(PARTITION_GROUPER_CLASS_CONFIG,
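
Aside: with the default added above, TIMESTAMP_EXTRACTOR_CLASS_CONFIG no longer has to be set explicitly; the wall-clock extractor is used unless overridden. A hedged sketch of an override (the extractor class name is hypothetical):

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    public class TimestampExtractorConfigDemo {
        public static void main(String[] args) {
            Properties props = new Properties();
            // optional after this commit -- omit to get the wall-clock default;
            // "com.example.MyEventTimeExtractor" is a hypothetical implementation
            props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
                      "com.example.MyEventTimeExtractor");
        }
    }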

http://git-wip-us.apache.org/repos/asf/kafka/blob/57da044a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 7ebc28c..73f7266 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -36,9 +36,9 @@ import org.apache.kafka.streams.kstream.Window;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.Windows;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.processor.StreamPartitioner;
-import org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier;
-import org.apache.kafka.streams.state.Serdes;
+import org.apache.kafka.streams.state.Stores;
 
 import java.lang.reflect.Array;
 import java.util.HashSet;
@@ -304,23 +304,19 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
 
         Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K>) other);
 
-        RocksDBWindowStoreSupplier<K, V> thisWindow =
-                new RocksDBWindowStoreSupplier<>(
-                        windows.name() + "-this",
-                        windows.maintainMs(),
-                        windows.segments,
-                        true,
-                        new Serdes<>("", keySerializer, keyDeserializer, thisValueSerializer, thisValueDeserializer),
-                        null);
-
-        RocksDBWindowStoreSupplier<K, V1> otherWindow =
-                new RocksDBWindowStoreSupplier<>(
-                        windows.name() + "-other",
-                        windows.maintainMs(),
-                        windows.segments,
-                        true,
-                        new Serdes<>("", keySerializer, keyDeserializer, otherValueSerializer, otherValueDeserializer),
-                        null);
+        StateStoreSupplier thisWindow = Stores.create(windows.name() + "-this")
+                .withKeys(keySerializer, keyDeserializer)
+                .withValues(thisValueSerializer, thisValueDeserializer)
+                .persistent()
+                .windowed(windows.maintainMs(), windows.segments, true)
+                .build();
+
+        StateStoreSupplier otherWindow = Stores.create(windows.name() + "-other")
+                .withKeys(keySerializer, keyDeserializer)
+                .withValues(otherValueSerializer, otherValueDeserializer)
+                .persistent()
+                .windowed(windows.maintainMs(), windows.segments, true)
+                .build();
 
         KStreamJoinWindow<K, V> thisWindowedStream = new KStreamJoinWindow<>(thisWindow.name(), windows.before + windows.after + 1, windows.maintainMs());
         KStreamJoinWindow<K, V1> otherWindowedStream = new KStreamJoinWindow<>(otherWindow.name(), windows.before + windows.after + 1, windows.maintainMs());
@@ -360,14 +356,12 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
 
         Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K>) other);
 
-        RocksDBWindowStoreSupplier<K, V1> otherWindow =
-                new RocksDBWindowStoreSupplier<>(
-                        windows.name() + "-this",
-                        windows.maintainMs(),
-                        windows.segments,
-                        true,
-                        new Serdes<>("", keySerializer, keyDeserializer, otherValueSerializer, otherValueDeserializer),
-                        null);
+        StateStoreSupplier otherWindow = Stores.create(windows.name() + "-other")
+                .withKeys(keySerializer, keyDeserializer)
+                .withValues(otherValueSerializer, otherValueDeserializer)
+                .persistent()
+                .windowed(windows.maintainMs(), windows.segments, true)
+                .build();
 
         KStreamJoinWindow<K, V1> otherWindowedStream = new KStreamJoinWindow<>(otherWindow.name(), windows.before + windows.after + 1, windows.maintainMs());
         KStreamKStreamJoin<K, R, V, V1> joinThis = new KStreamKStreamJoin<>(otherWindow.name(), windows.before, windows.after, joiner, true);
@@ -410,14 +404,12 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
         ProcessorSupplier<K, V> aggWindowSupplier = new KStreamAggWindow<>();
         ProcessorSupplier<Windowed<K>, Change<V>> aggregateSupplier = new KStreamReduce<>(windows, windows.name(), reducer);
 
-        RocksDBWindowStoreSupplier<K, V> aggregateStore =
-                new RocksDBWindowStoreSupplier<>(
-                        windows.name(),
-                        windows.maintainMs(),
-                        windows.segments,
-                        false,
-                        new Serdes<>("", keySerializer, keyDeserializer, aggValueSerializer, aggValueDeserializer),
-                        null);
+        StateStoreSupplier aggregateStore = Stores.create(windows.name())
+                .withKeys(keySerializer, keyDeserializer)
+                .withValues(aggValueSerializer, aggValueDeserializer)
+                .persistent()
+                .windowed(windows.maintainMs(), windows.segments, false)
+                .build();
 
         // aggregate the values with the aggregator and local store
         topology.addProcessor(selectName, aggWindowSupplier, this.name);
@@ -444,14 +436,12 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
         ProcessorSupplier<K, V> aggWindowSupplier = new KStreamAggWindow<>();
         ProcessorSupplier<Windowed<K>, Change<V>> aggregateSupplier = new KStreamAggregate<>(windows, windows.name(), aggregator);
 
-        RocksDBWindowStoreSupplier<K, T> aggregateStore =
-                new RocksDBWindowStoreSupplier<>(
-                        windows.name(),
-                        windows.maintainMs(),
-                        windows.segments,
-                        false,
-                        new Serdes<>("", keySerializer, keyDeserializer, aggValueSerializer, aggValueDeserializer),
-                        null);
+        StateStoreSupplier aggregateStore = Stores.create(windows.name())
+                .withKeys(keySerializer, keyDeserializer)
+                .withValues(aggValueSerializer, aggValueDeserializer)
+                .persistent()
+                .windowed(windows.maintainMs(), windows.segments, false)
+                .build();
 
         // aggregate the values with the aggregator and local store
         topology.addProcessor(selectName, aggWindowSupplier, this.name);

http://git-wip-us.apache.org/repos/asf/kafka/blob/57da044a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index d046090..4398e3f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -273,7 +273,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
         StateStoreSupplier aggregateStore = Stores.create(name)
                 .withKeys(keySerializer, keyDeserializer)
                 .withValues(aggValueSerializer, aggValueDeserializer)
-                .localDatabase()
+                .persistent()
                 .build();
 
         // select the aggregate key and values (old and new), it would require parent to send old values
@@ -322,7 +322,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
         StateStoreSupplier aggregateStore = Stores.create(name)
                 .withKeys(keySerializer, keyDeserializer)
                 .withValues(valueSerializer, valueDeserializer)
-                .localDatabase()
+                .persistent()
                 .build();
 
         // select the aggregate key and values (old and new), it would require parent to send old values

http://git-wip-us.apache.org/repos/asf/kafka/blob/57da044a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableStoreSupplier.java
index c993512..ffd5cf0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableStoreSupplier.java
@@ -52,7 +52,7 @@ public class KTableStoreSupplier<K, V> implements StateStoreSupplier {
     }
 
     public StateStore get() {
-        return new MeteredKeyValueStore<>(new RocksDBStore<>(name, serdes), serdes, "rocksdb-state", time).disableLogging();
+        return new MeteredKeyValueStore<>(new RocksDBStore<>(name, serdes), "rocksdb-state", time);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/57da044a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
index 46b2592..e9d82bc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
@@ -30,6 +30,7 @@ import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.state.internals.InMemoryKeyValueStoreSupplier;
 import org.apache.kafka.streams.state.internals.InMemoryLRUCacheStoreSupplier;
 import org.apache.kafka.streams.state.internals.RocksDBKeyValueStoreSupplier;
+import org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier;
 
 /**
  * Factory for creating key-value stores.
@@ -76,10 +77,27 @@ public class Stores {
                             }
 
                             @Override
-                            public LocalDatabaseKeyValueFactory<K, V> localDatabase() {
-                                return new LocalDatabaseKeyValueFactory<K, V>() {
+                            public PersistentKeyValueFactory<K, V> persistent() {
+                                return new PersistentKeyValueFactory<K, V>() {
+                                    private int numSegments = 0;
+                                    private long retentionPeriod = 0L;
+                                    private boolean retainDuplicates = false;
+
+                                    @Override
+                                    public PersistentKeyValueFactory<K, V> windowed(long retentionPeriod, int numSegments, boolean retainDuplicates) {
+                                        this.numSegments = numSegments;
+                                        this.retentionPeriod = retentionPeriod;
+                                        this.retainDuplicates = retainDuplicates;
+
+                                        return this;
+                                    }
+
                                     @Override
                                     public StateStoreSupplier build() {
+                                        if (numSegments > 0) {
+                                            return new RocksDBWindowStoreSupplier<>(name, retentionPeriod, numSegments, retainDuplicates, serdes, null);
+                                        }
+
                                         return new RocksDBKeyValueStoreSupplier<>(name, serdes, null);
                                     }
                                 };
@@ -237,7 +255,7 @@ public class Stores {
          *
          * @return the factory to create in-memory key-value stores; never null
          */
-        LocalDatabaseKeyValueFactory<K, V> localDatabase();
+        PersistentKeyValueFactory<K, V> persistent();
     }
 
     /**
@@ -270,7 +288,17 @@ public class Stores {
      * @param <K> the type of keys
      * @param <V> the type of values
      */
-    public static interface LocalDatabaseKeyValueFactory<K, V> {
+    public static interface PersistentKeyValueFactory<K, V> {
+
+        /**
+         * Configure the persistent store as a windowed key-value store.
+         *
+         * @param retentionPeriod the maximum period of time, in milliseconds, to keep each window in this store
+         * @param numSegments the maximum number of segments for rolling the windowed store
+         * @param retainDuplicates whether or not to retain duplicate data within the window
+         */
+        PersistentKeyValueFactory<K, V> windowed(long retentionPeriod, int numSegments, boolean retainDuplicates);
+
         /**
          * Return the instance of StateStoreSupplier of new key-value store.
          * @return the key-value store; never null
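
Taken together, the additions above route both plain and windowed persistent stores through one fluent path: persistent().build() yields a RocksDBKeyValueStoreSupplier, while inserting windowed(...) switches build() to a RocksDBWindowStoreSupplier. A sketch of the two call shapes (store names and retention values are illustrative):

    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.apache.kafka.streams.processor.StateStoreSupplier;
    import org.apache.kafka.streams.state.Stores;

    public class StoresApiDemo {
        public static void main(String[] args) {
            StringSerializer ser = new StringSerializer();
            StringDeserializer de = new StringDeserializer();

            // plain persistent key-value store
            StateStoreSupplier kvStore = Stores.create("demo-store")
                    .withKeys(ser, de)
                    .withValues(ser, de)
                    .persistent()
                    .build();

            // windowed persistent store: retention ms, segments, retainDuplicates
            StateStoreSupplier windowStore = Stores.create("demo-window-store")
                    .withKeys(ser, de)
                    .withValues(ser, de)
                    .persistent()
                    .windowed(60000L, 3, false)
                    .build();
        }
    }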

http://git-wip-us.apache.org/repos/asf/kafka/blob/57da044a/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
index b17d889..1d806e0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
@@ -27,12 +27,5 @@ public interface WindowStore<K, V> extends StateStore {
 
     void put(K key, V value, long timestamp);
 
-    byte[] putAndReturnInternalKey(K key, V value, long timestamp);
-
     WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo);
-
-    void putInternal(byte[] binaryKey, byte[] binaryValue);
-
-    byte[] getInternal(byte[] binaryKey);
-
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/57da044a/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreUtil.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreUtil.java b/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreUtil.java
deleted file mode 100644
index b11a206..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreUtil.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.kafka.streams.state;
-
-import java.nio.ByteBuffer;
-
-public class WindowStoreUtil<K, V> {
-
-    public static final int TIMESTAMP_SIZE = 8;
-    public static final int SEQNUM_SIZE = 4;
-    public static final Serdes<byte[], byte[]> INNER_SERDES = Serdes.withBuiltinTypes("", byte[].class, byte[].class);
-    @SuppressWarnings("unchecked")
-    public static final KeyValueIterator<byte[], byte[]>[] NO_ITERATORS = (KeyValueIterator<byte[], byte[]>[]) new KeyValueIterator[0];
-
-    public static <K> byte[] toBinaryKey(K key, final long timestamp, final int seqnum, Serdes<K, ?> serdes) {
-        byte[] serializedKey = serdes.rawKey(key);
-
-        ByteBuffer buf = ByteBuffer.allocate(serializedKey.length + TIMESTAMP_SIZE + SEQNUM_SIZE);
-        buf.put(serializedKey);
-        buf.putLong(timestamp);
-        buf.putInt(seqnum);
-
-        return buf.array();
-    }
-
-    public static <K> K keyFromBinaryKey(byte[] binaryKey, Serdes<K, ?> serdes) {
-        byte[] bytes = new byte[binaryKey.length - TIMESTAMP_SIZE - SEQNUM_SIZE];
-
-        System.arraycopy(binaryKey, 0, bytes, 0, bytes.length);
-
-        return serdes.keyFrom(bytes);
-    }
-
-    public static long timestampFromBinaryKey(byte[] binaryKey) {
-        return ByteBuffer.wrap(binaryKey).getLong(binaryKey.length - TIMESTAMP_SIZE - SEQNUM_SIZE);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/57da044a/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreUtils.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreUtils.java b/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreUtils.java
new file mode 100644
index 0000000..3a3d585
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreUtils.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.kafka.streams.state;
+
+import java.nio.ByteBuffer;
+
+public class WindowStoreUtils<K, V> {
+
+    public static final int TIMESTAMP_SIZE = 8;
+    public static final int SEQNUM_SIZE = 4;
+    public static final Serdes<byte[], byte[]> INNER_SERDES = Serdes.withBuiltinTypes("", byte[].class, byte[].class);
+    @SuppressWarnings("unchecked")
+    public static final KeyValueIterator<byte[], byte[]>[] NO_ITERATORS = (KeyValueIterator<byte[], byte[]>[]) new KeyValueIterator[0];
+
+    public static <K> byte[] toBinaryKey(K key, final long timestamp, final int seqnum, Serdes<K, ?> serdes) {
+        byte[] serializedKey = serdes.rawKey(key);
+
+        ByteBuffer buf = ByteBuffer.allocate(serializedKey.length + TIMESTAMP_SIZE + SEQNUM_SIZE);
+        buf.put(serializedKey);
+        buf.putLong(timestamp);
+        buf.putInt(seqnum);
+
+        return buf.array();
+    }
+
+    public static <K> K keyFromBinaryKey(byte[] binaryKey, Serdes<K, ?> serdes) {
+        byte[] bytes = new byte[binaryKey.length - TIMESTAMP_SIZE - SEQNUM_SIZE];
+
+        System.arraycopy(binaryKey, 0, bytes, 0, bytes.length);
+
+        return serdes.keyFrom(bytes);
+    }
+
+    public static long timestampFromBinaryKey(byte[] binaryKey) {
+        return ByteBuffer.wrap(binaryKey).getLong(binaryKey.length - TIMESTAMP_SIZE - SEQNUM_SIZE);
+    }
+}
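
The helpers above pin down the binary layout of a window key: the serialized key, then an 8-byte timestamp, then a 4-byte sequence number. A small round-trip sketch using the methods from this new file (the topic name, key, and timestamp are illustrative):

    import org.apache.kafka.streams.state.Serdes;
    import org.apache.kafka.streams.state.WindowStoreUtils;

    public class WindowKeyLayoutDemo {
        public static void main(String[] args) {
            Serdes<String, String> serdes =
                    Serdes.withBuiltinTypes("demo-topic", String.class, String.class);

            // layout: [ raw key bytes | 8-byte timestamp | 4-byte seqnum ]
            byte[] binaryKey =
                    WindowStoreUtils.toBinaryKey("user-1", 1454371873000L, 0, serdes);

            System.out.println(WindowStoreUtils.keyFromBinaryKey(binaryKey, serdes)); // user-1
            System.out.println(WindowStoreUtils.timestampFromBinaryKey(binaryKey));   // 1454371873000
        }
    }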

http://git-wip-us.apache.org/repos/asf/kafka/blob/57da044a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java
new file mode 100644
index 0000000..5be6483
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueLoggedStore.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.Serdes;
+
+import java.util.List;
+
+public class InMemoryKeyValueLoggedStore<K, V> implements KeyValueStore<K, V> {
+
+    private final KeyValueStore<K, V> inner;
+    private final Serdes<K, V> serdes;
+    private final String topic;
+
+    private StoreChangeLogger<K, V> changeLogger;
+    private StoreChangeLogger.ValueGetter<K, V> getter;
+
+    public InMemoryKeyValueLoggedStore(final String topic, final KeyValueStore<K, V> inner, final Serdes<K, V> serdes) {
+        this.topic = topic;
+        this.inner = inner;
+        this.serdes = serdes;
+    }
+
+    @Override
+    public String name() {
+        return this.topic;
+    }
+
+    @Override
+    public void init(ProcessorContext context) {
+        this.changeLogger = new StoreChangeLogger<>(topic, context, serdes);
+
+        inner.init(context);
+
+        this.getter = new StoreChangeLogger.ValueGetter<K, V>() {
+            @Override
+            public V get(K key) {
+                return inner.get(key);
+            }
+        };
+    }
+
+    @Override
+    public boolean persistent() {
+        return inner.persistent();
+    }
+
+    @Override
+    public V get(K key) {
+        return this.inner.get(key);
+    }
+
+    @Override
+    public void put(K key, V value) {
+        this.inner.put(key, value);
+
+        changeLogger.add(key);
+        changeLogger.maybeLogChange(this.getter);
+    }
+
+    @Override
+    public void putAll(List<KeyValue<K, V>> entries) {
+        this.inner.putAll(entries);
+
+        for (KeyValue<K, V> entry : entries) {
+            K key = entry.key;
+            changeLogger.add(key);
+        }
+        changeLogger.maybeLogChange(this.getter);
+    }
+
+    @Override
+    public V delete(K key) {
+        V value = this.inner.delete(key);
+
+        removed(key);
+
+        return value;
+    }
+
+    /**
+     * Called when the underlying {@link #inner} {@link KeyValueStore} removes an entry in response to a call from this
+     * store.
+     *
+     * @param key the key for the entry that the inner store removed
+     */
+    protected void removed(K key) {
+        changeLogger.delete(key);
+        changeLogger.maybeLogChange(this.getter);
+    }
+
+    @Override
+    public KeyValueIterator<K, V> range(K from, K to) {
+        return this.inner.range(from, to);
+    }
+
+    @Override
+    public KeyValueIterator<K, V> all() {
+        return this.inner.all();
+    }
+
+    @Override
+    public void close() {
+        inner.close();
+    }
+
+    @Override
+    public void flush() {
+        this.inner.flush();
+
+        changeLogger.logChange(getter);
+    }
+}
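
The new store above is a decorator: each write goes to the wrapped store first and is then recorded through the StoreChangeLogger, with flush() force-logging anything still pending. A hedged wiring sketch, assuming an already-constructed innerStore and serdes (both illustrative here):

    // wrap any KeyValueStore so its writes are change-logged to "demo-topic"
    KeyValueStore<String, String> logged =
            new InMemoryKeyValueLoggedStore<>("demo-topic", innerStore, serdes);

    logged.put("k", "v"); // writes to innerStore, then records "k" for the change log
    logged.flush();       // flushes innerStore, then logs all pending changes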

http://git-wip-us.apache.org/repos/asf/kafka/blob/57da044a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
index 4856b09..03290c1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
@@ -20,6 +20,7 @@ package org.apache.kafka.streams.state.internals;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.state.KeyValueIterator;
@@ -43,13 +44,13 @@ import java.util.TreeMap;
 public class InMemoryKeyValueStoreSupplier<K, V> implements StateStoreSupplier {
 
     private final String name;
-    private final Serdes serdes;
     private final Time time;
+    private final Serdes<K, V> serdes;
 
     public InMemoryKeyValueStoreSupplier(String name, Serdes<K, V> serdes, Time time) {
         this.name = name;
-        this.serdes = serdes;
         this.time = time;
+        this.serdes = serdes;
     }
 
     public String name() {
@@ -57,7 +58,7 @@ public class InMemoryKeyValueStoreSupplier<K, V> implements StateStoreSupplier {
     }
 
     public StateStore get() {
-        return new MeteredKeyValueStore<K, V>(new MemoryStore<K, V>(name), serdes, "in-memory-state", time);
+        return new MeteredKeyValueStore<>(new MemoryStore<K, V>(name).enableLogging(serdes), "in-memory-state", time);
     }
 
     private static class MemoryStore<K, V> implements KeyValueStore<K, V> {
@@ -65,12 +66,22 @@ public class InMemoryKeyValueStoreSupplier<K, V> implements StateStoreSupplier {
         private final String name;
         private final NavigableMap<K, V> map;
 
+        private boolean loggingEnabled = false;
+        private Serdes<K, V> serdes = null;
+
         public MemoryStore(String name) {
             super();
             this.name = name;
             this.map = new TreeMap<>();
         }
 
+        public KeyValueStore<K, V> enableLogging(Serdes<K, V> serdes) {
+            this.loggingEnabled = true;
+            this.serdes = serdes;
+
+            return new InMemoryKeyValueLoggedStore<>(this.name, this, serdes);
+        }
+
         @Override
         public String name() {
             return this.name;
@@ -78,7 +89,16 @@ public class InMemoryKeyValueStoreSupplier<K, V> implements StateStoreSupplier {
 
         @Override
         public void init(ProcessorContext context) {
-            // do-nothing since it is in-memory
+            if (loggingEnabled) {
+                context.register(this, true, new StateRestoreCallback() {
+
+                    @Override
+                    public void restore(byte[] key, byte[] value) {
+                        put(serdes.keyFrom(key), serdes.valueFrom(value));
+                    }
+                });
+
+            }
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/57da044a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java
index 22ee3f7..9b7936a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java
@@ -17,21 +17,10 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
-import org.apache.kafka.streams.state.KeyValueIterator;
-import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.Serdes;
 
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.NavigableSet;
-import java.util.TreeSet;
-
 /**
  * An in-memory key-value store that is limited in size and retains a maximum number of most recently used entries.
  *
@@ -43,7 +32,7 @@ public class InMemoryLRUCacheStoreSupplier<K, V> implements StateStoreSupplier {
 
     private final String name;
     private final int capacity;
-    private final Serdes serdes;
+    private final Serdes<K, V> serdes;
     private final Time time;
 
     public InMemoryLRUCacheStoreSupplier(String name, int capacity, Serdes<K, V> serdes, Time time) {
@@ -57,143 +46,17 @@ public class InMemoryLRUCacheStoreSupplier<K, V> implements StateStoreSupplier {
         return name;
     }
 
+    @SuppressWarnings("unchecked")
     public StateStore get() {
-        MemoryLRUCache<K, V> cache = new MemoryLRUCache<K, V>(name, capacity);
-        final MeteredKeyValueStore<K, V> store = new MeteredKeyValueStore<>(cache, serdes, "in-memory-lru-state", time);
-        cache.whenEldestRemoved(new EldestEntryRemovalListener<K, V>() {
+        final MemoryNavigableLRUCache<K, V> cache = new MemoryNavigableLRUCache<K, V>(name, capacity);
+        final InMemoryKeyValueLoggedStore<K, V> loggedCache = (InMemoryKeyValueLoggedStore) cache.enableLogging(serdes);
+        final MeteredKeyValueStore<K, V> store = new MeteredKeyValueStore<>(loggedCache, "in-memory-lru-state", time);
+        cache.whenEldestRemoved(new MemoryNavigableLRUCache.EldestEntryRemovalListener<K, V>() {
             @Override
             public void apply(K key, V value) {
-                store.removed(key);
+                loggedCache.removed(key);
             }
         });
         return store;
     }
-
-    private static interface EldestEntryRemovalListener<K, V> {
-        public void apply(K key, V value);
-    }
-
-    protected static final class MemoryLRUCache<K, V> implements KeyValueStore<K, V> {
-
-        private final String name;
-        private final Map<K, V> map;
-        private final NavigableSet<K> keys;
-        private EldestEntryRemovalListener<K, V> listener;
-
-        public MemoryLRUCache(String name, final int maxCacheSize) {
-            this.name = name;
-            this.keys = new TreeSet<>();
-            // leave room for one extra entry to handle adding an entry before the oldest can be removed
-            this.map = new LinkedHashMap<K, V>(maxCacheSize + 1, 1.01f, true) {
-                private static final long serialVersionUID = 1L;
-
-                @Override
-                protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
-                    if (size() > maxCacheSize) {
-                        K key = eldest.getKey();
-                        keys.remove(key);
-                        if (listener != null) listener.apply(key, eldest.getValue());
-                        return true;
-                    }
-                    return false;
-                }
-            };
-        }
-
-        protected void whenEldestRemoved(EldestEntryRemovalListener<K, V> listener) {
-            this.listener = listener;
-        }
-
-        @Override
-        public String name() {
-            return this.name;
-        }
-
-        @Override
-        public void init(ProcessorContext context) {
-            // do-nothing since it is in-memory
-        }
-
-        @Override
-        public boolean persistent() {
-            return false;
-        }
-
-        @Override
-        public V get(K key) {
-            return this.map.get(key);
-        }
-
-        @Override
-        public void put(K key, V value) {
-            this.map.put(key, value);
-            this.keys.add(key);
-        }
-
-        @Override
-        public void putAll(List<KeyValue<K, V>> entries) {
-            for (KeyValue<K, V> entry : entries)
-                put(entry.key, entry.value);
-        }
-
-        @Override
-        public V delete(K key) {
-            V value = this.map.remove(key);
-            this.keys.remove(key);
-            return value;
-        }
-
-        @Override
-        public KeyValueIterator<K, V> range(K from, K to) {
-            return new MemoryLRUCache.CacheIterator<K, V>(this.keys.subSet(from, true, to, false).iterator(), this.map);
-        }
-
-        @Override
-        public KeyValueIterator<K, V> all() {
-            return new MemoryLRUCache.CacheIterator<K, V>(this.keys.iterator(), this.map);
-        }
-
-        @Override
-        public void flush() {
-            // do-nothing since it is in-memory
-        }
-
-        @Override
-        public void close() {
-            // do-nothing
-        }
-
-        private static class CacheIterator<K, V> implements KeyValueIterator<K, V> {
-            private final Iterator<K> keys;
-            private final Map<K, V> entries;
-            private K lastKey;
-
-            public CacheIterator(Iterator<K> keys, Map<K, V> entries) {
-                this.keys = keys;
-                this.entries = entries;
-            }
-
-            @Override
-            public boolean hasNext() {
-                return keys.hasNext();
-            }
-
-            @Override
-            public KeyValue<K, V> next() {
-                lastKey = keys.next();
-                return new KeyValue<>(lastKey, entries.get(lastKey));
-            }
-
-            @Override
-            public void remove() {
-                keys.remove();
-                entries.remove(lastKey);
-            }
-
-            @Override
-            public void close() {
-                // do nothing
-            }
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/57da044a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
new file mode 100644
index 0000000..aaa1efd
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryLRUCache.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.Serdes;
+
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class MemoryLRUCache<K, V> implements KeyValueStore<K, V> {
+
+    public interface EldestEntryRemovalListener<K, V> {
+        void apply(K key, V value);
+    }
+
+    protected String name;
+    protected Map<K, V> map;
+    protected Set<K> keys;
+
+    protected EldestEntryRemovalListener<K, V> listener;
+
+    private boolean loggingEnabled = false;
+    private Serdes<K, V> serdes = null;
+
+    // this no-arg constructor is used only by the MemoryNavigableLRUCache subclass
+    public MemoryLRUCache() {}
+
+    public MemoryLRUCache(String name, final int maxCacheSize) {
+        this.name = name;
+        this.keys = new HashSet<>();
+
+        // leave room for one extra entry to handle adding an entry before the oldest can be removed
+        this.map = new LinkedHashMap<K, V>(maxCacheSize + 1, 1.01f, true) {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
+                if (size() > maxCacheSize) {
+                    K key = eldest.getKey();
+                    keys.remove(key);
+                    if (listener != null) listener.apply(key, eldest.getValue());
+                    return true;
+                }
+                return false;
+            }
+        };
+    }
+
+    public KeyValueStore<K, V> enableLogging(Serdes<K, V> serdes) {
+        this.loggingEnabled = true;
+        this.serdes = serdes;
+
+        return new InMemoryKeyValueLoggedStore<>(this.name, this, serdes);
+    }
+
+    public MemoryLRUCache<K, V> whenEldestRemoved(EldestEntryRemovalListener<K, V> listener) {
+        this.listener = listener;
+
+        return this;
+    }
+
+    @Override
+    public String name() {
+        return this.name;
+    }
+
+    @Override
+    public void init(ProcessorContext context) {
+        if (loggingEnabled) {
+            context.register(this, true, new StateRestoreCallback() {
+
+                @Override
+                public void restore(byte[] key, byte[] value) {
+                    put(serdes.keyFrom(key), serdes.valueFrom(value));
+                }
+            });
+
+        }
+    }
+
+    @Override
+    public boolean persistent() {
+        return false;
+    }
+
+    @Override
+    public V get(K key) {
+        return this.map.get(key);
+    }
+
+    @Override
+    public void put(K key, V value) {
+        this.map.put(key, value);
+        this.keys.add(key);
+    }
+
+    @Override
+    public void putAll(List<KeyValue<K, V>> entries) {
+        for (KeyValue<K, V> entry : entries)
+            put(entry.key, entry.value);
+    }
+
+    @Override
+    public V delete(K key) {
+        V value = this.map.remove(key);
+        this.keys.remove(key);
+        return value;
+    }
+
+    @Override
+    public KeyValueIterator<K, V> range(K from, K to) {
+        throw new UnsupportedOperationException("MemoryLRUCache does not support range() function.");
+    }
+
+    @Override
+    public KeyValueIterator<K, V> all() {
+        throw new UnsupportedOperationException("MemoryLRUCache does not support all() function.");
+    }
+
+    @Override
+    public void flush() {
+        // do-nothing since it is in-memory
+    }
+
+    @Override
+    public void close() {
+        // do-nothing since it is in-memory
+    }
+}
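
MemoryLRUCache derives its eviction behavior from LinkedHashMap's access-order mode combined with removeEldestEntry, exactly as in the constructor above. A standalone illustration of that mechanism:

    import java.util.LinkedHashMap;
    import java.util.Map;

    public class LruDemo {
        public static void main(String[] args) {
            final int maxCacheSize = 2;
            // accessOrder=true: iteration runs from least- to most-recently used
            Map<String, String> lru =
                    new LinkedHashMap<String, String>(maxCacheSize + 1, 1.01f, true) {
                        @Override
                        protected boolean removeEldestEntry(Map.Entry<String, String> eldest) {
                            return size() > maxCacheSize; // evict once over capacity
                        }
                    };
            lru.put("a", "1");
            lru.put("b", "2");
            lru.get("a");      // touching "a" makes "b" the eldest entry
            lru.put("c", "3"); // exceeds capacity, so "b" is evicted
            System.out.println(lru.keySet()); // [a, c]
        }
    }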

http://git-wip-us.apache.org/repos/asf/kafka/blob/57da044a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java
new file mode 100644
index 0000000..99bac93
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MemoryNavigableLRUCache.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.state.KeyValueIterator;
+
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.TreeSet;
+
+public class MemoryNavigableLRUCache<K, V> extends MemoryLRUCache<K, V> {
+
+    public MemoryNavigableLRUCache(String name, final int maxCacheSize) {
+        super();
+
+        this.name = name;
+        this.keys = new TreeSet<>();
+
+        // leave room for one extra entry to handle adding an entry before the oldest can be removed
+        this.map = new LinkedHashMap<K, V>(maxCacheSize + 1, 1.01f, true) {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
+                if (size() > maxCacheSize) {
+                    K key = eldest.getKey();
+                    keys.remove(key);
+                    if (listener != null) listener.apply(key, eldest.getValue());
+                    return true;
+                }
+                return false;
+            }
+        };
+    }
+
+    @Override
+    public MemoryNavigableLRUCache<K, V> whenEldestRemoved(EldestEntryRemovalListener<K, V> listener) {
+        this.listener = listener;
+
+        return this;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public KeyValueIterator<K, V> range(K from, K to) {
+        return new MemoryNavigableLRUCache.CacheIterator<K, V>(((NavigableSet) this.keys).subSet(from, true, to, false).iterator(), this.map);
+    }
+
+    @Override
+    public KeyValueIterator<K, V> all() {
+        return new MemoryNavigableLRUCache.CacheIterator<K, V>(this.keys.iterator(), this.map);
+    }
+
+    private static class CacheIterator<K, V> implements KeyValueIterator<K, V> {
+        private final Iterator<K> keys;
+        private final Map<K, V> entries;
+        private K lastKey;
+
+        public CacheIterator(Iterator<K> keys, Map<K, V> entries) {
+            this.keys = keys;
+            this.entries = entries;
+        }
+
+        @Override
+        public boolean hasNext() {
+            return keys.hasNext();
+        }
+
+        @Override
+        public KeyValue<K, V> next() {
+            lastKey = keys.next();
+            return new KeyValue<>(lastKey, entries.get(lastKey));
+        }
+
+        @Override
+        public void remove() {
+            keys.remove();
+            entries.remove(lastKey);
+        }
+
+        @Override
+        public void close() {
+            // do nothing
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/57da044a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
index 6dee4c7..fd308c3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
@@ -17,25 +17,27 @@
 
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateRestoreCallback;
-import org.apache.kafka.common.metrics.Sensor;
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.Serdes;
 
 import java.util.List;
 
+/**
+ * A metered KeyValueStore wrapper used for recording operation metrics; its inner
+ * KeyValueStore implementation therefore does not need to provide its own metrics-collecting functionality.
+ *
+ * @param <K> the key type
+ * @param <V> the value type
+ */
 public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
 
     protected final KeyValueStore<K, V> inner;
-    protected final StoreChangeLogger.ValueGetter getter;
-    protected final Serdes<K, V> serialization;
     protected final String metricScope;
     protected final Time time;
 
@@ -49,27 +51,13 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
     private Sensor restoreTime;
     private StreamsMetrics metrics;
 
-    private boolean loggingEnabled = true;
-    private StoreChangeLogger<K, V> changeLogger = null;
-
     // always wrap the store with the metered store
-    public MeteredKeyValueStore(final KeyValueStore<K, V> inner, Serdes<K, V> serialization, String metricScope, Time time) {
+    public MeteredKeyValueStore(final KeyValueStore<K, V> inner, String metricScope, Time time) {
         this.inner = inner;
-        this.getter = new StoreChangeLogger.ValueGetter<K, V>() {
-            public V get(K key) {
-                return inner.get(key);
-            }
-        };
-        this.serialization = serialization;
         this.metricScope = metricScope;
         this.time = time != null ? time : new SystemTime();
     }
 
-    public MeteredKeyValueStore<K, V> disableLogging() {
-        loggingEnabled = false;
-        return this;
-    }
-
     @Override
     public String name() {
         return inner.name();
@@ -88,22 +76,10 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
         this.flushTime = this.metrics.addLatencySensor(metricScope, name, "flush");
         this.restoreTime = this.metrics.addLatencySensor(metricScope, name, "restore");
 
-        this.changeLogger = this.loggingEnabled ? new StoreChangeLogger<>(name, context, serialization) : null;
-
         // register and possibly restore the state from the logs
         long startNs = time.nanoseconds();
-        inner.init(context);
         try {
-            final Deserializer<K> keyDeserializer = serialization.keyDeserializer();
-            final Deserializer<V> valDeserializer = serialization.valueDeserializer();
-
-            context.register(this, loggingEnabled, new StateRestoreCallback() {
-                @Override
-                public void restore(byte[] key, byte[] value) {
-                    inner.put(keyDeserializer.deserialize(name, key),
-                            valDeserializer.deserialize(name, value));
-                }
-            });
+            inner.init(context);
         } finally {
             this.metrics.recordLatency(this.restoreTime, startNs, time.nanoseconds());
         }
@@ -129,11 +105,6 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
         long startNs = time.nanoseconds();
         try {
             this.inner.put(key, value);
-
-            if (loggingEnabled) {
-                changeLogger.add(key);
-                changeLogger.maybeLogChange(this.getter);
-            }
         } finally {
             this.metrics.recordLatency(this.putTime, startNs, time.nanoseconds());
         }
@@ -144,14 +115,6 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
         long startNs = time.nanoseconds();
         try {
             this.inner.putAll(entries);
-
-            if (loggingEnabled) {
-                for (KeyValue<K, V> entry : entries) {
-                    K key = entry.key;
-                    changeLogger.add(key);
-                }
-                changeLogger.maybeLogChange(this.getter);
-            }
         } finally {
             this.metrics.recordLatency(this.putAllTime, startNs, time.nanoseconds());
         }
@@ -163,27 +126,12 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
         try {
             V value = this.inner.delete(key);
 
-            removed(key);
-
             return value;
         } finally {
             this.metrics.recordLatency(this.deleteTime, startNs, time.nanoseconds());
         }
     }
 
-    /**
-     * Called when the underlying {@link #inner} {@link KeyValueStore} removes an entry in response to a call from this
-     * store.
-     *
-     * @param key the key for the entry that the inner store removed
-     */
-    protected void removed(K key) {
-        if (loggingEnabled) {
-            changeLogger.delete(key);
-            changeLogger.maybeLogChange(this.getter);
-        }
-    }
-
     @Override
     public KeyValueIterator<K, V> range(K from, K to) {
         return new MeteredKeyValueIterator<K, V>(this.inner.range(from, to), this.rangeTime);
@@ -204,9 +152,6 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
         long startNs = time.nanoseconds();
         try {
             this.inner.flush();
-
-            if (loggingEnabled)
-                changeLogger.logChange(this.getter);
         } finally {
             this.metrics.recordLatency(this.flushTime, startNs, time.nanoseconds());
         }
@@ -247,7 +192,5 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
                 metrics.recordLatency(this.sensor, this.startNs, time.nanoseconds());
             }
         }
-
     }
-
 }

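The slimmed-down MeteredKeyValueStore above is now a pure decorator: every operation is delegated to the inner store and its latency recorded, with no logging or serialization concerns left in the wrapper. A minimal, self-contained sketch of the pattern; SimpleStore, LatencyRecorder, and MeteredSimpleStore are illustrative names, not Kafka Streams APIs:

    interface SimpleStore<K, V> {
        V get(K key);
        void put(K key, V value);
    }

    interface LatencyRecorder {
        void record(String operation, long elapsedNs);
    }

    class MeteredSimpleStore<K, V> implements SimpleStore<K, V> {
        private final SimpleStore<K, V> inner;
        private final LatencyRecorder recorder;

        MeteredSimpleStore(SimpleStore<K, V> inner, LatencyRecorder recorder) {
            this.inner = inner;
            this.recorder = recorder;
        }

        @Override
        public V get(K key) {
            long startNs = System.nanoTime();
            try {
                return inner.get(key);  // pure delegation: no logging, no serialization
            } finally {
                recorder.record("get", System.nanoTime() - startNs);
            }
        }

        @Override
        public void put(K key, V value) {
            long startNs = System.nanoTime();
            try {
                inner.put(key, value);
            } finally {
                recorder.record("put", System.nanoTime() - startNs);
            }
        }
    }

With logging pushed into the inner stores, the wrapper's only remaining concern is the latency sensors, which is why disableLogging() and the restore callback could be deleted above.
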
http://git-wip-us.apache.org/repos/asf/kafka/blob/57da044a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
index 862c322..33f4c88 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
@@ -23,45 +23,28 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateRestoreCallback;
-import org.apache.kafka.streams.state.Serdes;
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.state.WindowStoreIterator;
 
 public class MeteredWindowStore<K, V> implements WindowStore<K, V> {
 
     protected final WindowStore<K, V> inner;
-    protected final StoreChangeLogger.ValueGetter<byte[], byte[]> getter;
     protected final String metricScope;
     protected final Time time;
 
     private Sensor putTime;
-    private Sensor getTime;
-    private Sensor rangeTime;
+    private Sensor fetchTime;
     private Sensor flushTime;
     private Sensor restoreTime;
     private StreamsMetrics metrics;
 
-    private boolean loggingEnabled = true;
-    private StoreChangeLogger<byte[], byte[]> changeLogger = null;
-
     // always wrap the store with the metered store
     public MeteredWindowStore(final WindowStore<K, V> inner, String metricScope, Time time) {
         this.inner = inner;
-        this.getter = new StoreChangeLogger.ValueGetter<byte[], byte[]>() {
-            public byte[] get(byte[] key) {
-                return inner.getInternal(key);
-            }
-        };
         this.metricScope = metricScope;
         this.time = time != null ? time : new SystemTime();
     }
 
-    public MeteredWindowStore<K, V> disableLogging() {
-        loggingEnabled = false;
-        return this;
-    }
-
     @Override
     public String name() {
         return inner.name();
@@ -72,24 +55,14 @@ public class MeteredWindowStore<K, V> implements WindowStore<K, V> {
         final String name = name();
         this.metrics = context.metrics();
         this.putTime = this.metrics.addLatencySensor(metricScope, name, "put");
-        this.getTime = this.metrics.addLatencySensor(metricScope, name, "get");
-        this.rangeTime = this.metrics.addLatencySensor(metricScope, name, "range");
+        this.fetchTime = this.metrics.addLatencySensor(metricScope, name, "fetch");
         this.flushTime = this.metrics.addLatencySensor(metricScope, name, "flush");
         this.restoreTime = this.metrics.addLatencySensor(metricScope, name, "restore");
 
-        this.changeLogger = this.loggingEnabled ?
-                new StoreChangeLogger<>(name, context, Serdes.withBuiltinTypes("", byte[].class, byte[].class)) : null;
-
         // register and possibly restore the state from the logs
         long startNs = time.nanoseconds();
-        inner.init(context);
         try {
-            context.register(this, loggingEnabled, new StateRestoreCallback() {
-                @Override
-                public void restore(byte[] key, byte[] value) {
-                    inner.putInternal(key, value);
-                }
-            });
+            inner.init(context);
         } finally {
             this.metrics.recordLatency(this.restoreTime, startNs, time.nanoseconds());
         }
@@ -102,48 +75,26 @@ public class MeteredWindowStore<K, V> implements WindowStore<K, V> {
 
     @Override
     public WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo) {
-        return new MeteredWindowStoreIterator<>(this.inner.fetch(key, timeFrom, timeTo), this.rangeTime);
+        return new MeteredWindowStoreIterator<>(this.inner.fetch(key, timeFrom, timeTo), this.fetchTime);
     }
 
     @Override
     public void put(K key, V value) {
-        putAndReturnInternalKey(key, value, -1L);
-    }
-
-    @Override
-    public void put(K key, V value, long timestamp) {
-        putAndReturnInternalKey(key, value, timestamp);
-    }
-
-    @Override
-    public byte[] putAndReturnInternalKey(K key, V value, long timestamp) {
         long startNs = time.nanoseconds();
         try {
-            byte[] binKey = this.inner.putAndReturnInternalKey(key, value, timestamp);
-
-            if (loggingEnabled) {
-                changeLogger.add(binKey);
-                changeLogger.maybeLogChange(this.getter);
-            }
-
-            return binKey;
+            this.inner.put(key, value);
         } finally {
             this.metrics.recordLatency(this.putTime, startNs, time.nanoseconds());
         }
     }
 
     @Override
-    public void putInternal(byte[] binaryKey, byte[] binaryValue) {
-        inner.putInternal(binaryKey, binaryValue);
-    }
-
-    @Override
-    public byte[] getInternal(byte[] binaryKey) {
+    public void put(K key, V value, long timestamp) {
         long startNs = time.nanoseconds();
         try {
-            return this.inner.getInternal(binaryKey);
+            this.inner.put(key, value, timestamp);
         } finally {
-            this.metrics.recordLatency(this.getTime, startNs, time.nanoseconds());
+            this.metrics.recordLatency(this.putTime, startNs, time.nanoseconds());
         }
     }
 
@@ -157,9 +108,6 @@ public class MeteredWindowStore<K, V> implements WindowStore<K, V> {
         long startNs = time.nanoseconds();
         try {
             this.inner.flush();
-
-            if (loggingEnabled)
-                changeLogger.logChange(this.getter);
         } finally {
             this.metrics.recordLatency(this.flushTime, startNs, time.nanoseconds());
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/57da044a/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java
index e276f83..853fc5d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java
@@ -100,7 +100,7 @@ public class OffsetCheckpoint {
 
     public Map<TopicPartition, Long> read() throws IOException {
         synchronized (lock) {
-            BufferedReader reader = null;
+            BufferedReader reader;
             try {
                 reader = new BufferedReader(new FileReader(file));
             } catch (FileNotFoundException e) {
@@ -136,8 +136,7 @@ public class OffsetCheckpoint {
                         throw new IllegalArgumentException("Unknown offset checkpoint version: " + version);
                 }
             } finally {
-                if (reader != null)
-                    reader.close();
+                reader.close();
             }
         }
     }
@@ -146,8 +145,7 @@ public class OffsetCheckpoint {
         String line = reader.readLine();
         if (line == null)
             throw new EOFException("File ended prematurely.");
-        int val = Integer.parseInt(line);
-        return val;
+        return Integer.parseInt(line);
     }
 
     public void delete() throws IOException {

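The reader cleanup above (fail fast on a missing file, always close in a finally block) can equivalently be written with Java 7 try-with-resources. A minimal sketch under that assumption, reduced to the open/read/close skeleton; the helper below is hypothetical, not the real OffsetCheckpoint:

    import java.io.BufferedReader;
    import java.io.File;
    import java.io.FileNotFoundException;
    import java.io.FileReader;
    import java.io.IOException;

    public class CheckpointReadSketch {
        // An exception thrown while opening the resource is still caught by the
        // attached catch clause, mirroring the "missing file means no offsets"
        // behavior in the diff, while close() happens automatically.
        public static String readFirstLine(File file) throws IOException {
            try (BufferedReader reader = new BufferedReader(new FileReader(file))) {
                return reader.readLine();
            } catch (FileNotFoundException e) {
                return null;   // no checkpoint file yet
            }
        }
    }
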
http://git-wip-us.apache.org/repos/asf/kafka/blob/57da044a/streams/src/main/java/org/apache/kafka/streams/state/internals/RawStoreChangeLogger.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RawStoreChangeLogger.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RawStoreChangeLogger.java
new file mode 100644
index 0000000..cff9d6b
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RawStoreChangeLogger.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.state.WindowStoreUtils;
+
+import java.util.Comparator;
+import java.util.TreeSet;
+
+public class RawStoreChangeLogger extends StoreChangeLogger<byte[], byte[]> {
+
+    private class ByteArrayComparator implements Comparator<byte[]> {
+        @Override
+        public int compare(byte[] left, byte[] right) {
+            for (int i = 0; i < left.length && i < right.length; i++) {
+                int a = left[i] & 0xff;
+                int b = right[i] & 0xff;
+
+                if (a != b)
+                    return a - b;
+            }
+            return left.length - right.length;
+        }
+    }
+
+    public RawStoreChangeLogger(String topic, ProcessorContext context) {
+        this(topic, context, DEFAULT_WRITE_BATCH_SIZE, DEFAULT_WRITE_BATCH_SIZE);
+    }
+
+    public RawStoreChangeLogger(String topic, ProcessorContext context, int maxDirty, int maxRemoved) {
+        super(topic, context, context.id().partition, WindowStoreUtils.INNER_SERDES, maxDirty, maxRemoved);
+        init();
+    }
+
+    @Override
+    public void init() {
+        this.dirty = new TreeSet<>(new ByteArrayComparator());
+        this.removed = new TreeSet<>(new ByteArrayComparator());
+    }
+}

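The TreeSet plus ByteArrayComparator above exists because byte[] inherits identity-based equals() and hashCode() from Object, so a HashSet<byte[]> treats two arrays with identical contents as distinct keys (the bug referenced in the commit message). A small standalone demonstration:

    import java.util.Comparator;
    import java.util.HashSet;
    import java.util.Set;
    import java.util.TreeSet;

    public class ByteArraySetDemo {
        public static void main(String[] args) {
            byte[] a = {1, 2, 3};
            byte[] b = {1, 2, 3};                   // same contents, different object

            Set<byte[]> hashSet = new HashSet<>();
            hashSet.add(a);
            hashSet.add(b);
            System.out.println(hashSet.size());     // prints 2: identity-based hashCode/equals

            Comparator<byte[]> unsignedLexicographic = new Comparator<byte[]>() {
                @Override
                public int compare(byte[] left, byte[] right) {
                    for (int i = 0; i < left.length && i < right.length; i++) {
                        int diff = (left[i] & 0xff) - (right[i] & 0xff);
                        if (diff != 0)
                            return diff;
                    }
                    return left.length - right.length;
                }
            };

            Set<byte[]> treeSet = new TreeSet<>(unsignedLexicographic);
            treeSet.add(a);
            treeSet.add(b);
            System.out.println(treeSet.size());     // prints 1: compared by content
        }
    }

The comparator compares unsigned byte values lexicographically and falls back to array length, matching the semantics of the one in RawStoreChangeLogger.
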
http://git-wip-us.apache.org/repos/asf/kafka/blob/57da044a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java
index 8c3b437..3a4c351 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueStoreSupplier.java
@@ -33,7 +33,7 @@ import org.apache.kafka.streams.state.Serdes;
 public class RocksDBKeyValueStoreSupplier<K, V> implements StateStoreSupplier {
 
     private final String name;
-    private final Serdes serdes;
+    private final Serdes<K, V> serdes;
     private final Time time;
 
     public RocksDBKeyValueStoreSupplier(String name, Serdes<K, V> serdes, Time time) {
@@ -47,7 +47,6 @@ public class RocksDBKeyValueStoreSupplier<K, V> implements StateStoreSupplier {
     }
 
     public StateStore get() {
-        return new MeteredKeyValueStore<>(new RocksDBStore<K, V>(name, serdes), serdes, "rocksdb-state", time);
+        return new MeteredKeyValueStore<>(new RocksDBStore<>(name, serdes).enableLogging(), "rocksdb-state", time);
     }
-
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/57da044a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index b324ff1..d7e229d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -20,9 +20,11 @@ package org.apache.kafka.streams.state.internals;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.Serdes;
+
 import org.rocksdb.BlockBasedTableConfig;
 import org.rocksdb.CompactionStyle;
 import org.rocksdb.CompressionType;
@@ -31,18 +33,23 @@ import org.rocksdb.Options;
 import org.rocksdb.RocksDB;
 import org.rocksdb.RocksDBException;
 import org.rocksdb.RocksIterator;
+import org.rocksdb.WriteBatch;
 import org.rocksdb.WriteOptions;
 
 import java.io.File;
+import java.util.ArrayList;
 import java.util.Comparator;
+import java.util.HashSet;
 import java.util.List;
 import java.util.NoSuchElementException;
+import java.util.Set;
 
 public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
 
     private static final int TTL_NOT_USED = -1;
 
     // TODO: these values should be configurable
+    private static final int DEFAULT_UNENCODED_CACHE_SIZE = 1000;
     private static final CompressionType COMPRESSION_TYPE = CompressionType.NO_COMPRESSION;
     private static final CompactionStyle COMPACTION_STYLE = CompactionStyle.UNIVERSAL;
     private static final long WRITE_BUFFER_SIZE = 32 * 1024 * 1024L;
@@ -58,11 +65,31 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
     private final WriteOptions wOptions;
     private final FlushOptions fOptions;
 
-    private Serdes<K, V> serdes;
     private ProcessorContext context;
+    private Serdes<K, V> serdes;
     protected File dbDir;
     private RocksDB db;
 
+    private boolean loggingEnabled = false;
+    private int cacheSize = DEFAULT_UNENCODED_CACHE_SIZE;
+
+    private Set<K> cacheDirtyKeys;
+    private MemoryLRUCache<K, RocksDBCacheEntry> cache;
+    private StoreChangeLogger<byte[], byte[]> changeLogger;
+    private StoreChangeLogger.ValueGetter<byte[], byte[]> getter;
+
+    public KeyValueStore<K, V> enableLogging() {
+        loggingEnabled = true;
+
+        return this;
+    }
+
+    public RocksDBStore<K, V> withCacheSize(int cacheSize) {
+        this.cacheSize = cacheSize;
+
+        return this;
+    }
+
     public RocksDBStore(String name, Serdes<K, V> serdes) {
         this.name = name;
         this.serdes = serdes;
@@ -88,10 +115,63 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
         fOptions.setWaitForFlush(true);
     }
 
+    private class RocksDBCacheEntry {
+        public V value;
+        public boolean isDirty;
+
+        public RocksDBCacheEntry(V value) {
+            this(value, false);
+        }
+
+        public RocksDBCacheEntry(V value, boolean isDirty) {
+            this.value = value;
+            this.isDirty = isDirty;
+        }
+    }
+
+    @SuppressWarnings("unchecked")
     public void init(ProcessorContext context) {
         this.context = context;
         this.dbDir = new File(new File(this.context.stateDir(), DB_FILE_DIR), this.name);
         this.db = openDB(this.dbDir, this.options, TTL_SECONDS);
+
+        this.changeLogger = this.loggingEnabled ? new RawStoreChangeLogger(name, context) : null;
+
+        if (this.cacheSize > 0) {
+            this.cache = new MemoryLRUCache<K, RocksDBCacheEntry>(name, cacheSize)
+                    .whenEldestRemoved(new MemoryLRUCache.EldestEntryRemovalListener<K, RocksDBCacheEntry>() {
+                        @Override
+                        public void apply(K key, RocksDBCacheEntry entry) {
+                            // flush all the dirty entries to RocksDB if this evicted entry is dirty
+                            if (entry.isDirty) {
+                                flush();
+                            }
+                        }
+                    });
+
+
+            this.cacheDirtyKeys = new HashSet<>();
+        } else {
+            this.cache = null;
+            this.cacheDirtyKeys = null;
+        }
+
+        // value getter should always read directly from rocksDB
+        // since it is only for values that are already flushed
+        this.getter = new StoreChangeLogger.ValueGetter<byte[], byte[]>() {
+            @Override
+            public byte[] get(byte[] key) {
+                return getInternal(key);
+            }
+        };
+
+        context.register(this, loggingEnabled, new StateRestoreCallback() {
+
+            @Override
+            public void restore(byte[] key, byte[] value) {
+                putInternal(key, value);
+            }
+        });
     }
 
     private RocksDB openDB(File dir, Options options, int ttl) {
@@ -100,12 +180,11 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
                 dir.getParentFile().mkdirs();
                 return RocksDB.open(options, dir.toString());
             } else {
-                throw new ProcessorStateException("Change log is not supported for store " + this.name + " since it is TTL based.");
+                throw new UnsupportedOperationException("Change log is not supported for store " + this.name + " since it is TTL based.");
                 // TODO: support TTL with change log?
                 // return TtlDB.open(options, dir.toString(), ttl, false);
             }
         } catch (RocksDBException e) {
-            // TODO: this needs to be handled more accurately
             throw new ProcessorStateException("Error opening store " + this.name + " at location " + dir.toString(), e);
         }
     }
@@ -122,25 +201,64 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
 
     @Override
     public V get(K key) {
+        if (cache != null) {
+            RocksDBCacheEntry entry = cache.get(key);
+
+            if (entry == null) {
+                byte[] rawKey = serdes.rawKey(key);
+                V value = serdes.valueFrom(getInternal(rawKey));
+                cache.put(key, new RocksDBCacheEntry(value));
+
+                return value;
+            } else {
+                return entry.value;
+            }
+        } else {
+            return serdes.valueFrom(getInternal(serdes.rawKey(key)));
+        }
+    }
+
+    private byte[] getInternal(byte[] rawKey) {
         try {
-            return serdes.valueFrom(this.db.get(serdes.rawKey(key)));
+            return this.db.get(rawKey);
         } catch (RocksDBException e) {
-            // TODO: this needs to be handled more accurately
-            throw new ProcessorStateException("Error while executing get " + key.toString() + " from store " + this.name, e);
+            throw new ProcessorStateException("Error while getting value for key " + serdes.keyFrom(rawKey) +
+                    " from store " + this.name, e);
         }
     }
 
     @Override
     public void put(K key, V value) {
-        try {
-            if (value == null) {
-                db.remove(wOptions, serdes.rawKey(key));
-            } else {
-                db.put(wOptions, serdes.rawKey(key), serdes.rawValue(value));
+        if (cache != null) {
+            cache.put(key, new RocksDBCacheEntry(value, true));
+            cacheDirtyKeys.add(key);
+        } else {
+            byte[] rawKey = serdes.rawKey(key);
+            byte[] rawValue = serdes.rawValue(value);
+            putInternal(rawKey, rawValue);
+        }
+    }
+
+    private void putInternal(byte[] rawKey, byte[] rawValue) {
+        if (rawValue == null) {
+            try {
+                db.remove(wOptions, rawKey);
+            } catch (RocksDBException e) {
+                throw new ProcessorStateException("Error while removing key " + serdes.keyFrom(rawKey) +
+                        " from store " + this.name, e);
             }
-        } catch (RocksDBException e) {
-            // TODO: this needs to be handled more accurately
-            throw new ProcessorStateException("Error while executing put " + key.toString() + " from store " + this.name, e);
+        } else {
+            try {
+                db.put(wOptions, rawKey, rawValue);
+            } catch (RocksDBException e) {
+                throw new ProcessorStateException("Error while executing put key " + serdes.keyFrom(rawKey) +
+                        " and value " + serdes.keyFrom(rawValue) + " from store " + this.name, e);
+            }
+        }
+
+        if (loggingEnabled) {
+            changeLogger.add(rawKey);
+            changeLogger.maybeLogChange(this.getter);
         }
     }
 
@@ -150,6 +268,21 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
             put(entry.key, entry.value);
     }
 
+    // this method is called only from flush()
+    private void putAllInternal(List<KeyValue<byte[], byte[]>> entries) {
+        WriteBatch batch = new WriteBatch();
+
+        for (KeyValue<byte[], byte[]> entry : entries) {
+            batch.put(entry.key, entry.value);
+        }
+
+        try {
+            db.write(wOptions, batch);
+        } catch (RocksDBException e) {
+            throw new ProcessorStateException("Error while batch writing to store " + this.name, e);
+        }
+    }
+
     @Override
     public V delete(K key) {
         V value = get(key);
@@ -159,11 +292,19 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
 
     @Override
     public KeyValueIterator<K, V> range(K from, K to) {
+        // we need to flush the cache if necessary before returning the iterator
+        if (cache != null)
+            flush();
+
         return new RocksDBRangeIterator<K, V>(db.newIterator(), serdes, from, to);
     }
 
     @Override
     public KeyValueIterator<K, V> all() {
+        // we need to flush the cache if necessary before returning the iterator
+        if (cache != null)
+            flush();
+
         RocksIterator innerIter = db.newIterator();
         innerIter.seekToFirst();
         return new RocksDbIterator<K, V>(innerIter, serdes);
@@ -171,10 +312,60 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
 
     @Override
     public void flush() {
+        // flush the dirty cache entries to RocksDB if necessary
+        if (cache != null) {
+            List<KeyValue<byte[], byte[]>> putBatch = new ArrayList<>(cache.keys.size());
+            List<byte[]> deleteBatch = new ArrayList<>(cache.keys.size());
+
+            for (K key : cacheDirtyKeys) {
+                RocksDBCacheEntry entry = cache.get(key);
+
+                assert entry.isDirty;
+
+                byte[] rawKey = serdes.rawKey(key);
+
+                if (entry.value != null) {
+                    putBatch.add(new KeyValue<>(rawKey, serdes.rawValue(entry.value)));
+                } else {
+                    deleteBatch.add(rawKey);
+                }
+            }
+
+            putAllInternal(putBatch);
+
+            if (loggingEnabled) {
+                for (KeyValue<byte[], byte[]> kv : putBatch)
+                    changeLogger.add(kv.key);
+            }
+
+            // check all removed entries and remove them in rocksDB
+            // TODO: can this be done in batch as well?
+            for (byte[] removedKey : deleteBatch) {
+                try {
+                    db.remove(wOptions, removedKey);
+                } catch (RocksDBException e) {
+                    throw new ProcessorStateException("Error while deleting with key " + serdes.keyFrom(removedKey) + " from store " + this.name, e);
+                }
+
+                if (loggingEnabled) {
+                    changeLogger.delete(removedKey);
+                }
+            }
+
+            // reset dirty set
+            cacheDirtyKeys.clear();
+        }
+
+        flushInternal();
+
+        if (loggingEnabled)
+            changeLogger.logChange(getter);
+    }
+
+    public void flushInternal() {
         try {
             db.flush(fOptions);
         } catch (RocksDBException e) {
-            // TODO: this needs to be handled more accurately
             throw new ProcessorStateException("Error while executing flush from store " + this.name, e);
         }
     }
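Abstracting away RocksDB, serdes, and changelogging, the flush() above follows a plain write-back pattern: mutations accumulate in the cache alongside a dirty-key set, and flushing drains that set into a put batch plus individual deletes. A standalone sketch of the same drain logic under simplified assumptions (no LRU eviction, no changelog; WriteBackCacheSketch and Backend are illustrative names):

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;

    public class WriteBackCacheSketch<K, V> {
        private final Map<K, V> cache = new HashMap<>();
        private final Set<K> dirtyKeys = new HashSet<>();

        public void put(K key, V value) {      // a null value marks a deletion
            cache.put(key, value);
            dirtyKeys.add(key);
        }

        public void flush(Backend<K, V> backend) {
            Map<K, V> puts = new HashMap<>();
            List<K> deletes = new ArrayList<>();
            for (K key : dirtyKeys) {
                V value = cache.get(key);
                if (value != null)
                    puts.put(key, value);
                else
                    deletes.add(key);
            }
            backend.writeBatch(puts);          // analogous to putAllInternal + WriteBatch
            for (K key : deletes)
                backend.delete(key);           // deletes applied one by one, as in the diff
            dirtyKeys.clear();                 // reset the dirty set
        }

        public interface Backend<K, V> {
            void writeBatch(Map<K, V> puts);
            void delete(K key);
        }
    }
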