You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/10/03 05:08:40 UTC

kafka git commit: KAFKA-2591: Fix StreamingMetrics

Repository: kafka
Updated Branches:
  refs/heads/trunk b56e02b65 -> 37f7d75e3


KAFKA-2591: Fix StreamingMetrics

Remove state storage upon unclean shutdown and fix streaming metrics used for local state.

Author: Guozhang Wang <wa...@gmail.com>

Reviewers: Edward Ribeiro, Yasuhiro Matsuda, Jun Rao

Closes #265 from guozhangwang/K2591


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/37f7d75e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/37f7d75e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/37f7d75e

Branch: refs/heads/trunk
Commit: 37f7d75e3d55c600902cd15c3cb219ddd221d23c
Parents: b56e02b
Author: Guozhang Wang <wa...@gmail.com>
Authored: Fri Oct 2 20:12:34 2015 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Fri Oct 2 20:12:34 2015 -0700

----------------------------------------------------------------------
 .../org/apache/kafka/common/MetricName.java     |  5 +-
 .../kafka/common/metrics/MetricsTest.java       |  3 +-
 .../apache/kafka/streams/KafkaStreaming.java    | 35 +++++++-
 .../apache/kafka/streams/StreamingConfig.java   | 46 ++++++++--
 .../apache/kafka/streams/StreamingMetrics.java  | 27 ++++++
 .../kafka/streams/kstream/SlidingWindowDef.java | 10 +--
 .../streams/processor/ProcessorContext.java     |  8 +-
 .../kafka/streams/processor/RestoreFunc.java    | 27 ------
 .../streams/processor/StateRestoreCallback.java | 27 ++++++
 .../internals/ProcessorContextImpl.java         | 14 +--
 .../internals/ProcessorStateManager.java        | 13 ++-
 .../streams/processor/internals/StreamTask.java |  8 +-
 .../processor/internals/StreamThread.java       | 94 +++++++++++++++-----
 .../streams/state/InMemoryKeyValueStore.java    |  7 +-
 .../streams/state/MeteredKeyValueStore.java     | 73 +++++----------
 .../streams/state/RocksDBKeyValueStore.java     |  6 +-
 .../internals/ProcessorStateManagerTest.java    | 18 ++--
 .../processor/internals/StreamTaskTest.java     |  4 +-
 .../processor/internals/StreamThreadTest.java   |  9 +-
 .../apache/kafka/test/MockProcessorContext.java | 10 +--
 .../kafka/test/ProcessorTopologyTestDriver.java | 21 ++++-
 21 files changed, 295 insertions(+), 170 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/37f7d75e/clients/src/main/java/org/apache/kafka/common/MetricName.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/MetricName.java b/clients/src/main/java/org/apache/kafka/common/MetricName.java
index 04b4a09..ee50f33 100644
--- a/clients/src/main/java/org/apache/kafka/common/MetricName.java
+++ b/clients/src/main/java/org/apache/kafka/common/MetricName.java
@@ -87,11 +87,12 @@ public final class MetricName {
 
     private static Map<String, String> getTags(String... keyValue) {
         if ((keyValue.length % 2) != 0)
-            throw new IllegalArgumentException("keyValue needs to be specified in paris");
+            throw new IllegalArgumentException("keyValue needs to be specified in pairs");
         Map<String, String> tags = new HashMap<String, String>();
 
-        for (int i = 0; i < keyValue.length / 2; i++)
+        for (int i = 0; i < keyValue.length; i += 2)
             tags.put(keyValue[i], keyValue[i + 1]);
+
         return tags;
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/37f7d75e/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
index 5b7736e..175a036 100644
--- a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java
@@ -47,9 +47,10 @@ public class MetricsTest {
 
     @Test
     public void testMetricName() {
-        MetricName n1 = new MetricName("name", "group", "description", "key1", "value1");
+        MetricName n1 = new MetricName("name", "group", "description", "key1", "value1", "key2", "value2");
         Map<String, String> tags = new HashMap<String, String>();
         tags.put("key1", "value1");
+        tags.put("key2", "value2");
         MetricName n2 = new MetricName("name", "group", "description", tags);
         assertEquals("metric names created in two different ways should be equal", n1, n2);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/37f7d75e/streams/src/main/java/org/apache/kafka/streams/KafkaStreaming.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreaming.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreaming.java
index f3a99e0..d274fb9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreaming.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreaming.java
@@ -17,11 +17,21 @@
 
 package org.apache.kafka.streams;
 
+import org.apache.kafka.common.metrics.JmxReporter;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.MetricsReporter;
+import org.apache.kafka.common.utils.SystemTime;
+import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.processor.TopologyBuilder;
 import org.apache.kafka.streams.processor.internals.StreamThread;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
 /**
  * Kafka Streaming allows for performing continuous computation on input coming from one or more input topics and
  * sends output to zero or more output topics.
@@ -61,8 +71,12 @@ import org.slf4j.LoggerFactory;
 public class KafkaStreaming {
 
     private static final Logger log = LoggerFactory.getLogger(KafkaStreaming.class);
+    private static final AtomicInteger STREAMING_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
+    private static final String JMX_PREFIX = "kafka.streaming";
+
+    private final Time time;
 
-    // Container States
+    // container states
     private static final int CREATED = 0;
     private static final int RUNNING = 1;
     private static final int STOPPED = 2;
@@ -70,10 +84,27 @@ public class KafkaStreaming {
 
     private final StreamThread[] threads;
 
+    private String clientId;
+    private final Metrics metrics;
+
     public KafkaStreaming(TopologyBuilder builder, StreamingConfig config) throws Exception {
+        // create the metrics
+        this.time = new SystemTime();
+
+        MetricConfig metricConfig = new MetricConfig().samples(config.getInt(StreamingConfig.METRICS_NUM_SAMPLES_CONFIG))
+            .timeWindow(config.getLong(StreamingConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG),
+                TimeUnit.MILLISECONDS);
+        clientId = config.getString(StreamingConfig.CLIENT_ID_CONFIG);
+        if (clientId.length() <= 0)
+            clientId = "streaming-" + STREAMING_CLIENT_ID_SEQUENCE.getAndIncrement();
+        List<MetricsReporter> reporters = config.getConfiguredInstances(StreamingConfig.METRIC_REPORTER_CLASSES_CONFIG,
+            MetricsReporter.class);
+        reporters.add(new JmxReporter(JMX_PREFIX));
+        this.metrics = new Metrics(metricConfig, reporters, time);
+
         this.threads = new StreamThread[config.getInt(StreamingConfig.NUM_STREAM_THREADS_CONFIG)];
         for (int i = 0; i < this.threads.length; i++) {
-            this.threads[i] = new StreamThread(builder, config);
+            this.threads[i] = new StreamThread(builder, config, this.clientId, this.metrics, this.time);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/37f7d75e/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java
index dce69b6..93df4c2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java
@@ -27,6 +27,8 @@ import org.apache.kafka.common.config.ConfigDef.Type;
 
 import java.util.Map;
 
+import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
+
 public class StreamingConfig extends AbstractConfig {
 
     private static final ConfigDef CONFIG;
@@ -83,6 +85,15 @@ public class StreamingConfig extends AbstractConfig {
     /** <code>value.deserializer</code> */
     public static final String VALUE_DESERIALIZER_CLASS_CONFIG = ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
 
+    /** <code>metrics.sample.window.ms</code> */
+    public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG = CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG;
+
+    /** <code>metrics.num.samples</code> */
+    public static final String METRICS_NUM_SAMPLES_CONFIG = CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG;
+
+    /** <code>metric.reporters</code> */
+    public static final String METRIC_REPORTER_CLASSES_CONFIG = CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG;
+
     /**
      * <code>bootstrap.servers</code>
      */
@@ -97,15 +108,15 @@ public class StreamingConfig extends AbstractConfig {
                                         Importance.MEDIUM,
                                         CommonClientConfigs.CLIENT_ID_DOC)
                                 .define(STATE_DIR_CONFIG,
-                                        Type.STRING,
-                                        SYSTEM_TEMP_DIRECTORY,
-                                        Importance.MEDIUM,
-                                        STATE_DIR_DOC)
+                                    Type.STRING,
+                                    SYSTEM_TEMP_DIRECTORY,
+                                    Importance.MEDIUM,
+                                    STATE_DIR_DOC)
                                 .define(COMMIT_INTERVAL_MS_CONFIG,
-                                        Type.LONG,
-                                        30000,
-                                        Importance.HIGH,
-                                        COMMIT_INTERVAL_MS_DOC)
+                                    Type.LONG,
+                                    30000,
+                                    Importance.HIGH,
+                                    COMMIT_INTERVAL_MS_DOC)
                                 .define(POLL_MS_CONFIG,
                                         Type.LONG,
                                         100,
@@ -159,7 +170,24 @@ public class StreamingConfig extends AbstractConfig {
                                 .define(BOOTSTRAP_SERVERS_CONFIG,
                                         Type.STRING,
                                         Importance.HIGH,
-                                        CommonClientConfigs.BOOSTRAP_SERVERS_DOC);
+                                        CommonClientConfigs.BOOSTRAP_SERVERS_DOC)
+                                .define(METRIC_REPORTER_CLASSES_CONFIG,
+                                        Type.LIST,
+                                        "",
+                                        Importance.LOW,
+                                        CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC)
+                                .define(METRICS_SAMPLE_WINDOW_MS_CONFIG,
+                                        Type.LONG,
+                                        30000,
+                                        atLeast(0),
+                                        Importance.LOW,
+                                        CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_DOC)
+                                .define(METRICS_NUM_SAMPLES_CONFIG,
+                                        Type.INT,
+                                        2,
+                                        atLeast(1),
+                                        Importance.LOW,
+                                        CommonClientConfigs.METRICS_NUM_SAMPLES_DOC);
     }
 
     public StreamingConfig(Map<?, ?> props) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/37f7d75e/streams/src/main/java/org/apache/kafka/streams/StreamingMetrics.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamingMetrics.java b/streams/src/main/java/org/apache/kafka/streams/StreamingMetrics.java
new file mode 100644
index 0000000..ebf80b3
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamingMetrics.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams;
+
+import org.apache.kafka.common.metrics.Sensor;
+
+public interface StreamingMetrics {
+
+    Sensor addLatencySensor(String scopeName, String entityName, String operationName, String... tags);
+
+    void recordLatency(Sensor sensor, long startNs, long endNs);
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/37f7d75e/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindowDef.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindowDef.java b/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindowDef.java
index cc03541..5927db6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindowDef.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindowDef.java
@@ -25,10 +25,10 @@ import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.kstream.internals.FilteredIterator;
 import org.apache.kafka.streams.kstream.internals.WindowSupport;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
 import org.apache.kafka.streams.processor.internals.RecordCollector;
 import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.RestoreFunc;
 import org.apache.kafka.streams.processor.internals.Stamped;
 
 import java.util.Collections;
@@ -83,7 +83,7 @@ public class SlidingWindowDef<K, V> implements WindowDef<K, V> {
         @Override
         public void init(ProcessorContext context) {
             this.context = context;
-            RestoreFuncImpl restoreFunc = new RestoreFuncImpl();
+            SlidingWindowRegistryCallback restoreFunc = new SlidingWindowRegistryCallback();
             context.register(this, restoreFunc);
 
             for (ValueList<V> valueList : map.values()) {
@@ -229,17 +229,17 @@ public class SlidingWindowDef<K, V> implements WindowDef<K, V> {
             return false;
         }
 
-        private class RestoreFuncImpl implements RestoreFunc {
+        private class SlidingWindowRegistryCallback implements StateRestoreCallback {
 
             final IntegerDeserializer intDeserializer;
             int slotNum = 0;
 
-            RestoreFuncImpl() {
+            SlidingWindowRegistryCallback() {
                 intDeserializer = new IntegerDeserializer();
             }
 
             @Override
-            public void apply(byte[] slot, byte[] bytes) {
+            public void restore(byte[] slot, byte[] bytes) {
 
                 slotNum = intDeserializer.deserialize("", slot);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/37f7d75e/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
index 6b32b83..adffe0e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
@@ -17,9 +17,9 @@
 
 package org.apache.kafka.streams.processor;
 
-import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.StreamingMetrics;
 
 import java.io.File;
 
@@ -70,9 +70,9 @@ public interface ProcessorContext {
     /**
      * Returns Metrics instance
      *
-     * @return Metrics
+     * @return StreamingMetrics
      */
-    Metrics metrics();
+    StreamingMetrics metrics();
 
     /**
      * Check if this process's incoming streams are joinable
@@ -84,7 +84,7 @@ public interface ProcessorContext {
      *
      * @param store the storage engine
      */
-    void register(StateStore store, RestoreFunc restoreFunc);
+    void register(StateStore store, StateRestoreCallback stateRestoreCallback);
 
     StateStore getStateStore(String name);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/37f7d75e/streams/src/main/java/org/apache/kafka/streams/processor/RestoreFunc.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/RestoreFunc.java b/streams/src/main/java/org/apache/kafka/streams/processor/RestoreFunc.java
deleted file mode 100644
index 883147e..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/processor/RestoreFunc.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.processor;
-
-/**
- * Restoration logic for log-backed state stores upon restart,
- * it takes one record at a time from the logs to apply to the restoring state.
- */
-public interface RestoreFunc {
-
-    void apply(byte[] key, byte[] value);
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/37f7d75e/streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreCallback.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreCallback.java b/streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreCallback.java
new file mode 100644
index 0000000..39decec
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreCallback.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor;
+
+/**
+ * Restoration logic for log-backed state stores upon restart,
+ * it takes one record at a time from the logs to apply to the restoring state.
+ */
+public interface StateRestoreCallback {
+
+    void restore(byte[] key, byte[] value);
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/37f7d75e/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index b350222..60ac1df 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -19,13 +19,13 @@ package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.StreamingConfig;
+import org.apache.kafka.streams.StreamingMetrics;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.RestoreFunc;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -43,7 +43,7 @@ public class ProcessorContextImpl implements ProcessorContext {
 
     private final int id;
     private final StreamTask task;
-    private final Metrics metrics;
+    private final StreamingMetrics metrics;
     private final RecordCollector collector;
     private final ProcessorStateManager stateMgr;
 
@@ -60,7 +60,7 @@ public class ProcessorContextImpl implements ProcessorContext {
                                 StreamingConfig config,
                                 RecordCollector collector,
                                 ProcessorStateManager stateMgr,
-                                Metrics metrics) {
+                                StreamingMetrics metrics) {
         this.id = id;
         this.task = task;
         this.metrics = metrics;
@@ -143,16 +143,16 @@ public class ProcessorContextImpl implements ProcessorContext {
     }
 
     @Override
-    public Metrics metrics() {
+    public StreamingMetrics metrics() {
         return metrics;
     }
 
     @Override
-    public void register(StateStore store, RestoreFunc restoreFunc) {
+    public void register(StateStore store, StateRestoreCallback stateRestoreCallback) {
         if (initialized)
             throw new KafkaException("Can only create state stores during initialization.");
 
-        stateMgr.register(store, restoreFunc);
+        stateMgr.register(store, stateRestoreCallback);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/37f7d75e/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 2f1fb35..59a6394 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -19,7 +19,7 @@ package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.streams.processor.RestoreFunc;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
@@ -96,7 +96,7 @@ public class ProcessorStateManager {
         return this.baseDir;
     }
 
-    public void register(StateStore store, RestoreFunc restoreFunc) {
+    public void register(StateStore store, StateRestoreCallback stateRestoreCallback) {
         if (store.name().equals(CHECKPOINT_FILE_NAME))
             throw new IllegalArgumentException("Illegal store name: " + CHECKPOINT_FILE_NAME);
 
@@ -138,12 +138,11 @@ public class ProcessorStateManager {
         restoreConsumer.seekToEnd(storePartition);
         long endOffset = restoreConsumer.position(storePartition);
 
-        // load the previously flushed state and restore from the checkpointed offset of the change log
-        // if it exists in the offset file; restore the state from the beginning of the change log otherwise
-        if (checkpointedOffsets.containsKey(storePartition)) {
+        // restore from the checkpointed offset of the change log if it is persistent and the offset exists;
+        // restore the state from the beginning of the change log otherwise
+        if (checkpointedOffsets.containsKey(storePartition) && store.persistent()) {
             restoreConsumer.seek(storePartition, checkpointedOffsets.get(storePartition));
         } else {
-            // TODO: in this case, we need to ignore the preciously flushed state
             restoreConsumer.seekToBeginning(storePartition);
         }
 
@@ -151,7 +150,7 @@ public class ProcessorStateManager {
         // should not change since it is only written by this thread.
         while (true) {
             for (ConsumerRecord<byte[], byte[]> record : restoreConsumer.poll(100).records(storePartition)) {
-                restoreFunc.apply(record.key(), record.value());
+                stateRestoreCallback.restore(record.key(), record.value());
             }
 
             if (restoreConsumer.position(storePartition) == endOffset) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/37f7d75e/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 40fb723..6afa427 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -23,8 +23,8 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.streams.StreamingConfig;
+import org.apache.kafka.streams.StreamingMetrics;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.TimestampExtractor;
 import org.slf4j.Logger;
@@ -74,6 +74,7 @@ public class StreamTask implements Punctuator {
      * @param partitions            the collection of assigned {@link TopicPartition}
      * @param topology              the instance of {@link ProcessorTopology}
      * @param config                the {@link StreamingConfig} specified by the user
+     * @param metrics               the {@link StreamingMetrics} created by the thread
      */
     public StreamTask(int id,
                       Consumer<byte[], byte[]> consumer,
@@ -81,7 +82,8 @@ public class StreamTask implements Punctuator {
                       Consumer<byte[], byte[]> restoreConsumer,
                       Collection<TopicPartition> partitions,
                       ProcessorTopology topology,
-                      StreamingConfig config) {
+                      StreamingConfig config,
+                      StreamingMetrics metrics) {
 
         this.id = id;
         this.consumer = consumer;
@@ -119,7 +121,7 @@ public class StreamTask implements Punctuator {
         }
 
         // initialize the topology with its own context
-        this.processorContext = new ProcessorContextImpl(id, this, config, recordCollector, stateMgr, new Metrics());
+        this.processorContext = new ProcessorContextImpl(id, this, config, recordCollector, stateMgr, metrics);
 
         // initialize the task by initializing all its processor nodes in the topology
         for (ProcessorNode node : this.topology.processors()) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/37f7d75e/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index f37903f..95a923d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -26,6 +26,7 @@ import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.MeasurableStat;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.Avg;
@@ -34,10 +35,10 @@ import org.apache.kafka.common.metrics.stats.Max;
 import org.apache.kafka.common.metrics.stats.Rate;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamingConfig;
+import org.apache.kafka.streams.StreamingMetrics;
 import org.apache.kafka.streams.processor.TopologyBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -58,7 +59,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 public class StreamThread extends Thread {
 
     private static final Logger log = LoggerFactory.getLogger(StreamThread.class);
-    private static AtomicInteger nextThreadNumber = new AtomicInteger(1);
+    private static final AtomicInteger STREAMING_THREAD_ID_SEQUENCE = new AtomicInteger(1);
 
     private final AtomicBoolean running;
 
@@ -69,13 +70,14 @@ public class StreamThread extends Thread {
     protected final Consumer<byte[], byte[]> restoreConsumer;
 
     private final Map<Integer, StreamTask> tasks;
+    private final String clientId;
     private final Time time;
     private final File stateDir;
     private final long pollTimeMs;
     private final long cleanTimeMs;
     private final long commitTimeMs;
     private final long totalRecordsToProcess;
-    private final StreamingMetrics metrics;
+    private final StreamingMetricsImpl sensors;
 
     private long lastClean;
     private long lastCommit;
@@ -96,19 +98,27 @@ public class StreamThread extends Thread {
         }
     };
 
-    public StreamThread(TopologyBuilder builder, StreamingConfig config) throws Exception {
-        this(builder, config, null , null, null, new SystemTime());
+    public StreamThread(TopologyBuilder builder,
+                        StreamingConfig config,
+                        String clientId,
+                        Metrics metrics,
+                        Time time) throws Exception {
+        this(builder, config, null , null, null, clientId, metrics, time);
     }
 
-    StreamThread(TopologyBuilder builder, StreamingConfig config,
+    StreamThread(TopologyBuilder builder,
+                 StreamingConfig config,
                  Producer<byte[], byte[]> producer,
                  Consumer<byte[], byte[]> consumer,
                  Consumer<byte[], byte[]> restoreConsumer,
+                 String clientId,
+                 Metrics metrics,
                  Time time) throws Exception {
-        super("StreamThread-" + nextThreadNumber.getAndIncrement());
+        super("StreamThread-" + STREAMING_THREAD_ID_SEQUENCE.getAndIncrement());
 
         this.config = config;
         this.builder = builder;
+        this.clientId = clientId;
 
         // set the producer and consumer clients
         this.producer = (producer != null) ? producer : createProducer();
@@ -131,7 +141,7 @@ public class StreamThread extends Thread {
         this.recordsProcessed = 0;
         this.time = time;
 
-        this.metrics = new StreamingMetrics();
+        this.sensors = new StreamingMetricsImpl(metrics);
 
         this.running = new AtomicBoolean(true);
     }
@@ -237,7 +247,7 @@ public class StreamThread extends Thread {
                 }
 
                 long endPoll = time.milliseconds();
-                metrics.pollTimeSensor.record(endPoll - startPoll);
+                sensors.pollTimeSensor.record(endPoll - startPoll);
 
                 // try to process one record from each task
                 totalNumBuffered = 0;
@@ -247,7 +257,7 @@ public class StreamThread extends Thread {
 
                     totalNumBuffered += task.process();
 
-                    metrics.processTimeSensor.record(time.milliseconds() - startProcess);
+                    sensors.processTimeSensor.record(time.milliseconds() - startProcess);
                 }
 
                 maybePunctuate();
@@ -279,7 +289,7 @@ public class StreamThread extends Thread {
                 long now = time.milliseconds();
 
                 if (task.maybePunctuate(now))
-                    metrics.punctuateTimeSensor.record(time.milliseconds() - now);
+                    sensors.punctuateTimeSensor.record(time.milliseconds() - now);
 
             } catch (Exception e) {
                 log.error("Failed to commit task #" + task.id() + " in thread [" + this.getName() + "]: ", e);
@@ -334,7 +344,7 @@ public class StreamThread extends Thread {
             throw e;
         }
 
-        metrics.commitTimeSensor.record(time.milliseconds() - now);
+        sensors.commitTimeSensor.record(time.milliseconds() - now);
     }
 
     /**
@@ -381,9 +391,9 @@ public class StreamThread extends Thread {
     }
 
     protected StreamTask createStreamTask(int id, Collection<TopicPartition> partitionsForTask) {
-        metrics.taskCreationSensor.record();
+        sensors.taskCreationSensor.record();
 
-        return new StreamTask(id, consumer, producer, restoreConsumer, partitionsForTask, builder.build(), config);
+        return new StreamTask(id, consumer, producer, restoreConsumer, partitionsForTask, builder.build(), config, sensors);
     }
 
     private void addPartitions(Collection<TopicPartition> assignment) {
@@ -425,13 +435,15 @@ public class StreamThread extends Thread {
                 log.error("Failed to close a task #" + task.id() + " in thread [" + this.getName() + "]: ", e);
                 throw e;
             }
-            metrics.taskDestructionSensor.record();
+            sensors.taskDestructionSensor.record();
         }
         tasks.clear();
     }
 
-    private class StreamingMetrics {
+    private class StreamingMetricsImpl implements StreamingMetrics {
         final Metrics metrics;
+        final String metricGrpName;
+        final Map<String, String> metricTags;
 
         final Sensor commitTimeSensor;
         final Sensor pollTimeSensor;
@@ -440,12 +452,12 @@ public class StreamThread extends Thread {
         final Sensor taskCreationSensor;
         final Sensor taskDestructionSensor;
 
-        public StreamingMetrics() {
-            String metricGrpName = "streaming-metrics";
+        public StreamingMetricsImpl(Metrics metrics) {
 
-            this.metrics = new Metrics();
-            Map<String, String> metricTags = new LinkedHashMap<String, String>();
-            metricTags.put("client-id", config.getString(StreamingConfig.CLIENT_ID_CONFIG) + "-" + getName());
+            this.metrics = metrics;
+            this.metricGrpName = "streaming-metrics";
+            this.metricTags = new LinkedHashMap<>();
+            this.metricTags.put("client-id", clientId + "-" + getName());
 
             this.commitTimeSensor = metrics.sensor("commit-time");
             this.commitTimeSensor.add(new MetricName("commit-time-avg", metricGrpName, "The average commit time in ms", metricTags), new Avg());
@@ -473,5 +485,45 @@ public class StreamThread extends Thread {
             this.taskDestructionSensor = metrics.sensor("task-destruction");
             this.taskDestructionSensor.add(new MetricName("task-destruction-rate", metricGrpName, "The average per-second number of destructed tasks", metricTags), new Rate(new Count()));
         }
+
+        @Override
+        public void recordLatency(Sensor sensor, long startNs, long endNs) {
+            sensor.record((endNs - startNs) / 1000000, endNs);
+        }
+
+        @Override
+        public Sensor addLatencySensor(String scopeName, String entityName, String operationName, String... tags) {
+            // extract the additional tags if there are any
+            Map<String, String> tagMap = new HashMap<>(this.metricTags);
+            if ((tags.length % 2) != 0)
+                throw new IllegalArgumentException("Tags needs to be specified in key-value pairs");
+
+            for (int i = 0; i < tags.length; i += 2)
+                tagMap.put(tags[i], tags[i + 1]);
+
+            // first add the global operation metrics if not yet, with the global tags only
+            Sensor parent = metrics.sensor(operationName);
+            addLatencyMetrics(this.metricGrpName, parent, "all", operationName, this.metricTags);
+
+            // add the store operation metrics with additional tags
+            Sensor sensor = metrics.sensor(entityName + "-" + operationName, parent);
+            addLatencyMetrics("streaming-" + scopeName + "-metrics", sensor, entityName, operationName, tagMap);
+
+            return sensor;
+        }
+
+        private void addLatencyMetrics(String metricGrpName, Sensor sensor, String entityName, String opName, Map<String, String> tags) {
+            maybeAddMetric(sensor, new MetricName(opName + "-avg-latency-ms", metricGrpName,
+                "The average latency in milliseconds of " + entityName + " " + opName + " operation.", tags), new Avg());
+            maybeAddMetric(sensor, new MetricName(opName + "-max-latency-ms", metricGrpName,
+                "The max latency in milliseconds of " + entityName + " " + opName + " operation.", tags), new Max());
+            maybeAddMetric(sensor, new MetricName(opName + "-qps", metricGrpName,
+                "The average number of occurrence of " + entityName + " " + opName + " operation per second.", tags), new Rate(new Count()));
+        }
+
+        private void maybeAddMetric(Sensor sensor, MetricName name, MeasurableStat stat) {
+            if (!metrics.metrics().containsKey(name))
+                sensor.add(name, stat);
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/37f7d75e/streams/src/main/java/org/apache/kafka/streams/state/InMemoryKeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/InMemoryKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/InMemoryKeyValueStore.java
index e9aaa20..59a8496 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/InMemoryKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/InMemoryKeyValueStore.java
@@ -40,7 +40,7 @@ public class InMemoryKeyValueStore<K, V> extends MeteredKeyValueStore<K, V> {
     }
 
     public InMemoryKeyValueStore(String name, ProcessorContext context, Time time) {
-        super(name, new MemoryStore<K, V>(name, context), context, "kafka-streams", time);
+        super(name, new MemoryStore<K, V>(name, context), context, "in-memory-state", time);
     }
 
     private static class MemoryStore<K, V> implements KeyValueStore<K, V> {
@@ -103,11 +103,6 @@ public class InMemoryKeyValueStore<K, V> extends MeteredKeyValueStore<K, V> {
             // do-nothing since it is in-memory
         }
 
-        public void restore() {
-            // this should not happen since it is in-memory, hence no state to load from disk
-            throw new IllegalStateException("This should not happen");
-        }
-
         @Override
         public void close() {
             // do-nothing

http://git-wip-us.apache.org/repos/asf/kafka/blob/37f7d75e/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java
index 018f1c6..68333d5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/MeteredKeyValueStore.java
@@ -17,17 +17,11 @@
 
 package org.apache.kafka.streams.state;
 
+import org.apache.kafka.streams.StreamingMetrics;
 import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.RestoreFunc;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.MetricName;
-import org.apache.kafka.common.metrics.MeasurableStat;
-import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
-import org.apache.kafka.common.metrics.stats.Avg;
-import org.apache.kafka.common.metrics.stats.Count;
-import org.apache.kafka.common.metrics.stats.Max;
-import org.apache.kafka.common.metrics.stats.Rate;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Time;
@@ -43,7 +37,6 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
     protected final KeyValueStore<K, V> inner;
 
     private final Time time;
-    private final String group;
     private final Sensor putTime;
     private final Sensor getTime;
     private final Sensor deleteTime;
@@ -52,7 +45,7 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
     private final Sensor rangeTime;
     private final Sensor flushTime;
     private final Sensor restoreTime;
-    private final Metrics metrics;
+    private final StreamingMetrics metrics;
 
     private final String topic;
     private final int partition;
@@ -61,20 +54,19 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
     private final ProcessorContext context;
 
     // always wrap the logged store with the metered store
-    public MeteredKeyValueStore(final String name, final KeyValueStore<K, V> inner, ProcessorContext context, String group, Time time) {
+    public MeteredKeyValueStore(final String name, final KeyValueStore<K, V> inner, ProcessorContext context, String metricGrp, Time time) {
         this.inner = inner;
 
         this.time = time;
-        this.group = group;
         this.metrics = context.metrics();
-        this.putTime = createSensor(name, "put");
-        this.getTime = createSensor(name, "get");
-        this.deleteTime = createSensor(name, "delete");
-        this.putAllTime = createSensor(name, "put-all");
-        this.allTime = createSensor(name, "all");
-        this.rangeTime = createSensor(name, "range");
-        this.flushTime = createSensor(name, "flush");
-        this.restoreTime = createSensor(name, "restore");
+        this.putTime = this.metrics.addLatencySensor(metricGrp, name, "put", "store-name", name);
+        this.getTime = this.metrics.addLatencySensor(metricGrp, name, "get", "store-name", name);
+        this.deleteTime = this.metrics.addLatencySensor(metricGrp, name, "delete", "store-name", name);
+        this.putAllTime = this.metrics.addLatencySensor(metricGrp, name, "put-all", "store-name", name);
+        this.allTime = this.metrics.addLatencySensor(metricGrp, name, "all", "store-name", name);
+        this.rangeTime = this.metrics.addLatencySensor(metricGrp, name, "range", "store-name", name);
+        this.flushTime = this.metrics.addLatencySensor(metricGrp, name, "flush", "store-name", name);
+        this.restoreTime = this.metrics.addLatencySensor(metricGrp, name, "restore", "store-name", name);
 
         this.topic = name;
         this.partition = context.id();
@@ -90,37 +82,18 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
             final Deserializer<K> keyDeserializer = (Deserializer<K>) context.keyDeserializer();
             final Deserializer<V> valDeserializer = (Deserializer<V>) context.valueDeserializer();
 
-            context.register(this, new RestoreFunc() {
+            context.register(this, new StateRestoreCallback() {
                 @Override
-                public void apply(byte[] key, byte[] value) {
+                public void restore(byte[] key, byte[] value) {
                     inner.put(keyDeserializer.deserialize(topic, key),
                         valDeserializer.deserialize(topic, value));
                 }
             });
         } finally {
-            recordLatency(this.restoreTime, startNs, time.nanoseconds());
+            this.metrics.recordLatency(this.restoreTime, startNs, time.nanoseconds());
         }
     }
 
-    private Sensor createSensor(String storeName, String operation) {
-        Sensor parent = metrics.sensor(operation);
-        addLatencyMetrics(parent, operation);
-        Sensor sensor = metrics.sensor(storeName + "- " + operation, parent);
-        addLatencyMetrics(sensor, operation, "store-name", storeName);
-        return sensor;
-    }
-
-    private void addLatencyMetrics(Sensor sensor, String opName, String... kvs) {
-        maybeAddMetric(sensor, new MetricName(opName + "-avg-latency-ms", group, "The average latency in milliseconds of the key-value store operation.", kvs), new Avg());
-        maybeAddMetric(sensor, new MetricName(opName + "-max-latency-ms", group, "The max latency in milliseconds of the key-value store operation.", kvs), new Max());
-        maybeAddMetric(sensor, new MetricName(opName + "-qps", group, "The average number of occurance of the given key-value store operation per second.", kvs), new Rate(new Count()));
-    }
-
-    private void maybeAddMetric(Sensor sensor, MetricName name, MeasurableStat stat) {
-        if (!metrics.metrics().containsKey(name))
-            sensor.add(name, stat);
-    }
-
     @Override
     public String name() {
         return inner.name();
@@ -137,7 +110,7 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
         try {
             return this.inner.get(key);
         } finally {
-            recordLatency(this.getTime, startNs, time.nanoseconds());
+            this.metrics.recordLatency(this.getTime, startNs, time.nanoseconds());
         }
     }
 
@@ -151,7 +124,7 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
             if (this.dirty.size() > this.maxDirty)
                 logChange();
         } finally {
-            recordLatency(this.putTime, startNs, time.nanoseconds());
+            this.metrics.recordLatency(this.putTime, startNs, time.nanoseconds());
         }
     }
 
@@ -168,7 +141,7 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
             if (this.dirty.size() > this.maxDirty)
                 logChange();
         } finally {
-            recordLatency(this.putAllTime, startNs, time.nanoseconds());
+            this.metrics.recordLatency(this.putAllTime, startNs, time.nanoseconds());
         }
     }
 
@@ -184,7 +157,7 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
 
             return value;
         } finally {
-            recordLatency(this.deleteTime, startNs, time.nanoseconds());
+            this.metrics.recordLatency(this.deleteTime, startNs, time.nanoseconds());
         }
     }
 
@@ -210,7 +183,7 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
             this.inner.flush();
             logChange();
         } finally {
-            recordLatency(this.flushTime, startNs, time.nanoseconds());
+            this.metrics.recordLatency(this.flushTime, startNs, time.nanoseconds());
         }
     }
 
@@ -228,10 +201,6 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
         }
     }
 
-    private void recordLatency(Sensor sensor, long startNs, long endNs) {
-        sensor.record((endNs - startNs) / 1000000, endNs);
-    }
-
     private class MeteredKeyValueIterator<K1, V1> implements KeyValueIterator<K1, V1> {
 
         private final KeyValueIterator<K1, V1> iter;
@@ -264,7 +233,7 @@ public class MeteredKeyValueStore<K, V> implements KeyValueStore<K, V> {
             try {
                 iter.close();
             } finally {
-                recordLatency(this.sensor, this.startNs, time.nanoseconds());
+                metrics.recordLatency(this.sensor, this.startNs, time.nanoseconds());
             }
         }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/37f7d75e/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStore.java
index e0962a2..373bba0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/RocksDBKeyValueStore.java
@@ -44,7 +44,7 @@ public class RocksDBKeyValueStore extends MeteredKeyValueStore<byte[], byte[]> {
     }
 
     public RocksDBKeyValueStore(String name, ProcessorContext context, Time time) {
-        super(name, new RocksDBStore(name, context), context, "kafka-streams", time);
+        super(name, new RocksDBStore(name, context), context, "rocksdb-state", time);
     }
 
     private static class RocksDBStore implements KeyValueStore<byte[], byte[]> {
@@ -52,13 +52,13 @@ public class RocksDBKeyValueStore extends MeteredKeyValueStore<byte[], byte[]> {
         private static final int TTL_NOT_USED = -1;
 
         // TODO: these values should be configurable
+        private static final CompressionType COMPRESSION_TYPE = CompressionType.NO_COMPRESSION;
+        private static final CompactionStyle COMPACTION_STYLE = CompactionStyle.UNIVERSAL;
         private static final long WRITE_BUFFER_SIZE = 32 * 1024 * 1024L;
         private static final long BLOCK_CACHE_SIZE = 100 * 1024 * 1024L;
         private static final long BLOCK_SIZE = 4096L;
         private static final int TTL_SECONDS = TTL_NOT_USED;
         private static final int MAX_WRITE_BUFFERS = 3;
-        private static final CompressionType COMPRESSION_TYPE = CompressionType.NO_COMPRESSION;
-        private static final CompactionStyle COMPACTION_STYLE = CompactionStyle.UNIVERSAL;
         private static final String DB_FILE_DIR = "rocksdb";
 
         private final String topic;

http://git-wip-us.apache.org/repos/asf/kafka/blob/37f7d75e/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
index 343ed52..3049d51 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
@@ -29,7 +29,7 @@ import org.apache.kafka.common.serialization.IntegerDeserializer;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.processor.RestoreFunc;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.state.OffsetCheckpoint;
 import org.junit.Test;
@@ -82,11 +82,11 @@ public class ProcessorStateManagerTest {
             return persistent;
         }
 
-        public final RestoreFunc restoreFunc = new RestoreFunc() {
+        public final StateRestoreCallback stateRestoreCallback = new StateRestoreCallback() {
             private final Deserializer<Integer> deserializer = new IntegerDeserializer();
 
             @Override
-            public void apply(byte[] key, byte[] value) {
+            public void restore(byte[] key, byte[] value) {
                 keys.add(deserializer.deserialize("", key));
             }
         };
@@ -259,7 +259,7 @@ public class ProcessorStateManagerTest {
 
             ProcessorStateManager stateMgr = new ProcessorStateManager(1, baseDir, new MockRestoreConsumer());
             try {
-                stateMgr.register(mockStateStore, mockStateStore.restoreFunc);
+                stateMgr.register(mockStateStore, mockStateStore.stateRestoreCallback);
             } finally {
                 stateMgr.close(Collections.<TopicPartition, Long>emptyMap());
             }
@@ -299,7 +299,7 @@ public class ProcessorStateManagerTest {
                     );
                 }
 
-                stateMgr.register(persistentStore, persistentStore.restoreFunc);
+                stateMgr.register(persistentStore, persistentStore.stateRestoreCallback);
 
                 assertEquals(new TopicPartition("persistentStore", 2), restoreConsumer.assignedPartition);
                 assertEquals(lastCheckpointedOffset, restoreConsumer.seekOffset);
@@ -347,7 +347,7 @@ public class ProcessorStateManagerTest {
                     );
                 }
 
-                stateMgr.register(nonPersistentStore, nonPersistentStore.restoreFunc);
+                stateMgr.register(nonPersistentStore, nonPersistentStore.stateRestoreCallback);
 
                 assertEquals(new TopicPartition("nonPersistentStore", 2), restoreConsumer.assignedPartition);
                 assertEquals(0L, restoreConsumer.seekOffset);
@@ -375,7 +375,7 @@ public class ProcessorStateManagerTest {
 
             ProcessorStateManager stateMgr = new ProcessorStateManager(1, baseDir, restoreConsumer);
             try {
-                stateMgr.register(mockStateStore, mockStateStore.restoreFunc);
+                stateMgr.register(mockStateStore, mockStateStore.stateRestoreCallback);
 
                 assertNull(stateMgr.getStore("noSuchStore"));
                 assertEquals(mockStateStore, stateMgr.getStore("mockStore"));
@@ -420,10 +420,10 @@ public class ProcessorStateManagerTest {
                 assertFalse(checkpointFile.exists());
 
                 restoreConsumer.reset();
-                stateMgr.register(persistentStore, persistentStore.restoreFunc);
+                stateMgr.register(persistentStore, persistentStore.stateRestoreCallback);
 
                 restoreConsumer.reset();
-                stateMgr.register(nonPersistentStore, nonPersistentStore.restoreFunc);
+                stateMgr.register(nonPersistentStore, nonPersistentStore.stateRestoreCallback);
             } finally {
                 // close the state manager with the ack'ed offsets
                 stateMgr.close(ackedOffsets);

http://git-wip-us.apache.org/repos/asf/kafka/blob/37f7d75e/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 8dcfc40..f93093c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -89,7 +89,7 @@ public class StreamTaskTest {
     @SuppressWarnings("unchecked")
     @Test
     public void testProcessOrder() {
-        StreamTask task = new StreamTask(0, consumer, producer, restoreStateConsumer, partitions, topology, config);
+        StreamTask task = new StreamTask(0, consumer, producer, restoreStateConsumer, partitions, topology, config, null);
 
         task.addRecords(partition1, records(
             new ConsumerRecord<>(partition1.topic(), partition1.partition(), 10, recordKey, recordValue),
@@ -133,7 +133,7 @@ public class StreamTaskTest {
     @SuppressWarnings("unchecked")
     @Test
     public void testPauseResume() {
-        StreamTask task = new StreamTask(1, consumer, producer, restoreStateConsumer, partitions, topology, config);
+        StreamTask task = new StreamTask(1, consumer, producer, restoreStateConsumer, partitions, topology, config, null);
 
         task.addRecords(partition1, records(
             new ConsumerRecord<>(partition1.topic(), partition1.partition(), 10, recordKey, recordValue),

http://git-wip-us.apache.org/repos/asf/kafka/blob/37f7d75e/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 1f3e541..a7e707e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -28,6 +28,7 @@ import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.clients.producer.MockProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.SystemTime;
@@ -79,7 +80,7 @@ public class StreamThreadTest {
                               Collection<TopicPartition> partitions,
                               ProcessorTopology topology,
                               StreamingConfig config) {
-            super(id, consumer, producer, restoreConsumer, partitions, topology, config);
+            super(id, consumer, producer, restoreConsumer, partitions, topology, config, null);
         }
 
         @Override
@@ -104,7 +105,7 @@ public class StreamThreadTest {
         builder.addSource("source1", "topic1");
         builder.addSource("source2", "topic2");
 
-        StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, new SystemTime()) {
+        StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", new Metrics(), new SystemTime()) {
             @Override
             protected StreamTask createStreamTask(int id, Collection<TopicPartition> partitionsForTask) {
                 return new TestStreamTask(id, consumer, producer, mockRestoreConsumer, partitionsForTask, builder.build(), config);
@@ -207,7 +208,7 @@ public class StreamThreadTest {
             TopologyBuilder builder = new TopologyBuilder();
             builder.addSource("source1", "topic1");
 
-            StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, mockTime) {
+            StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", new Metrics(), mockTime) {
                 @Override
                 public void maybeClean() {
                     super.maybeClean();
@@ -325,7 +326,7 @@ public class StreamThreadTest {
             TopologyBuilder builder = new TopologyBuilder();
             builder.addSource("source1", "topic1");
 
-            StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, mockTime) {
+            StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", new Metrics(), mockTime) {
                 @Override
                 public void maybeCommit() {
                     super.maybeCommit();

http://git-wip-us.apache.org/repos/asf/kafka/blob/37f7d75e/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
index 3fdfc82..c0b09f6 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
@@ -17,10 +17,10 @@
 
 package org.apache.kafka.test;
 
+import org.apache.kafka.streams.StreamingMetrics;
 import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.RestoreFunc;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serializer;
 
@@ -83,13 +83,13 @@ public class MockProcessorContext implements ProcessorContext {
     }
 
     @Override
-    public Metrics metrics() {
+    public StreamingMetrics metrics() {
         throw new UnsupportedOperationException("metrics() not supported.");
     }
 
     @Override
-    public void register(StateStore store, RestoreFunc func) {
-        if (func != null) throw new UnsupportedOperationException("RestoreFunc not supported.");
+    public void register(StateStore store, StateRestoreCallback func) {
+        if (func != null) throw new UnsupportedOperationException("StateRestoreCallback not supported.");
         storeMap.put(store.name(), store);
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/37f7d75e/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
index 75f8b4c..8eb2c62 100644
--- a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
@@ -23,10 +23,12 @@ import org.apache.kafka.clients.producer.MockProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.StreamingConfig;
+import org.apache.kafka.streams.StreamingMetrics;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TopologyBuilder;
 import org.apache.kafka.streams.processor.internals.ProcessorTopology;
@@ -153,7 +155,24 @@ public class ProcessorTopologyTestDriver {
             offsetsByTopicPartition.put(tp, new AtomicLong());
         }
 
-        task = new StreamTask(id, consumer, producer, restoreStateConsumer, partitionsByTopic.values(), topology, config);
+        task = new StreamTask(id,
+            consumer,
+            producer,
+            restoreStateConsumer,
+            partitionsByTopic.values(),
+            topology,
+            config,
+            new StreamingMetrics() {
+                @Override
+                public Sensor addLatencySensor(String scopeName, String entityName, String operationName, String... tags) {
+                    return null;
+                }
+
+                @Override
+                public void recordLatency(Sensor sensor, long startNs, long endNs) {
+                    // do nothing
+                }
+            });
     }
 
     /**