Posted to commits@kafka.apache.org by gu...@apache.org on 2016/01/21 01:10:50 UTC

[4/5] kafka git commit: KAFKA-3121: Remove aggregatorSupplier and add Reduce functions

http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java
deleted file mode 100644
index 743a110..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java
+++ /dev/null
@@ -1,250 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.state;
-
-import org.apache.kafka.common.utils.SystemTime;
-import org.apache.kafka.streams.StreamingMetrics;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateRestoreCallback;
-import org.apache.kafka.common.metrics.Sensor;
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.utils.Time;
-
-import java.util.List;
-
-public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
-
-    protected final KeyValueStore<K, V> inner;
-    protected final StoreChangeLogger.ValueGetter getter;
-    protected final Serdes<K, V> serialization;
-    protected final String metricScope;
-    protected final Time time;
-
-    private Sensor putTime;
-    private Sensor getTime;
-    private Sensor deleteTime;
-    private Sensor putAllTime;
-    private Sensor allTime;
-    private Sensor rangeTime;
-    private Sensor flushTime;
-    private Sensor restoreTime;
-    private StreamingMetrics metrics;
-
-    private boolean loggingEnabled = true;
-    private StoreChangeLogger<K, V> changeLogger = null;
-
-    // always wrap the store with the metered store
-    public MeteredKeyValueStore(final KeyValueStore<K, V> inner, Serdes<K, V> serialization, String metricScope, Time time) {
-        this.inner = inner;
-        this.getter = new StoreChangeLogger.ValueGetter<K, V>() {
-            public V get(K key) {
-                return inner.get(key);
-            }
-        };
-        this.serialization = serialization;
-        this.metricScope = metricScope;
-        this.time = time != null ? time : new SystemTime();
-    }
-
-    public MeteredKeyValueStore<K, V> disableLogging() {
-        loggingEnabled = false;
-        return this;
-    }
-
-    @Override
-    public String name() {
-        return inner.name();
-    }
-
-    @Override
-    public void init(ProcessorContext context) {
-        final String name = name();
-        this.metrics = context.metrics();
-        this.putTime = this.metrics.addLatencySensor(metricScope, name, "put");
-        this.getTime = this.metrics.addLatencySensor(metricScope, name, "get");
-        this.deleteTime = this.metrics.addLatencySensor(metricScope, name, "delete");
-        this.putAllTime = this.metrics.addLatencySensor(metricScope, name, "put-all");
-        this.allTime = this.metrics.addLatencySensor(metricScope, name, "all");
-        this.rangeTime = this.metrics.addLatencySensor(metricScope, name, "range");
-        this.flushTime = this.metrics.addLatencySensor(metricScope, name, "flush");
-        this.restoreTime = this.metrics.addLatencySensor(metricScope, name, "restore");
-
-        serialization.init(context);
-        this.changeLogger = this.loggingEnabled ? new StoreChangeLogger<>(name, context, serialization) : null;
-
-        // register and possibly restore the state from the logs
-        long startNs = time.nanoseconds();
-        inner.init(context);
-        try {
-            final Deserializer<K> keyDeserializer = serialization.keyDeserializer();
-            final Deserializer<V> valDeserializer = serialization.valueDeserializer();
-
-            context.register(this, loggingEnabled, new StateRestoreCallback() {
-                @Override
-                public void restore(byte[] key, byte[] value) {
-                    inner.put(keyDeserializer.deserialize(name, key),
-                            valDeserializer.deserialize(name, value));
-                }
-            });
-        } finally {
-            this.metrics.recordLatency(this.restoreTime, startNs, time.nanoseconds());
-        }
-    }
-
-    @Override
-    public boolean persistent() {
-        return inner.persistent();
-    }
-
-    @Override
-    public V get(K key) {
-        long startNs = time.nanoseconds();
-        try {
-            return this.inner.get(key);
-        } finally {
-            this.metrics.recordLatency(this.getTime, startNs, time.nanoseconds());
-        }
-    }
-
-    @Override
-    public void put(K key, V value) {
-        long startNs = time.nanoseconds();
-        try {
-            this.inner.put(key, value);
-
-            if (loggingEnabled) {
-                changeLogger.add(key);
-                changeLogger.maybeLogChange(this.getter);
-            }
-        } finally {
-            this.metrics.recordLatency(this.putTime, startNs, time.nanoseconds());
-        }
-    }
-
-    @Override
-    public void putAll(List<Entry<K, V>> entries) {
-        long startNs = time.nanoseconds();
-        try {
-            this.inner.putAll(entries);
-
-            if (loggingEnabled) {
-                for (Entry<K, V> entry : entries) {
-                    K key = entry.key();
-                    changeLogger.add(key);
-                }
-                changeLogger.maybeLogChange(this.getter);
-            }
-        } finally {
-            this.metrics.recordLatency(this.putAllTime, startNs, time.nanoseconds());
-        }
-    }
-
-    @Override
-    public V delete(K key) {
-        long startNs = time.nanoseconds();
-        try {
-            V value = this.inner.delete(key);
-
-            removed(key);
-
-            return value;
-        } finally {
-            this.metrics.recordLatency(this.deleteTime, startNs, time.nanoseconds());
-        }
-    }
-
-    /**
-     * Called when the underlying {@link #inner} {@link KeyValueStore} removes an entry in response to a call from this
-     * store.
-     *
-     * @param key the key for the entry that the inner store removed
-     */
-    protected void removed(K key) {
-        if (loggingEnabled) {
-            changeLogger.delete(key);
-            changeLogger.maybeLogChange(this.getter);
-        }
-    }
-
-    @Override
-    public KeyValueIterator<K, V> range(K from, K to) {
-        return new MeteredKeyValueIterator<K, V>(this.inner.range(from, to), this.rangeTime);
-    }
-
-    @Override
-    public KeyValueIterator<K, V> all() {
-        return new MeteredKeyValueIterator<K, V>(this.inner.all(), this.allTime);
-    }
-
-    @Override
-    public void close() {
-        inner.close();
-    }
-
-    @Override
-    public void flush() {
-        long startNs = time.nanoseconds();
-        try {
-            this.inner.flush();
-
-            if (loggingEnabled)
-                changeLogger.logChange(this.getter);
-        } finally {
-            this.metrics.recordLatency(this.flushTime, startNs, time.nanoseconds());
-        }
-    }
-
-    private class MeteredKeyValueIterator<K1, V1> implements KeyValueIterator<K1, V1> {
-
-        private final KeyValueIterator<K1, V1> iter;
-        private final Sensor sensor;
-        private final long startNs;
-
-        public MeteredKeyValueIterator(KeyValueIterator<K1, V1> iter, Sensor sensor) {
-            this.iter = iter;
-            this.sensor = sensor;
-            this.startNs = time.nanoseconds();
-        }
-
-        @Override
-        public boolean hasNext() {
-            return iter.hasNext();
-        }
-
-        @Override
-        public Entry<K1, V1> next() {
-            return iter.next();
-        }
-
-        @Override
-        public void remove() {
-            iter.remove();
-        }
-
-        @Override
-        public void close() {
-            try {
-                iter.close();
-            } finally {
-                metrics.recordLatency(this.sensor, this.startNs, time.nanoseconds());
-            }
-        }
-
-    }
-
-}
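
The file removed above is a textbook decorator: every operation delegates to the wrapped inner store inside a try/finally that records elapsed nanoseconds against a per-operation sensor, so a latency sample is captured even when the inner call throws. A minimal sketch of the pattern (LatencyRecorder and MeteredMap are illustrative stand-ins for StreamingMetrics and the store, not Kafka classes):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class LatencyRecorder {
    private final Map<String, Long> totalNanos = new ConcurrentHashMap<>();

    // accumulate elapsed time per operation name
    void record(String op, long startNs, long endNs) {
        totalNanos.merge(op, endNs - startNs, Long::sum);
    }
}

class MeteredMap<K, V> {
    private final Map<K, V> inner;            // the wrapped store
    private final LatencyRecorder recorder;

    MeteredMap(Map<K, V> inner, LatencyRecorder recorder) {
        this.inner = inner;
        this.recorder = recorder;
    }

    V get(K key) {
        long startNs = System.nanoTime();
        try {
            return inner.get(key);            // delegate to the inner store
        } finally {
            // finally guarantees a latency sample even if get() throws
            recorder.record("get", startNs, System.nanoTime());
        }
    }
}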

http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/state/MeteredWindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/MeteredWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/MeteredWindowStore.java
deleted file mode 100644
index cfcfb00..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/state/MeteredWindowStore.java
+++ /dev/null
@@ -1,206 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.state;
-
-import org.apache.kafka.common.metrics.Sensor;
-import org.apache.kafka.common.utils.SystemTime;
-import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.StreamingMetrics;
-import org.apache.kafka.streams.kstream.KeyValue;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateRestoreCallback;
-
-public class MeteredWindowStore<K, V> implements WindowStore<K, V> {
-
-    protected final WindowStore<K, V> inner;
-    protected final StoreChangeLogger.ValueGetter<byte[], byte[]> getter;
-    protected final String metricScope;
-    protected final Time time;
-
-    private Sensor putTime;
-    private Sensor getTime;
-    private Sensor rangeTime;
-    private Sensor flushTime;
-    private Sensor restoreTime;
-    private StreamingMetrics metrics;
-
-    private boolean loggingEnabled = true;
-    private StoreChangeLogger<byte[], byte[]> changeLogger = null;
-
-    // always wrap the store with the metered store
-    public MeteredWindowStore(final WindowStore<K, V> inner, String metricScope, Time time) {
-        this.inner = inner;
-        this.getter = new StoreChangeLogger.ValueGetter<byte[], byte[]>() {
-            public byte[] get(byte[] key) {
-                return inner.getInternal(key);
-            }
-        };
-        this.metricScope = metricScope;
-        this.time = time != null ? time : new SystemTime();
-    }
-
-    public MeteredWindowStore<K, V> disableLogging() {
-        loggingEnabled = false;
-        return this;
-    }
-
-    @Override
-    public String name() {
-        return inner.name();
-    }
-
-    @Override
-    public void init(ProcessorContext context) {
-        final String name = name();
-        this.metrics = context.metrics();
-        this.putTime = this.metrics.addLatencySensor(metricScope, name, "put");
-        this.getTime = this.metrics.addLatencySensor(metricScope, name, "get");
-        this.rangeTime = this.metrics.addLatencySensor(metricScope, name, "range");
-        this.flushTime = this.metrics.addLatencySensor(metricScope, name, "flush");
-        this.restoreTime = this.metrics.addLatencySensor(metricScope, name, "restore");
-
-        this.changeLogger = this.loggingEnabled ?
-                new StoreChangeLogger<>(name, context, Serdes.withBuiltinTypes("", byte[].class, byte[].class)) : null;
-
-        // register and possibly restore the state from the logs
-        long startNs = time.nanoseconds();
-        inner.init(context);
-        try {
-            context.register(this, loggingEnabled, new StateRestoreCallback() {
-                @Override
-                public void restore(byte[] key, byte[] value) {
-                    inner.putInternal(key, value);
-                }
-            });
-        } finally {
-            this.metrics.recordLatency(this.restoreTime, startNs, time.nanoseconds());
-        }
-    }
-
-    @Override
-    public boolean persistent() {
-        return inner.persistent();
-    }
-
-    @Override
-    public WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo) {
-        return new MeteredWindowStoreIterator<>(this.inner.fetch(key, timeFrom, timeTo), this.rangeTime);
-    }
-
-    @Override
-    public void put(K key, V value) {
-        putAndReturnInternalKey(key, value, -1L);
-    }
-
-    @Override
-    public void put(K key, V value, long timestamp) {
-        putAndReturnInternalKey(key, value, timestamp);
-    }
-
-    @Override
-    public byte[] putAndReturnInternalKey(K key, V value, long timestamp) {
-        long startNs = time.nanoseconds();
-        try {
-            byte[] binKey = this.inner.putAndReturnInternalKey(key, value, timestamp);
-
-            if (loggingEnabled) {
-                changeLogger.add(binKey);
-                changeLogger.maybeLogChange(this.getter);
-            }
-
-            return binKey;
-        } finally {
-            this.metrics.recordLatency(this.putTime, startNs, time.nanoseconds());
-        }
-    }
-
-    @Override
-    public void putInternal(byte[] binaryKey, byte[] binaryValue) {
-        inner.putInternal(binaryKey, binaryValue);
-    }
-
-    @Override
-    public byte[] getInternal(byte[] binaryKey) {
-        long startNs = time.nanoseconds();
-        try {
-            return this.inner.getInternal(binaryKey);
-        } finally {
-            this.metrics.recordLatency(this.getTime, startNs, time.nanoseconds());
-        }
-    }
-
-    @Override
-    public void close() {
-        inner.close();
-    }
-
-    @Override
-    public void flush() {
-        long startNs = time.nanoseconds();
-        try {
-            this.inner.flush();
-
-            if (loggingEnabled)
-                changeLogger.logChange(this.getter);
-        } finally {
-            this.metrics.recordLatency(this.flushTime, startNs, time.nanoseconds());
-        }
-    }
-
-    private class MeteredWindowStoreIterator<E> implements WindowStoreIterator<E> {
-
-        private final WindowStoreIterator<E> iter;
-        private final Sensor sensor;
-        private final long startNs;
-
-        public MeteredWindowStoreIterator(WindowStoreIterator<E> iter, Sensor sensor) {
-            this.iter = iter;
-            this.sensor = sensor;
-            this.startNs = time.nanoseconds();
-        }
-
-        @Override
-        public boolean hasNext() {
-            return iter.hasNext();
-        }
-
-        @Override
-        public KeyValue<Long, E> next() {
-            return iter.next();
-        }
-
-        @Override
-        public void remove() {
-            iter.remove();
-        }
-
-        @Override
-        public void close() {
-            try {
-                iter.close();
-            } finally {
-                metrics.recordLatency(this.sensor, this.startNs, time.nanoseconds());
-            }
-        }
-
-    }
-
-    WindowStore<K, V> inner() {
-        return inner;
-    }
-}
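
One subtlety shared by both metered stores above: range scans are timed by the iterator, from construction to close(), so each fetch()/range() call produces a single latency sample covering the whole scan rather than one per element. A stripped-down sketch of that idea (TimedIterator is illustrative, not a Kafka class):

import java.util.Iterator;

class TimedIterator<T> implements Iterator<T>, AutoCloseable {
    private final Iterator<T> inner;
    private final long startNs = System.nanoTime();   // clock starts at creation

    TimedIterator(Iterator<T> inner) {
        this.inner = inner;
    }

    @Override public boolean hasNext() { return inner.hasNext(); }
    @Override public T next()          { return inner.next(); }

    @Override public void close() {
        // one sample for the whole scan, recorded when the caller closes
        System.out.printf("scan took %d ns%n", System.nanoTime() - startNs);
    }
}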

http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/state/OffsetCheckpoint.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/OffsetCheckpoint.java b/streams/src/main/java/org/apache/kafka/streams/state/OffsetCheckpoint.java
deleted file mode 100644
index d748aac..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/state/OffsetCheckpoint.java
+++ /dev/null
@@ -1,162 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.state;
-
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.utils.Utils;
-
-import java.io.BufferedReader;
-import java.io.BufferedWriter;
-import java.io.EOFException;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.FileReader;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * This class saves out a map of topic/partition=&gt;offsets to a file. The format of the file is UTF-8 text containing the following:
- * <pre>
- *   &lt;version&gt;
- *   &lt;n&gt;
- *   &lt;topic_name_1&gt; &lt;partition_1&gt; &lt;offset_1&gt;
- *   .
- *   .
- *   .
- *   &lt;topic_name_n&gt; &lt;partition_n&gt; &lt;offset_n&gt;
- * </pre>
- *   The first line contains a number designating the format version (currently 0), the second line contains
- *   a number giving the total number of offsets. Each successive line gives a topic/partition/offset triple
- *   separated by spaces.
- */
-public class OffsetCheckpoint {
-
-    private static final int VERSION = 0;
-
-    private final File file;
-    private final Object lock;
-
-    public OffsetCheckpoint(File file) throws IOException {
-        this.file = file;
-        this.lock = new Object();
-    }
-
-    public void write(Map<TopicPartition, Long> offsets) throws IOException {
-        synchronized (lock) {
-            // write to temp file and then swap with the existing file
-            File temp = new File(file.getAbsolutePath() + ".tmp");
-
-            FileOutputStream fileOutputStream = new FileOutputStream(temp);
-            BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(fileOutputStream));
-            try {
-                writeIntLine(writer, VERSION);
-                writeIntLine(writer, offsets.size());
-
-                for (Map.Entry<TopicPartition, Long> entry : offsets.entrySet())
-                    writeEntry(writer, entry.getKey(), entry.getValue());
-
-                writer.flush();
-                fileOutputStream.getFD().sync();
-            } finally {
-                writer.close();
-            }
-
-            Utils.atomicMoveWithFallback(temp.toPath(), file.toPath());
-        }
-    }
-
-    private void writeIntLine(BufferedWriter writer, int number) throws IOException {
-        writer.write(Integer.toString(number));
-        writer.newLine();
-    }
-
-    private void writeEntry(BufferedWriter writer, TopicPartition part, long offset) throws IOException {
-        writer.write(part.topic());
-        writer.write(' ');
-        writer.write(Integer.toString(part.partition()));
-        writer.write(' ');
-        writer.write(Long.toString(offset));
-        writer.newLine();
-    }
-
-    public Map<TopicPartition, Long> read() throws IOException {
-        synchronized (lock) {
-            BufferedReader reader = null;
-            try {
-                reader = new BufferedReader(new FileReader(file));
-            } catch (FileNotFoundException e) {
-                return Collections.emptyMap();
-            }
-
-            try {
-                int version = readInt(reader);
-                switch (version) {
-                    case 0:
-                        int expectedSize = readInt(reader);
-                        Map<TopicPartition, Long> offsets = new HashMap<>();
-                        String line = reader.readLine();
-                        while (line != null) {
-                            String[] pieces = line.split("\\s+");
-                            if (pieces.length != 3)
-                                throw new IOException(String.format("Malformed line in offset checkpoint file: '%s'.",
-                                    line));
-
-                            String topic = pieces[0];
-                            int partition = Integer.parseInt(pieces[1]);
-                            long offset = Long.parseLong(pieces[2]);
-                            offsets.put(new TopicPartition(topic, partition), offset);
-                            line = reader.readLine();
-                        }
-                        if (offsets.size() != expectedSize)
-                            throw new IOException(String.format("Expected %d entries but found only %d",
-                                expectedSize,
-                                offsets.size()));
-                        return offsets;
-
-                    default:
-                        throw new IllegalArgumentException("Unknown offset checkpoint version: " + version);
-                }
-            } finally {
-                if (reader != null)
-                    reader.close();
-            }
-        }
-    }
-
-    private int readInt(BufferedReader reader) throws IOException {
-        String line = reader.readLine();
-        if (line == null)
-            throw new EOFException("File ended prematurely.");
-        int val = Integer.parseInt(line);
-        return val;
-    }
-
-    public void delete() throws IOException {
-        file.delete();
-    }
-
-    @Override
-    public String toString() {
-        return this.file.getAbsolutePath();
-    }
-
-}
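
Concretely, a checkpoint holding two partitions of a hypothetical topic "my-topic" at offsets 42 and 17 would be written as:

0
2
my-topic 0 42
my-topic 1 17

Version 0 on the first line, the entry count on the second, then one topic/partition/offset triple per line. Note that write() targets a ".tmp" sibling that is flushed, fsynced, and atomically renamed over the old file, so a crash mid-write can never leave a torn checkpoint behind.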

http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreSupplier.java
deleted file mode 100644
index 41314b9..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStoreSupplier.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.state;
-
-import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
-
-/**
- * A {@link KeyValueStore} that stores all entries in a local RocksDB database.
- *
- * @param <K> the type of keys
- * @param <V> the type of values
- *
- * @see Stores#create(String)
- */
-public class RocksDBKeyValueStoreSupplier<K, V> implements StateStoreSupplier {
-
-    private final String name;
-    private final Serdes serdes;
-    private final Time time;
-
-    protected RocksDBKeyValueStoreSupplier(String name, Serdes<K, V> serdes, Time time) {
-        this.name = name;
-        this.serdes = serdes;
-        this.time = time;
-    }
-
-    public String name() {
-        return name;
-    }
-
-    public StateStore get() {
-        return new MeteredKeyValueStore<>(new RocksDBStore<K, V>(name, serdes), serdes, "rocksdb-state", time);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/state/RocksDBStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBStore.java
deleted file mode 100644
index 62b9f2c..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBStore.java
+++ /dev/null
@@ -1,265 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.state;
-
-import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.rocksdb.BlockBasedTableConfig;
-import org.rocksdb.CompactionStyle;
-import org.rocksdb.CompressionType;
-import org.rocksdb.FlushOptions;
-import org.rocksdb.Options;
-import org.rocksdb.RocksDB;
-import org.rocksdb.RocksDBException;
-import org.rocksdb.RocksIterator;
-import org.rocksdb.WriteOptions;
-
-import java.io.File;
-import java.util.Comparator;
-import java.util.List;
-import java.util.NoSuchElementException;
-
-public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
-
-    private static final int TTL_NOT_USED = -1;
-
-    // TODO: these values should be configurable
-    private static final CompressionType COMPRESSION_TYPE = CompressionType.NO_COMPRESSION;
-    private static final CompactionStyle COMPACTION_STYLE = CompactionStyle.UNIVERSAL;
-    private static final long WRITE_BUFFER_SIZE = 32 * 1024 * 1024L;
-    private static final long BLOCK_CACHE_SIZE = 100 * 1024 * 1024L;
-    private static final long BLOCK_SIZE = 4096L;
-    private static final int TTL_SECONDS = TTL_NOT_USED;
-    private static final int MAX_WRITE_BUFFERS = 3;
-    private static final String DB_FILE_DIR = "rocksdb";
-
-    private final String name;
-
-    private final Options options;
-    private final WriteOptions wOptions;
-    private final FlushOptions fOptions;
-
-    private Serdes<K, V> serdes;
-    private ProcessorContext context;
-    protected File dbDir;
-    private RocksDB db;
-
-    public RocksDBStore(String name, Serdes<K, V> serdes) {
-        this.name = name;
-        this.serdes = serdes;
-
-        // initialize the rocksdb options
-        BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
-        tableConfig.setBlockCacheSize(BLOCK_CACHE_SIZE);
-        tableConfig.setBlockSize(BLOCK_SIZE);
-
-        options = new Options();
-        options.setTableFormatConfig(tableConfig);
-        options.setWriteBufferSize(WRITE_BUFFER_SIZE);
-        options.setCompressionType(COMPRESSION_TYPE);
-        options.setCompactionStyle(COMPACTION_STYLE);
-        options.setMaxWriteBufferNumber(MAX_WRITE_BUFFERS);
-        options.setCreateIfMissing(true);
-        options.setErrorIfExists(false);
-
-        wOptions = new WriteOptions();
-        wOptions.setDisableWAL(true);
-
-        fOptions = new FlushOptions();
-        fOptions.setWaitForFlush(true);
-    }
-
-    public void init(ProcessorContext context) {
-        serdes.init(context);
-
-        this.context = context;
-        this.dbDir = new File(new File(this.context.stateDir(), DB_FILE_DIR), this.name);
-        this.db = openDB(this.dbDir, this.options, TTL_SECONDS);
-    }
-
-    private RocksDB openDB(File dir, Options options, int ttl) {
-        try {
-            if (ttl == TTL_NOT_USED) {
-                dir.getParentFile().mkdirs();
-                return RocksDB.open(options, dir.toString());
-            } else {
-                throw new KafkaException("Change log is not supported for store " + this.name + " since it is TTL based.");
-                // TODO: support TTL with change log?
-                // return TtlDB.open(options, dir.toString(), ttl, false);
-            }
-        } catch (RocksDBException e) {
-            // TODO: this needs to be handled more accurately
-            throw new KafkaException("Error opening store " + this.name + " at location " + dir.toString(), e);
-        }
-    }
-
-    @Override
-    public String name() {
-        return this.name;
-    }
-
-    @Override
-    public boolean persistent() {
-        return false;
-    }
-
-    @Override
-    public V get(K key) {
-        try {
-            return serdes.valueFrom(this.db.get(serdes.rawKey(key)));
-        } catch (RocksDBException e) {
-            // TODO: this needs to be handled more accurately
-            throw new KafkaException("Error while executing get " + key.toString() + " from store " + this.name, e);
-        }
-    }
-
-    @Override
-    public void put(K key, V value) {
-        try {
-            if (value == null) {
-                db.remove(wOptions, serdes.rawKey(key));
-            } else {
-                db.put(wOptions, serdes.rawKey(key), serdes.rawValue(value));
-            }
-        } catch (RocksDBException e) {
-            // TODO: this needs to be handled more accurately
-            throw new KafkaException("Error while executing put " + key.toString() + " from store " + this.name, e);
-        }
-    }
-
-    @Override
-    public void putAll(List<Entry<K, V>> entries) {
-        for (Entry<K, V> entry : entries)
-            put(entry.key(), entry.value());
-    }
-
-    @Override
-    public V delete(K key) {
-        V value = get(key);
-        put(key, null);
-        return value;
-    }
-
-    @Override
-    public KeyValueIterator<K, V> range(K from, K to) {
-        return new RocksDBRangeIterator<K, V>(db.newIterator(), serdes, from, to);
-    }
-
-    @Override
-    public KeyValueIterator<K, V> all() {
-        RocksIterator innerIter = db.newIterator();
-        innerIter.seekToFirst();
-        return new RocksDbIterator<K, V>(innerIter, serdes);
-    }
-
-    @Override
-    public void flush() {
-        try {
-            db.flush(fOptions);
-        } catch (RocksDBException e) {
-            // TODO: this needs to be handled more accurately
-            throw new KafkaException("Error while executing flush from store " + this.name, e);
-        }
-    }
-
-    @Override
-    public void close() {
-        flush();
-        db.close();
-    }
-
-    private static class RocksDbIterator<K, V> implements KeyValueIterator<K, V> {
-        private final RocksIterator iter;
-        private final Serdes<K, V> serdes;
-
-        public RocksDbIterator(RocksIterator iter, Serdes<K, V> serdes) {
-            this.iter = iter;
-            this.serdes = serdes;
-        }
-
-        protected byte[] peekRawKey() {
-            return iter.key();
-        }
-
-        protected Entry<K, V> getEntry() {
-            return new Entry<>(serdes.keyFrom(iter.key()), serdes.valueFrom(iter.value()));
-        }
-
-        @Override
-        public boolean hasNext() {
-            return iter.isValid();
-        }
-
-        @Override
-        public Entry<K, V> next() {
-            if (!hasNext())
-                throw new NoSuchElementException();
-
-            Entry<K, V> entry = this.getEntry();
-            iter.next();
-            return entry;
-        }
-
-        @Override
-        public void remove() {
-            throw new UnsupportedOperationException("RocksDB iterator does not support remove");
-        }
-
-        @Override
-        public void close() {
-            iter.dispose();
-        }
-
-    }
-
-    private static class LexicographicComparator implements Comparator<byte[]> {
-
-        @Override
-        public int compare(byte[] left, byte[] right) {
-            for (int i = 0, j = 0; i < left.length && j < right.length; i++, j++) {
-                int leftByte = left[i] & 0xff;
-                int rightByte = right[j] & 0xff;
-                if (leftByte != rightByte) {
-                    return leftByte - rightByte;
-                }
-            }
-            return left.length - right.length;
-        }
-    }
-
-    private static class RocksDBRangeIterator<K, V> extends RocksDbIterator<K, V> {
-        // RocksDB's JNI interface does not expose getters/setters that allow the
-        // comparator to be pluggable, and the default is lexicographic, so it's
-        // safe to just force lexicographic comparator here for now.
-        private final Comparator<byte[]> comparator = new LexicographicComparator();
-        byte[] rawToKey;
-
-        public RocksDBRangeIterator(RocksIterator iter, Serdes<K, V> serdes,
-                                    K from, K to) {
-            super(iter, serdes);
-            iter.seek(serdes.rawKey(from));
-            this.rawToKey = serdes.rawKey(to);
-        }
-
-        @Override
-        public boolean hasNext() {
-            return super.hasNext() && comparator.compare(super.peekRawKey(), this.rawToKey) < 0;
-        }
-    }
-
-}
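
The & 0xff masking in LexicographicComparator above is what makes the comparison unsigned: Java bytes are signed, so 0x80 is -128 and would naively sort before 0x01, while RocksDB's default lexicographic order treats it as 128 and sorts it after. A two-line check (the class name is made up):

public class UnsignedByteDemo {
    public static void main(String[] args) {
        byte a = (byte) 0x80;   // -128 signed, 128 unsigned
        byte b = 0x01;
        System.out.println(Byte.compare(a, b));      // -129: signed order says a < b
        System.out.println((a & 0xff) - (b & 0xff)); //  127: unsigned order says a > b
    }
}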

http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/state/RocksDBWindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBWindowStore.java
deleted file mode 100644
index 2f30712..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBWindowStore.java
+++ /dev/null
@@ -1,289 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.kafka.streams.state;
-
-import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.kstream.KeyValue;
-import org.apache.kafka.streams.processor.ProcessorContext;
-
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.HashSet;
-import java.util.NoSuchElementException;
-import java.util.Set;
-import java.util.SimpleTimeZone;
-
-public class RocksDBWindowStore<K, V> implements WindowStore<K, V> {
-
-    public static final long MIN_SEGMENT_INTERVAL = 60 * 1000; // one minute
-
-    private static final long USE_CURRENT_TIMESTAMP = -1L;
-
-    private static class Segment extends RocksDBStore<byte[], byte[]> {
-        public final long id;
-
-        Segment(String name, long id) {
-            super(name, WindowStoreUtil.INNER_SERDES);
-            this.id = id;
-        }
-
-        public void destroy() {
-            Utils.delete(dbDir);
-        }
-    }
-
-    private static class RocksDBWindowStoreIterator<V> implements WindowStoreIterator<V> {
-        private final Serdes<?, V> serdes;
-        private final KeyValueIterator<byte[], byte[]>[] iterators;
-        private int index = 0;
-
-        RocksDBWindowStoreIterator(Serdes<?, V> serdes) {
-            this(serdes, WindowStoreUtil.NO_ITERATORS);
-        }
-
-        RocksDBWindowStoreIterator(Serdes<?, V> serdes, KeyValueIterator<byte[], byte[]>[] iterators) {
-            this.serdes = serdes;
-            this.iterators = iterators;
-        }
-
-        @Override
-        public boolean hasNext() {
-            while (index < iterators.length) {
-                if (iterators[index].hasNext())
-                    return true;
-
-                index++;
-            }
-            return false;
-        }
-
-        @Override
-        public KeyValue<Long, V> next() {
-            if (index >= iterators.length)
-                throw new NoSuchElementException();
-
-            Entry<byte[], byte[]> entry = iterators[index].next();
-
-            return new KeyValue<>(WindowStoreUtil.timestampFromBinaryKey(entry.key()),
-                                  serdes.valueFrom(entry.value()));
-        }
-
-        @Override
-        public void remove() {
-            if (index < iterators.length)
-                iterators[index].remove();
-        }
-
-        @Override
-        public void close() {
-            for (KeyValueIterator<byte[], byte[]> iterator : iterators) {
-                iterator.close();
-            }
-        }
-    }
-
-    private final String name;
-    private final long segmentInterval;
-    private final boolean retainDuplicates;
-    private final Segment[] segments;
-    private final Serdes<K, V> serdes;
-    private final SimpleDateFormat formatter;
-
-    private ProcessorContext context;
-    private long currentSegmentId = -1L;
-    private int seqnum = 0;
-
-    public RocksDBWindowStore(String name, long retentionPeriod, int numSegments, boolean retainDuplicates, Serdes<K, V> serdes) {
-        this.name = name;
-
-        // The segment interval must be at least MIN_SEGMENT_INTERVAL
-        this.segmentInterval = Math.max(retentionPeriod / (numSegments - 1), MIN_SEGMENT_INTERVAL);
-
-        this.segments = new Segment[numSegments];
-        this.serdes = serdes;
-
-        this.retainDuplicates = retainDuplicates;
-
-        // Create a date formatter. Formatted timestamps are used as segment name suffixes
-        this.formatter = new SimpleDateFormat("yyyyMMddHHmm");
-        this.formatter.setTimeZone(new SimpleTimeZone(0, "GMT"));
-    }
-
-    @Override
-    public String name() {
-        return name;
-    }
-
-    @Override
-    public void init(ProcessorContext context) {
-        this.context = context;
-    }
-
-    @Override
-    public boolean persistent() {
-        return true;
-    }
-
-    @Override
-    public void flush() {
-        for (Segment segment : segments) {
-            if (segment != null)
-                segment.flush();
-        }
-    }
-
-    @Override
-    public void close() {
-        for (Segment segment : segments) {
-            if (segment != null)
-                segment.close();
-        }
-    }
-
-    @Override
-    public void put(K key, V value) {
-        putAndReturnInternalKey(key, value, USE_CURRENT_TIMESTAMP);
-    }
-
-    @Override
-    public void put(K key, V value, long timestamp) {
-        putAndReturnInternalKey(key, value, timestamp);
-    }
-
-    @Override
-    public byte[] putAndReturnInternalKey(K key, V value, long t) {
-        long timestamp = t == USE_CURRENT_TIMESTAMP ? context.timestamp() : t;
-
-        long segmentId = segmentId(timestamp);
-
-        if (segmentId > currentSegmentId) {
-            // A new segment will be created. Clean up old segments first.
-            currentSegmentId = segmentId;
-            cleanup();
-        }
-
-        // If the record is within the retention period, put it in the store.
-        if (segmentId > currentSegmentId - segments.length) {
-            if (retainDuplicates)
-                seqnum = (seqnum + 1) & 0x7FFFFFFF;
-            byte[] binaryKey = WindowStoreUtil.toBinaryKey(key, timestamp, seqnum, serdes);
-            getSegment(segmentId).put(binaryKey, serdes.rawValue(value));
-            return binaryKey;
-        } else {
-            return null;
-        }
-    }
-
-    @Override
-    public void putInternal(byte[] binaryKey, byte[] binaryValue) {
-        long segmentId = segmentId(WindowStoreUtil.timestampFromBinaryKey(binaryKey));
-
-        if (segmentId > currentSegmentId) {
-            // A new segment will be created. Clean up old segments first.
-            currentSegmentId = segmentId;
-            cleanup();
-        }
-
-        // If the record is within the retention period, put it in the store.
-        if (segmentId > currentSegmentId - segments.length)
-            getSegment(segmentId).put(binaryKey, binaryValue);
-    }
-
-    @Override
-    public byte[] getInternal(byte[] binaryKey) {
-        long segmentId = segmentId(WindowStoreUtil.timestampFromBinaryKey(binaryKey));
-
-        Segment segment = segments[(int) (segmentId % segments.length)];
-
-        if (segment != null && segment.id == segmentId) {
-            return segment.get(binaryKey);
-        } else {
-            return null;
-        }
-    }
-
-    @SuppressWarnings("unchecked")
-    @Override
-    public WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo) {
-        long segFrom = segmentId(timeFrom);
-        long segTo = segmentId(Math.max(0L, timeTo));
-
-        byte[] binaryFrom = WindowStoreUtil.toBinaryKey(key, timeFrom, 0, serdes);
-        byte[] binaryUntil = WindowStoreUtil.toBinaryKey(key, timeTo + 1L, 0, serdes);
-
-        ArrayList<KeyValueIterator<byte[], byte[]>> iterators = new ArrayList<>();
-
-        for (long segmentId = segFrom; segmentId <= segTo; segmentId++) {
-            Segment segment = segments[(int) (segmentId % segments.length)];
-
-            if (segment != null && segment.id == segmentId)
-                iterators.add(segment.range(binaryFrom, binaryUntil));
-        }
-
-        if (iterators.size() > 0) {
-            return new RocksDBWindowStoreIterator<>(serdes, iterators.toArray(new KeyValueIterator[iterators.size()]));
-        } else {
-            return new RocksDBWindowStoreIterator<>(serdes);
-        }
-    }
-
-    private Segment getSegment(long segmentId) {
-        int index = (int) (segmentId % segments.length);
-
-        if (segments[index] == null) {
-            segments[index] = new Segment(name + "-" + directorySuffix(segmentId), segmentId);
-            segments[index].init(context);
-        }
-
-        return segments[index];
-    }
-
-    private void cleanup() {
-        for (int i = 0; i < segments.length; i++) {
-            if (segments[i] != null && segments[i].id <= currentSegmentId - segments.length) {
-                segments[i].close();
-                segments[i].destroy();
-                segments[i] = null;
-            }
-        }
-    }
-
-    public long segmentId(long timestamp) {
-        return timestamp / segmentInterval;
-    }
-
-    public String directorySuffix(long segmentId) {
-        return formatter.format(new Date(segmentId * segmentInterval));
-    }
-
-    // this method is used by a test
-    public Set<Long> segmentIds() {
-        HashSet<Long> segmentIds = new HashSet<>();
-
-        for (Segment segment : segments) {
-            if (segment != null)
-                segmentIds.add(segment.id);
-        }
-
-        return segmentIds;
-    }
-
-}
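
The segment bookkeeping above is plain modular arithmetic: segmentId(timestamp) = timestamp / segmentInterval, and segment ids share ring slot segmentId % numSegments, so rolling to a new segment evicts the one numSegments intervals older. A sketch of the arithmetic with illustrative numbers (3 segments, the 60-second minimum interval):

public class SegmentMathDemo {
    static final int NUM_SEGMENTS = 3;
    static final long SEGMENT_INTERVAL_MS = 60_000L;  // MIN_SEGMENT_INTERVAL

    static long segmentId(long timestampMs) {
        return timestampMs / SEGMENT_INTERVAL_MS;
    }

    public static void main(String[] args) {
        // t = 30s lands in segment 0, ring slot 0
        System.out.println(segmentId(30_000L) % NUM_SEGMENTS);   // 0
        // t = 200s lands in segment 3, which reuses ring slot 0,
        // so the old segment 0 is closed and destroyed by cleanup()
        System.out.println(segmentId(200_000L) % NUM_SEGMENTS);  // 0
    }
}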

http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/state/RocksDBWindowStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBWindowStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBWindowStoreSupplier.java
deleted file mode 100644
index fcdcb9b..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBWindowStoreSupplier.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.state;
-
-import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.StateStoreSupplier;
-
-/**
- * A {@link WindowStore} that stores all entries in a local RocksDB database.
- *
- * @param <K> the type of keys
- * @param <V> the type of values
- *
- * @see Stores#create(String)
- */
-public class RocksDBWindowStoreSupplier<K, V> implements StateStoreSupplier {
-
-    private final String name;
-    private final long retentionPeriod;
-    private final boolean retainDuplicates;
-    private final int numSegments;
-    private final Serdes serdes;
-    private final Time time;
-
-    public RocksDBWindowStoreSupplier(String name, long retentionPeriod, int numSegments, boolean retainDuplicates, Serdes<K, V> serdes, Time time) {
-        this.name = name;
-        this.retentionPeriod = retentionPeriod;
-        this.retainDuplicates = retainDuplicates;
-        this.numSegments = numSegments;
-        this.serdes = serdes;
-        this.time = time;
-    }
-
-    public String name() {
-        return name;
-    }
-
-    public StateStore get() {
-        return new MeteredWindowStore<>(new RocksDBWindowStore<K, V>(name, retentionPeriod, numSegments, retainDuplicates, serdes), "rocksdb-window", time);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/state/StoreChangeLogger.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/StoreChangeLogger.java b/streams/src/main/java/org/apache/kafka/streams/state/StoreChangeLogger.java
deleted file mode 100644
index ee6624e..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/state/StoreChangeLogger.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.state;
-
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.internals.RecordCollector;
-
-import java.util.HashSet;
-import java.util.Set;
-
-public class StoreChangeLogger<K, V> {
-
-    public interface ValueGetter<K, V> {
-        V get(K key);
-    }
-
-    protected final Serdes<K, V> serialization;
-
-    private final Set<K> dirty;
-    private final Set<K> removed;
-    private final int maxDirty;
-    private final int maxRemoved;
-
-    private final String topic;
-    private int partition;
-    private ProcessorContext context;
-
-    // always wrap the logged store with the metered store
-    public StoreChangeLogger(String topic, ProcessorContext context, Serdes<K, V> serialization) {
-        this.topic = topic;
-        this.serialization = serialization;
-        this.context = context;
-        this.partition = context.id().partition;
-
-        this.dirty = new HashSet<>();
-        this.removed = new HashSet<>();
-        this.maxDirty = 100; // TODO: this needs to be configurable
-        this.maxRemoved = 100; // TODO: this needs to be configurable
-    }
-
-    public void add(K key) {
-        this.dirty.add(key);
-        this.removed.remove(key);
-    }
-
-    public void delete(K key) {
-        this.dirty.remove(key);
-        this.removed.add(key);
-    }
-
-    public void maybeLogChange(ValueGetter<K, V> getter) {
-        if (this.dirty.size() > this.maxDirty || this.removed.size() > this.maxRemoved)
-            logChange(getter);
-    }
-
-    public void logChange(ValueGetter<K, V> getter) {
-        RecordCollector collector = ((RecordCollector.Supplier) context).recordCollector();
-        if (collector != null) {
-            Serializer<K> keySerializer = serialization.keySerializer();
-            Serializer<V> valueSerializer = serialization.valueSerializer();
-
-            for (K k : this.removed) {
-                collector.send(new ProducerRecord<>(this.topic, this.partition, k, (V) null), keySerializer, valueSerializer);
-            }
-            for (K k : this.dirty) {
-                V v = getter.get(k);
-                collector.send(new ProducerRecord<>(this.topic, this.partition, k, v), keySerializer, valueSerializer);
-            }
-            this.removed.clear();
-            this.dirty.clear();
-        }
-    }
-
-}
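
StoreChangeLogger batches keys, not values: dirty keys are re-read through the ValueGetter at flush time so only the latest value is produced, and deleted keys are sent with a null value, which Kafka log compaction treats as a tombstone. A toy version of the bookkeeping, logging to stdout instead of a changelog topic:

import java.util.HashSet;
import java.util.Set;

class ChangeBuffer<K, V> {
    interface ValueGetter<K, V> { V get(K key); }

    private final Set<K> dirty = new HashSet<>();
    private final Set<K> removed = new HashSet<>();
    private final int maxDirty;

    ChangeBuffer(int maxDirty) { this.maxDirty = maxDirty; }

    void add(K key)    { dirty.add(key); removed.remove(key); }   // upsert
    void delete(K key) { dirty.remove(key); removed.add(key); }   // tombstone

    void maybeLog(ValueGetter<K, V> getter) {
        if (dirty.size() > maxDirty) log(getter);
    }

    void log(ValueGetter<K, V> getter) {
        for (K k : removed) System.out.println(k + " -> null (tombstone)");
        for (K k : dirty)   System.out.println(k + " -> " + getter.get(k)); // latest value only
        removed.clear();
        dirty.clear();
    }
}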

http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
index 5452040..46b2592 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
@@ -27,6 +27,9 @@ import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
+import org.apache.kafka.streams.state.internals.InMemoryKeyValueStoreSupplier;
+import org.apache.kafka.streams.state.internals.InMemoryLRUCacheStoreSupplier;
+import org.apache.kafka.streams.state.internals.RocksDBKeyValueStoreSupplier;
 
 /**
  * Factory for creating key-value stores.

http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
new file mode 100644
index 0000000..286db1b
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStoreSupplier.java
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
+import org.apache.kafka.streams.state.Entry;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.Serdes;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+/**
+ * An in-memory key-value store based on a TreeMap.
+ *
+ * @param <K> The key type
+ * @param <V> The value type
+ *
+ * @see org.apache.kafka.streams.state.Stores#create(String)
+ */
+public class InMemoryKeyValueStoreSupplier<K, V> implements StateStoreSupplier {
+
+    private final String name;
+    private final Serdes serdes;
+    private final Time time;
+
+    public InMemoryKeyValueStoreSupplier(String name, Serdes<K, V> serdes, Time time) {
+        this.name = name;
+        this.serdes = serdes;
+        this.time = time;
+    }
+
+    public String name() {
+        return name;
+    }
+
+    public StateStore get() {
+        return new MeteredKeyValueStore<K, V>(new MemoryStore<K, V>(name), serdes, "in-memory-state", time);
+    }
+
+    private static class MemoryStore<K, V> implements KeyValueStore<K, V> {
+
+        private final String name;
+        private final NavigableMap<K, V> map;
+
+        public MemoryStore(String name) {
+            super();
+            this.name = name;
+            this.map = new TreeMap<>();
+        }
+
+        @Override
+        public String name() {
+            return this.name;
+        }
+
+        @Override
+        public void init(ProcessorContext context) {
+            // do-nothing since it is in-memory
+        }
+
+        @Override
+        public boolean persistent() {
+            return false;
+        }
+
+        @Override
+        public V get(K key) {
+            return this.map.get(key);
+        }
+
+        @Override
+        public void put(K key, V value) {
+            this.map.put(key, value);
+        }
+
+        @Override
+        public void putAll(List<Entry<K, V>> entries) {
+            for (Entry<K, V> entry : entries)
+                put(entry.key(), entry.value());
+        }
+
+        @Override
+        public V delete(K key) {
+            return this.map.remove(key);
+        }
+
+        @Override
+        public KeyValueIterator<K, V> range(K from, K to) {
+            return new MemoryStoreIterator<K, V>(this.map.subMap(from, true, to, false).entrySet().iterator());
+        }
+
+        @Override
+        public KeyValueIterator<K, V> all() {
+            return new MemoryStoreIterator<K, V>(this.map.entrySet().iterator());
+        }
+
+        @Override
+        public void flush() {
+            // do-nothing since it is in-memory
+        }
+
+        @Override
+        public void close() {
+            // do-nothing
+        }
+
+        private static class MemoryStoreIterator<K, V> implements KeyValueIterator<K, V> {
+            private final Iterator<Map.Entry<K, V>> iter;
+
+            public MemoryStoreIterator(Iterator<Map.Entry<K, V>> iter) {
+                this.iter = iter;
+            }
+
+            @Override
+            public boolean hasNext() {
+                return iter.hasNext();
+            }
+
+            @Override
+            public Entry<K, V> next() {
+                Map.Entry<K, V> entry = iter.next();
+                return new Entry<>(entry.getKey(), entry.getValue());
+            }
+
+            @Override
+            public void remove() {
+                iter.remove();
+            }
+
+            @Override
+            public void close() {
+            }
+
+        }
+    }
+}
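
For reference, MemoryStore delegates range queries to TreeMap.subMap with an
inclusive "from" bound and an exclusive "to" bound. A minimal, self-contained
sketch of those semantics using a plain TreeMap (the class name below is
illustrative, not part of this patch):

    import java.util.NavigableMap;
    import java.util.TreeMap;

    public class RangeSemanticsDemo {
        public static void main(String[] args) {
            NavigableMap<String, Long> map = new TreeMap<>();
            map.put("a", 1L);
            map.put("b", 2L);
            map.put("c", 3L);

            // mirrors MemoryStore.range(from, to): "from" inclusive, "to" exclusive
            System.out.println(map.subMap("a", true, "c", false)); // {a=1, b=2}
        }
    }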

http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java
new file mode 100644
index 0000000..6a38423
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryLRUCacheStoreSupplier.java
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreSupplier;
+import org.apache.kafka.streams.state.Entry;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.Serdes;
+
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.TreeSet;
+
+/**
+ * An in-memory key-value store that is bounded in size, evicting its least
+ * recently used entry once the maximum number of entries is reached.
+ *
+ * @param <K> The key type
+ * @param <V> The value type
+ */
+public class InMemoryLRUCacheStoreSupplier<K, V> implements StateStoreSupplier {
+
+    private final String name;
+    private final int capacity;
+    private final Serdes<K, V> serdes;
+    private final Time time;
+
+    public InMemoryLRUCacheStoreSupplier(String name, int capacity, Serdes<K, V> serdes, Time time) {
+        this.name = name;
+        this.capacity = capacity;
+        this.serdes = serdes;
+        this.time = time;
+    }
+
+    public String name() {
+        return name;
+    }
+
+    public StateStore get() {
+        MemoryLRUCache<K, V> cache = new MemoryLRUCache<K, V>(name, capacity);
+        final MeteredKeyValueStore<K, V> store = new MeteredKeyValueStore<>(cache, serdes, "in-memory-lru-state", time);
+        cache.whenEldestRemoved(new EldestEntryRemovalListener<K, V>() {
+            @Override
+            public void apply(K key, V value) {
+                store.removed(key);
+            }
+        });
+        return store;
+    }
+
+    private interface EldestEntryRemovalListener<K, V> {
+        void apply(K key, V value);
+    }
+
+    protected static final class MemoryLRUCache<K, V> implements KeyValueStore<K, V> {
+
+        private final String name;
+        private final Map<K, V> map;
+        private final NavigableSet<K> keys;
+        private EldestEntryRemovalListener<K, V> listener;
+
+        public MemoryLRUCache(String name, final int maxCacheSize) {
+            this.name = name;
+            this.keys = new TreeSet<>();
+            // leave room for one extra entry to handle adding an entry before the oldest can be removed
+            this.map = new LinkedHashMap<K, V>(maxCacheSize + 1, 1.01f, true) {
+                private static final long serialVersionUID = 1L;
+
+                @Override
+                protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
+                    if (size() > maxCacheSize) {
+                        K key = eldest.getKey();
+                        keys.remove(key);
+                        if (listener != null) listener.apply(key, eldest.getValue());
+                        return true;
+                    }
+                    return false;
+                }
+            };
+        }
+
+        protected void whenEldestRemoved(EldestEntryRemovalListener<K, V> listener) {
+            this.listener = listener;
+        }
+
+        @Override
+        public String name() {
+            return this.name;
+        }
+
+        @Override
+        public void init(ProcessorContext context) {
+            // do-nothing since it is in-memory
+        }
+
+        @Override
+        public boolean persistent() {
+            return false;
+        }
+
+        @Override
+        public V get(K key) {
+            return this.map.get(key);
+        }
+
+        @Override
+        public void put(K key, V value) {
+            this.map.put(key, value);
+            this.keys.add(key);
+        }
+
+        @Override
+        public void putAll(List<Entry<K, V>> entries) {
+            for (Entry<K, V> entry : entries)
+                put(entry.key(), entry.value());
+        }
+
+        @Override
+        public V delete(K key) {
+            V value = this.map.remove(key);
+            this.keys.remove(key);
+            return value;
+        }
+
+        @Override
+        public KeyValueIterator<K, V> range(K from, K to) {
+            return new MemoryLRUCache.CacheIterator<K, V>(this.keys.subSet(from, true, to, false).iterator(), this.map);
+        }
+
+        @Override
+        public KeyValueIterator<K, V> all() {
+            return new MemoryLRUCache.CacheIterator<K, V>(this.keys.iterator(), this.map);
+        }
+
+        @Override
+        public void flush() {
+            // do-nothing since it is in-memory
+        }
+
+        @Override
+        public void close() {
+            // do-nothing
+        }
+
+        private static class CacheIterator<K, V> implements KeyValueIterator<K, V> {
+            private final Iterator<K> keys;
+            private final Map<K, V> entries;
+            private K lastKey;
+
+            public CacheIterator(Iterator<K> keys, Map<K, V> entries) {
+                this.keys = keys;
+                this.entries = entries;
+            }
+
+            @Override
+            public boolean hasNext() {
+                return keys.hasNext();
+            }
+
+            @Override
+            public Entry<K, V> next() {
+                lastKey = keys.next();
+                return new Entry<>(lastKey, entries.get(lastKey));
+            }
+
+            @Override
+            public void remove() {
+                keys.remove();
+                entries.remove(lastKey);
+            }
+
+            @Override
+            public void close() {
+                // do nothing
+            }
+        }
+    }
+}
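
The eviction mechanics above may be easier to follow in isolation: MemoryLRUCache
relies on a LinkedHashMap in access order, whose removeEldestEntry hook fires on
every put once the map exceeds maxCacheSize. A minimal sketch of the same idiom
(names are illustrative only):

    import java.util.LinkedHashMap;
    import java.util.Map;

    public class LruDemo {
        public static void main(String[] args) {
            final int maxCacheSize = 2;
            // accessOrder=true makes get() refresh an entry's recency
            Map<String, Integer> cache = new LinkedHashMap<String, Integer>(maxCacheSize + 1, 1.01f, true) {
                @Override
                protected boolean removeEldestEntry(Map.Entry<String, Integer> eldest) {
                    return size() > maxCacheSize;
                }
            };
            cache.put("a", 1);
            cache.put("b", 2);
            cache.get("a");     // "b" is now the least recently used entry
            cache.put("c", 3);  // evicts "b"
            System.out.println(cache.keySet()); // [a, c]
        }
    }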

http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
new file mode 100644
index 0000000..21f73b0
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
@@ -0,0 +1,254 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.SystemTime;
+import org.apache.kafka.streams.StreamingMetrics;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.state.Entry;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.Serdes;
+
+import java.util.List;
+
+public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
+
+    protected final KeyValueStore<K, V> inner;
+    protected final StoreChangeLogger.ValueGetter<K, V> getter;
+    protected final Serdes<K, V> serialization;
+    protected final String metricScope;
+    protected final Time time;
+
+    private Sensor putTime;
+    private Sensor getTime;
+    private Sensor deleteTime;
+    private Sensor putAllTime;
+    private Sensor allTime;
+    private Sensor rangeTime;
+    private Sensor flushTime;
+    private Sensor restoreTime;
+    private StreamingMetrics metrics;
+
+    private boolean loggingEnabled = true;
+    private StoreChangeLogger<K, V> changeLogger = null;
+
+    // always wrap the store with the metered store
+    public MeteredKeyValueStore(final KeyValueStore<K, V> inner, Serdes<K, V> serialization, String metricScope, Time time) {
+        this.inner = inner;
+        this.getter = new StoreChangeLogger.ValueGetter<K, V>() {
+            public V get(K key) {
+                return inner.get(key);
+            }
+        };
+        this.serialization = serialization;
+        this.metricScope = metricScope;
+        this.time = time != null ? time : new SystemTime();
+    }
+
+    public MeteredKeyValueStore<K, V> disableLogging() {
+        loggingEnabled = false;
+        return this;
+    }
+
+    @Override
+    public String name() {
+        return inner.name();
+    }
+
+    @Override
+    public void init(ProcessorContext context) {
+        final String name = name();
+        this.metrics = context.metrics();
+        this.putTime = this.metrics.addLatencySensor(metricScope, name, "put");
+        this.getTime = this.metrics.addLatencySensor(metricScope, name, "get");
+        this.deleteTime = this.metrics.addLatencySensor(metricScope, name, "delete");
+        this.putAllTime = this.metrics.addLatencySensor(metricScope, name, "put-all");
+        this.allTime = this.metrics.addLatencySensor(metricScope, name, "all");
+        this.rangeTime = this.metrics.addLatencySensor(metricScope, name, "range");
+        this.flushTime = this.metrics.addLatencySensor(metricScope, name, "flush");
+        this.restoreTime = this.metrics.addLatencySensor(metricScope, name, "restore");
+
+        serialization.init(context);
+        this.changeLogger = this.loggingEnabled ? new StoreChangeLogger<>(name, context, serialization) : null;
+
+        // register and possibly restore the state from the logs
+        long startNs = time.nanoseconds();
+        inner.init(context);
+        try {
+            final Deserializer<K> keyDeserializer = serialization.keyDeserializer();
+            final Deserializer<V> valDeserializer = serialization.valueDeserializer();
+
+            context.register(this, loggingEnabled, new StateRestoreCallback() {
+                @Override
+                public void restore(byte[] key, byte[] value) {
+                    inner.put(keyDeserializer.deserialize(name, key),
+                            valDeserializer.deserialize(name, value));
+                }
+            });
+        } finally {
+            this.metrics.recordLatency(this.restoreTime, startNs, time.nanoseconds());
+        }
+    }
+
+    @Override
+    public boolean persistent() {
+        return inner.persistent();
+    }
+
+    @Override
+    public V get(K key) {
+        long startNs = time.nanoseconds();
+        try {
+            return this.inner.get(key);
+        } finally {
+            this.metrics.recordLatency(this.getTime, startNs, time.nanoseconds());
+        }
+    }
+
+    @Override
+    public void put(K key, V value) {
+        long startNs = time.nanoseconds();
+        try {
+            this.inner.put(key, value);
+
+            if (loggingEnabled) {
+                changeLogger.add(key);
+                changeLogger.maybeLogChange(this.getter);
+            }
+        } finally {
+            this.metrics.recordLatency(this.putTime, startNs, time.nanoseconds());
+        }
+    }
+
+    @Override
+    public void putAll(List<Entry<K, V>> entries) {
+        long startNs = time.nanoseconds();
+        try {
+            this.inner.putAll(entries);
+
+            if (loggingEnabled) {
+                for (Entry<K, V> entry : entries) {
+                    K key = entry.key();
+                    changeLogger.add(key);
+                }
+                changeLogger.maybeLogChange(this.getter);
+            }
+        } finally {
+            this.metrics.recordLatency(this.putAllTime, startNs, time.nanoseconds());
+        }
+    }
+
+    @Override
+    public V delete(K key) {
+        long startNs = time.nanoseconds();
+        try {
+            V value = this.inner.delete(key);
+
+            removed(key);
+
+            return value;
+        } finally {
+            this.metrics.recordLatency(this.deleteTime, startNs, time.nanoseconds());
+        }
+    }
+
+    /**
+     * Called when the underlying {@link #inner} {@link KeyValueStore} removes an entry in response to a call from this
+     * store.
+     *
+     * @param key the key for the entry that the inner store removed
+     */
+    protected void removed(K key) {
+        if (loggingEnabled) {
+            changeLogger.delete(key);
+            changeLogger.maybeLogChange(this.getter);
+        }
+    }
+
+    @Override
+    public KeyValueIterator<K, V> range(K from, K to) {
+        return new MeteredKeyValueIterator<K, V>(this.inner.range(from, to), this.rangeTime);
+    }
+
+    @Override
+    public KeyValueIterator<K, V> all() {
+        return new MeteredKeyValueIterator<K, V>(this.inner.all(), this.allTime);
+    }
+
+    @Override
+    public void close() {
+        inner.close();
+    }
+
+    @Override
+    public void flush() {
+        long startNs = time.nanoseconds();
+        try {
+            this.inner.flush();
+
+            if (loggingEnabled)
+                changeLogger.logChange(this.getter);
+        } finally {
+            this.metrics.recordLatency(this.flushTime, startNs, time.nanoseconds());
+        }
+    }
+
+    private class MeteredKeyValueIterator<K1, V1> implements KeyValueIterator<K1, V1> {
+
+        private final KeyValueIterator<K1, V1> iter;
+        private final Sensor sensor;
+        private final long startNs;
+
+        public MeteredKeyValueIterator(KeyValueIterator<K1, V1> iter, Sensor sensor) {
+            this.iter = iter;
+            this.sensor = sensor;
+            this.startNs = time.nanoseconds();
+        }
+
+        @Override
+        public boolean hasNext() {
+            return iter.hasNext();
+        }
+
+        @Override
+        public Entry<K1, V1> next() {
+            return iter.next();
+        }
+
+        @Override
+        public void remove() {
+            iter.remove();
+        }
+
+        @Override
+        public void close() {
+            try {
+                iter.close();
+            } finally {
+                metrics.recordLatency(this.sensor, this.startNs, time.nanoseconds());
+            }
+        }
+
+    }
+
+}
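
Every operation in MeteredKeyValueStore follows the same pattern: record a start
timestamp, delegate to the inner store, and record the latency in a finally block
so that failing calls are measured as well. A minimal, self-contained sketch of
that pattern, substituting System.nanoTime() for the pluggable Time interface:

    public class TimedCallDemo {
        public static void main(String[] args) {
            long startNs = System.nanoTime();
            try {
                doWork(); // stand-in for an inner store call such as inner.get(key)
            } finally {
                // recorded even if doWork() throws, as in the metered store
                System.out.println("latency-ns=" + (System.nanoTime() - startNs));
            }
        }

        private static void doWork() { }
    }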

http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
new file mode 100644
index 0000000..821927d
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
@@ -0,0 +1,209 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.utils.SystemTime;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.StreamingMetrics;
+import org.apache.kafka.streams.kstream.KeyValue;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
+import org.apache.kafka.streams.state.Serdes;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+
+public class MeteredWindowStore<K, V> implements WindowStore<K, V> {
+
+    protected final WindowStore<K, V> inner;
+    protected final StoreChangeLogger.ValueGetter<byte[], byte[]> getter;
+    protected final String metricScope;
+    protected final Time time;
+
+    private Sensor putTime;
+    private Sensor getTime;
+    private Sensor rangeTime;
+    private Sensor flushTime;
+    private Sensor restoreTime;
+    private StreamingMetrics metrics;
+
+    private boolean loggingEnabled = true;
+    private StoreChangeLogger<byte[], byte[]> changeLogger = null;
+
+    // always wrap the store with the metered store
+    public MeteredWindowStore(final WindowStore<K, V> inner, String metricScope, Time time) {
+        this.inner = inner;
+        this.getter = new StoreChangeLogger.ValueGetter<byte[], byte[]>() {
+            public byte[] get(byte[] key) {
+                return inner.getInternal(key);
+            }
+        };
+        this.metricScope = metricScope;
+        this.time = time != null ? time : new SystemTime();
+    }
+
+    public MeteredWindowStore<K, V> disableLogging() {
+        loggingEnabled = false;
+        return this;
+    }
+
+    @Override
+    public String name() {
+        return inner.name();
+    }
+
+    @Override
+    public void init(ProcessorContext context) {
+        final String name = name();
+        this.metrics = context.metrics();
+        this.putTime = this.metrics.addLatencySensor(metricScope, name, "put");
+        this.getTime = this.metrics.addLatencySensor(metricScope, name, "get");
+        this.rangeTime = this.metrics.addLatencySensor(metricScope, name, "range");
+        this.flushTime = this.metrics.addLatencySensor(metricScope, name, "flush");
+        this.restoreTime = this.metrics.addLatencySensor(metricScope, name, "restore");
+
+        this.changeLogger = this.loggingEnabled ?
+                new StoreChangeLogger<>(name, context, Serdes.withBuiltinTypes("", byte[].class, byte[].class)) : null;
+
+        // register and possibly restore the state from the logs
+        long startNs = time.nanoseconds();
+        inner.init(context);
+        try {
+            context.register(this, loggingEnabled, new StateRestoreCallback() {
+                @Override
+                public void restore(byte[] key, byte[] value) {
+                    inner.putInternal(key, value);
+                }
+            });
+        } finally {
+            this.metrics.recordLatency(this.restoreTime, startNs, time.nanoseconds());
+        }
+    }
+
+    @Override
+    public boolean persistent() {
+        return inner.persistent();
+    }
+
+    @Override
+    public WindowStoreIterator<V> fetch(K key, long timeFrom, long timeTo) {
+        return new MeteredWindowStoreIterator<>(this.inner.fetch(key, timeFrom, timeTo), this.rangeTime);
+    }
+
+    @Override
+    public void put(K key, V value) {
+        putAndReturnInternalKey(key, value, -1L);
+    }
+
+    @Override
+    public void put(K key, V value, long timestamp) {
+        putAndReturnInternalKey(key, value, timestamp);
+    }
+
+    @Override
+    public byte[] putAndReturnInternalKey(K key, V value, long timestamp) {
+        long startNs = time.nanoseconds();
+        try {
+            byte[] binKey = this.inner.putAndReturnInternalKey(key, value, timestamp);
+
+            if (loggingEnabled) {
+                changeLogger.add(binKey);
+                changeLogger.maybeLogChange(this.getter);
+            }
+
+            return binKey;
+        } finally {
+            this.metrics.recordLatency(this.putTime, startNs, time.nanoseconds());
+        }
+    }
+
+    @Override
+    public void putInternal(byte[] binaryKey, byte[] binaryValue) {
+        inner.putInternal(binaryKey, binaryValue);
+    }
+
+    @Override
+    public byte[] getInternal(byte[] binaryKey) {
+        long startNs = time.nanoseconds();
+        try {
+            return this.inner.getInternal(binaryKey);
+        } finally {
+            this.metrics.recordLatency(this.getTime, startNs, time.nanoseconds());
+        }
+    }
+
+    @Override
+    public void close() {
+        inner.close();
+    }
+
+    @Override
+    public void flush() {
+        long startNs = time.nanoseconds();
+        try {
+            this.inner.flush();
+
+            if (loggingEnabled)
+                changeLogger.logChange(this.getter);
+        } finally {
+            this.metrics.recordLatency(this.flushTime, startNs, time.nanoseconds());
+        }
+    }
+
+    private class MeteredWindowStoreIterator<E> implements WindowStoreIterator<E> {
+
+        private final WindowStoreIterator<E> iter;
+        private final Sensor sensor;
+        private final long startNs;
+
+        public MeteredWindowStoreIterator(WindowStoreIterator<E> iter, Sensor sensor) {
+            this.iter = iter;
+            this.sensor = sensor;
+            this.startNs = time.nanoseconds();
+        }
+
+        @Override
+        public boolean hasNext() {
+            return iter.hasNext();
+        }
+
+        @Override
+        public KeyValue<Long, E> next() {
+            return iter.next();
+        }
+
+        @Override
+        public void remove() {
+            iter.remove();
+        }
+
+        @Override
+        public void close() {
+            try {
+                iter.close();
+            } finally {
+                metrics.recordLatency(this.sensor, this.startNs, time.nanoseconds());
+            }
+        }
+
+    }
+
+    WindowStore<K, V> inner() {
+        return inner;
+    }
+}
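
Unlike MeteredKeyValueStore, this wrapper logs changes against the binary key
returned by putAndReturnInternalKey, because a windowed entry is identified by
more than the user key alone. The actual key layout is defined by the underlying
window store implementation (not shown in this patch); the sketch below only
illustrates the general idea of a composite key that appends a timestamp to the
serialized user key:

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;

    public class CompositeKeyDemo {
        // illustrative layout only: serialized key followed by an 8-byte timestamp
        static byte[] toBinaryKey(String key, long timestamp) {
            byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
            return ByteBuffer.allocate(keyBytes.length + 8)
                    .put(keyBytes)
                    .putLong(timestamp)
                    .array();
        }

        public static void main(String[] args) {
            byte[] binKey = toBinaryKey("user-42", 1000L);
            System.out.println(binKey.length); // key bytes + 8
        }
    }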

http://git-wip-us.apache.org/repos/asf/kafka/blob/959cf09e/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java
new file mode 100644
index 0000000..e276f83
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * This class persists a map of topic/partition =&gt; offsets to a file. The format of the file is UTF-8 text containing the following:
+ * <pre>
+ *   &lt;version&gt;
+ *   &lt;n&gt;
+ *   &lt;topic_name_1&gt; &lt;partition_1&gt; &lt;offset_1&gt;
+ *   .
+ *   .
+ *   .
+ *   &lt;topic_name_n&gt; &lt;partition_n&gt; &lt;offset_n&gt;
+ * </pre>
+ *   The first line contains a number designating the format version (currently 0), the second line contains
+ *   a number giving the total number of offsets. Each successive line gives a topic/partition/offset triple
+ *   separated by spaces.
+ */
+public class OffsetCheckpoint {
+
+    private static final int VERSION = 0;
+
+    private final File file;
+    private final Object lock;
+
+    public OffsetCheckpoint(File file) throws IOException {
+        this.file = file;
+        this.lock = new Object();
+    }
+
+    public void write(Map<TopicPartition, Long> offsets) throws IOException {
+        synchronized (lock) {
+            // write to temp file and then swap with the existing file
+            File temp = new File(file.getAbsolutePath() + ".tmp");
+
+            FileOutputStream fileOutputStream = new FileOutputStream(temp);
+            BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(fileOutputStream));
+            try {
+                writeIntLine(writer, VERSION);
+                writeIntLine(writer, offsets.size());
+
+                for (Map.Entry<TopicPartition, Long> entry : offsets.entrySet())
+                    writeEntry(writer, entry.getKey(), entry.getValue());
+
+                writer.flush();
+                fileOutputStream.getFD().sync();
+            } finally {
+                writer.close();
+            }
+
+            Utils.atomicMoveWithFallback(temp.toPath(), file.toPath());
+        }
+    }
+
+    private void writeIntLine(BufferedWriter writer, int number) throws IOException {
+        writer.write(Integer.toString(number));
+        writer.newLine();
+    }
+
+    private void writeEntry(BufferedWriter writer, TopicPartition part, long offset) throws IOException {
+        writer.write(part.topic());
+        writer.write(' ');
+        writer.write(Integer.toString(part.partition()));
+        writer.write(' ');
+        writer.write(Long.toString(offset));
+        writer.newLine();
+    }
+
+    public Map<TopicPartition, Long> read() throws IOException {
+        synchronized (lock) {
+            BufferedReader reader = null;
+            try {
+                reader = new BufferedReader(new FileReader(file));
+            } catch (FileNotFoundException e) {
+                return Collections.emptyMap();
+            }
+
+            try {
+                int version = readInt(reader);
+                switch (version) {
+                    case 0:
+                        int expectedSize = readInt(reader);
+                        Map<TopicPartition, Long> offsets = new HashMap<>();
+                        String line = reader.readLine();
+                        while (line != null) {
+                            String[] pieces = line.split("\\s+");
+                            if (pieces.length != 3)
+                                throw new IOException(String.format("Malformed line in offset checkpoint file: '%s'.",
+                                    line));
+
+                            String topic = pieces[0];
+                            int partition = Integer.parseInt(pieces[1]);
+                            long offset = Long.parseLong(pieces[2]);
+                            offsets.put(new TopicPartition(topic, partition), offset);
+                            line = reader.readLine();
+                        }
+                        if (offsets.size() != expectedSize)
+                            throw new IOException(String.format("Expected %d entries but found only %d",
+                                expectedSize,
+                                offsets.size()));
+                        return offsets;
+
+                    default:
+                        throw new IllegalArgumentException("Unknown offset checkpoint version: " + version);
+                }
+            } finally {
+                if (reader != null)
+                    reader.close();
+            }
+        }
+    }
+
+    private int readInt(BufferedReader reader) throws IOException {
+        String line = reader.readLine();
+        if (line == null)
+            throw new EOFException("File ended prematurely.");
+        int val = Integer.parseInt(line);
+        return val;
+    }
+
+    public void delete() throws IOException {
+        file.delete();
+    }
+
+    @Override
+    public String toString() {
+        return this.file.getAbsolutePath();
+    }
+
+}
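
Concretely, a version-0 checkpoint holding two partitions of "my-topic" at
offsets 100 and 200 is written as:

    0
    2
    my-topic 0 100
    my-topic 1 200

And a usage sketch against the class as defined above (the file path is
illustrative, and in practice this class is internal to Kafka Streams):

    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.streams.state.internals.OffsetCheckpoint;

    import java.io.File;
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;

    public class CheckpointDemo {
        public static void main(String[] args) throws IOException {
            OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File("/tmp/state/.checkpoint"));

            Map<TopicPartition, Long> offsets = new HashMap<>();
            offsets.put(new TopicPartition("my-topic", 0), 100L);
            offsets.put(new TopicPartition("my-topic", 1), 200L);

            checkpoint.write(offsets); // temp file, fsync, then atomic rename
            System.out.println(checkpoint.read()); // e.g. {my-topic-0=100, my-topic-1=200}
        }
    }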