You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/04/23 18:41:15 UTC

[kafka] branch trunk updated: KAFKA-6376; refactor skip metrics in Kafka Streams

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ed51b2c  KAFKA-6376; refactor skip metrics in Kafka Streams
ed51b2c is described below

commit ed51b2cdf5bdac210a6904bead1a2ca6e8411406
Author: John Roesler <jo...@confluent.io>
AuthorDate: Mon Apr 23 11:41:03 2018 -0700

    KAFKA-6376; refactor skip metrics in Kafka Streams
    
    * unify skipped records metering
    * log warnings when things get skipped
    * tighten up metrics usage a bit
    
    ### Testing strategy:
    Unit testing of the metrics and the logs should be sufficient.
    
    Author: John Roesler <jo...@confluent.io>
    
    Reviewers: Bill Bejeck <bi...@confluent.io>, Matthias J. Sax <ma...@confluent.io>, Guozhang Wang <wa...@gmail.com>
    
    Closes #4812 from vvcephei/kip-274-streams-skip-metrics
---
 build.gradle                                       |   2 +
 checkstyle/import-control.xml                      |   4 +
 .../kstream/internals/KStreamAggregate.java        |  12 +-
 .../kstream/internals/KStreamKStreamJoin.java      |  11 ++
 .../internals/KStreamKTableJoinProcessor.java      |  14 +-
 .../streams/kstream/internals/KStreamReduce.java   |  11 ++
 .../internals/KStreamSessionWindowAggregate.java   |  11 ++
 .../kstream/internals/KStreamWindowAggregate.java  |  11 ++
 .../kstream/internals/KStreamWindowReduce.java     |  11 ++
 .../kstream/internals/KTableKTableInnerJoin.java   |  11 ++
 .../kstream/internals/KTableKTableLeftJoin.java    |  11 ++
 .../kstream/internals/KTableKTableOuterJoin.java   |  11 ++
 .../kstream/internals/KTableKTableRightJoin.java   |  12 +-
 .../streams/kstream/internals/KTableSource.java    |  11 ++
 .../internals/AbstractProcessorContext.java        |   8 +-
 .../internals/GlobalProcessorContextImpl.java      |   4 +-
 .../processor/internals/GlobalStateUpdateTask.java |  10 +-
 .../processor/internals/GlobalStreamThread.java    |  10 +-
 .../internals/InternalProcessorContext.java        |   4 +
 .../processor/internals/ProcessorContextImpl.java  |   4 +-
 .../streams/processor/internals/ProcessorNode.java |  69 +++++---
 .../processor/internals/RecordCollectorImpl.java   |  15 +-
 .../processor/internals/RecordDeserializer.java    |  17 +-
 .../streams/processor/internals/RecordQueue.java   |  16 +-
 .../streams/processor/internals/SinkNode.java      |   2 +-
 .../streams/processor/internals/SourceNode.java    |   2 +-
 .../processor/internals/StandbyContextImpl.java    |  23 +--
 .../streams/processor/internals/StandbyTask.java   |   3 +-
 .../streams/processor/internals/StreamTask.java    |  97 ++++++------
 .../streams/processor/internals/StreamThread.java  | 175 +++++++++------------
 .../metrics/StreamsMetricsConventions.java}        |  24 ++-
 .../{ => metrics}/StreamsMetricsImpl.java          | 152 +++++++++---------
 .../kafka/streams/state/internals/NamedCache.java  |  26 +--
 .../kstream/internals/KGroupedStreamImplTest.java  |  39 +++++
 .../kstream/internals/KStreamKStreamJoinTest.java  |  34 ++++
 .../kstream/internals/KStreamKTableJoinTest.java   |  23 +++
 ...KStreamSessionWindowAggregateProcessorTest.java |  17 ++
 .../internals/KStreamWindowAggregateTest.java      |  33 ++++
 .../kstream/internals/KStreamWindowReduceTest.java |  62 ++++++++
 .../internals/KTableKTableInnerJoinTest.java       |  27 ++++
 .../internals/KTableKTableLeftJoinTest.java        |  27 ++++
 .../internals/KTableKTableOuterJoinTest.java       |  27 ++++
 .../internals/KTableKTableRightJoinTest.java       |  53 +++++++
 .../kstream/internals/KTableSourceTest.java        |  20 +++
 .../processor/internals/GlobalStateTaskTest.java   |   6 +-
 .../processor/internals/MockStreamsMetrics.java    |   6 +-
 .../processor/internals/PartitionGroupTest.java    |  11 +-
 .../processor/internals/ProcessorNodeTest.java     |  33 ++--
 .../processor/internals/RecordCollectorTest.java   |  73 +++++++--
 .../internals/RecordDeserializerTest.java          |  11 +-
 .../processor/internals/RecordQueueTest.java       |  21 ++-
 .../streams/processor/internals/SinkNodeTest.java  |   4 +-
 .../processor/internals/StreamTaskTest.java        |  42 ++---
 .../processor/internals/StreamThreadTest.java      | 121 +++++++++++---
 .../internals/StreamsMetricsImplTest.java          |  42 ++---
 .../internals/testutil/LogCaptureAppender.java     |  66 ++++++++
 .../streams/state/KeyValueStoreTestDriver.java     |   9 +-
 .../internals/MeteredKeyValueBytesStoreTest.java   |  23 +--
 .../state/internals/MeteredSessionStoreTest.java   |  13 +-
 .../state/internals/MeteredWindowStoreTest.java    | 114 ++++++--------
 .../streams/state/internals/NamedCacheTest.java    |  35 ++---
 .../state/internals/RocksDBWindowStoreTest.java    |   8 +-
 .../state/internals/StoreChangeLoggerTest.java     |   3 +-
 .../StreamThreadStateStoreProviderTest.java        |   3 +-
 .../kafka/test/InternalMockProcessorContext.java   |  80 ++++++----
 .../org/apache/kafka/test/KStreamTestDriver.java   |  10 +-
 .../org/apache/kafka/test/MockProcessorNode.java   |   4 +-
 .../java/org/apache/kafka/test/MockSourceNode.java |   4 +-
 .../kafka/test/ProcessorTopologyTestDriver.java    |   4 +-
 .../org/apache/kafka/test/StreamsTestUtils.java    |  65 +++++++-
 .../apache/kafka/streams/TopologyTestDriver.java   |  30 +++-
 .../streams/processor/MockProcessorContext.java    |   5 +-
 72 files changed, 1440 insertions(+), 572 deletions(-)

diff --git a/build.gradle b/build.gradle
index 8561eb4..51d9345 100644
--- a/build.gradle
+++ b/build.gradle
@@ -932,10 +932,12 @@ project(':streams') {
     testCompile project(':clients').sourceSets.test.output
     testCompile project(':core')
     testCompile project(':core').sourceSets.test.output
+    testCompile libs.log4j
     testCompile libs.junit
     testCompile libs.easymock
     testCompile libs.bcpkix
 
+    testRuntimeOnly project(':streams:test-utils')
     testRuntime libs.slf4jlog4j
   }
 
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 192d735..65f294f 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -249,6 +249,10 @@
         <allow pkg="org.I0Itec.zkclient" />
         <allow pkg="com.fasterxml.jackson" />
         <allow pkg="org.apache.zookeeper" />
+        <allow pkg="org.apache.zookeeper" />
+        <subpackage name="testutil">
+          <allow pkg="org.apache.log4j" />
+        </subpackage>
       </subpackage>
     </subpackage>
   </subpackage>
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
index 1d2d173..1170ff6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
@@ -21,10 +21,13 @@ import org.apache.kafka.streams.kstream.Initializer;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.KeyValueStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class KStreamAggregate<K, V, T> implements KStreamAggProcessorSupplier<K, K, V, T> {
-
+    private static final Logger LOG = LoggerFactory.getLogger(KStreamAggregate.class);
     private final String storeName;
     private final Initializer<T> initializer;
     private final Aggregator<? super K, ? super V, T> aggregator;
@@ -52,11 +55,13 @@ public class KStreamAggregate<K, V, T> implements KStreamAggProcessorSupplier<K,
 
         private KeyValueStore<K, T> store;
         private TupleForwarder<K, T> tupleForwarder;
+        private StreamsMetricsImpl metrics;
 
         @SuppressWarnings("unchecked")
         @Override
         public void init(final ProcessorContext context) {
             super.init(context);
+            metrics = (StreamsMetricsImpl) context.metrics();
             store = (KeyValueStore<K, T>) context.getStateStore(storeName);
             tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener<K, V>(context, sendOldValues), sendOldValues);
         }
@@ -66,6 +71,11 @@ public class KStreamAggregate<K, V, T> implements KStreamAggProcessorSupplier<K,
         public void process(final K key, final V value) {
             // If the key or value is null we don't need to proceed
             if (key == null || value == null) {
+                LOG.warn(
+                    "Skipping record due to null key or value. key=[{}] value=[{}] topic=[{}] partition=[{}] offset=[{}]",
+                    key, value, context().topic(), context().partition(), context().offset()
+                );
+                metrics.skippedRecordsSensor().record();
                 return;
             }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
index 6af4972..4c6998a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java
@@ -21,11 +21,15 @@ import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.state.WindowStoreIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 class KStreamKStreamJoin<K, R, V1, V2> implements ProcessorSupplier<K, V1> {
+    private static final Logger LOG = LoggerFactory.getLogger(KStreamKStreamJoin.class);
 
     private final String otherWindowName;
     private final long joinBeforeMs;
@@ -50,11 +54,13 @@ class KStreamKStreamJoin<K, R, V1, V2> implements ProcessorSupplier<K, V1> {
     private class KStreamKStreamJoinProcessor extends AbstractProcessor<K, V1> {
 
         private WindowStore<K, V2> otherWindow;
+        private StreamsMetricsImpl metrics;
 
         @SuppressWarnings("unchecked")
         @Override
         public void init(final ProcessorContext context) {
             super.init(context);
+            metrics = (StreamsMetricsImpl) context.metrics();
 
             otherWindow = (WindowStore<K, V2>) context.getStateStore(otherWindowName);
         }
@@ -69,6 +75,11 @@ class KStreamKStreamJoin<K, R, V1, V2> implements ProcessorSupplier<K, V1> {
             // furthermore, on left/outer joins 'null' in ValueJoiner#apply() indicates a missing record --
             // thus, to be consistent and to avoid ambiguous null semantics, null values are ignored
             if (key == null || value == null) {
+                LOG.warn(
+                    "Skipping record due to null key or value. key=[{}] value=[{}] topic=[{}] partition=[{}] offset=[{}]",
+                    key, value, context().topic(), context().partition(), context().offset()
+                );
+                metrics.skippedRecordsSensor().record();
                 return;
             }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
index 8e8b86a..157c91b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
@@ -20,13 +20,18 @@ import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 class KStreamKTableJoinProcessor<K1, K2, V1, V2, R> extends AbstractProcessor<K1, V1> {
+    private static final Logger LOG = LoggerFactory.getLogger(KStreamKTableJoinProcessor.class);
 
     private final KTableValueGetter<K2, V2> valueGetter;
     private final KeyValueMapper<? super K1, ? super V1, ? extends K2> keyMapper;
     private final ValueJoiner<? super V1, ? super V2, ? extends R> joiner;
     private final boolean leftJoin;
+    private StreamsMetricsImpl metrics;
 
     KStreamKTableJoinProcessor(final KTableValueGetter<K2, V2> valueGetter,
                                final KeyValueMapper<? super K1, ? super V1, ? extends K2> keyMapper,
@@ -41,6 +46,7 @@ class KStreamKTableJoinProcessor<K1, K2, V1, V2, R> extends AbstractProcessor<K1
     @Override
     public void init(final ProcessorContext context) {
         super.init(context);
+        metrics = (StreamsMetricsImpl) context.metrics();
         valueGetter.init(context);
     }
 
@@ -54,7 +60,13 @@ class KStreamKTableJoinProcessor<K1, K2, V1, V2, R> extends AbstractProcessor<K1
         // an empty message (ie, there is nothing to be joined) -- this contrast SQL NULL semantics
         // furthermore, on left/outer joins 'null' in ValueJoiner#apply() indicates a missing record --
         // thus, to be consistent and to avoid ambiguous null semantics, null values are ignored
-        if (key != null && value != null) {
+        if (key == null || value == null) {
+            LOG.warn(
+                "Skipping record due to null key or value. key=[{}] value=[{}] topic=[{}] partition=[{}] offset=[{}]",
+                key, value, context().topic(), context().partition(), context().offset()
+            );
+            metrics.skippedRecordsSensor().record();
+        } else {
             final K2 mappedKey = keyMapper.apply(key, value);
             final V2 value2 = mappedKey == null ? null : valueGetter.get(mappedKey);
             if (leftJoin || value2 != null) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
index bd0598a..fff9348 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
@@ -20,9 +20,13 @@ import org.apache.kafka.streams.kstream.Reducer;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.KeyValueStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class KStreamReduce<K, V> implements KStreamAggProcessorSupplier<K, K, V, V> {
+    private static final Logger LOG = LoggerFactory.getLogger(KStreamReduce.class);
 
     private final String storeName;
     private final Reducer<V> reducer;
@@ -48,11 +52,13 @@ public class KStreamReduce<K, V> implements KStreamAggProcessorSupplier<K, K, V,
 
         private KeyValueStore<K, V> store;
         private TupleForwarder<K, V> tupleForwarder;
+        private StreamsMetricsImpl metrics;
 
         @SuppressWarnings("unchecked")
         @Override
         public void init(final ProcessorContext context) {
             super.init(context);
+            metrics = (StreamsMetricsImpl) context.metrics();
 
             store = (KeyValueStore<K, V>) context.getStateStore(storeName);
             tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener<K, V>(context, sendOldValues), sendOldValues);
@@ -63,6 +69,11 @@ public class KStreamReduce<K, V> implements KStreamAggProcessorSupplier<K, K, V,
         public void process(final K key, final V value) {
             // If the key or value is null we don't need to proceed
             if (key == null || value == null) {
+                LOG.warn(
+                    "Skipping record due to null key or value. key=[{}] value=[{}] topic=[{}] partition=[{}] offset=[{}]",
+                    key, value, context().topic(), context().partition(), context().offset()
+                );
+                metrics.skippedRecordsSensor().record();
                 return;
             }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
index d96468a..114c685 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
@@ -26,13 +26,17 @@ import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.SessionStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.List;
 
 class KStreamSessionWindowAggregate<K, V, T> implements KStreamAggProcessorSupplier<K, Windowed<K>, V, T> {
+    private static final Logger LOG = LoggerFactory.getLogger(KStreamSessionWindowAggregate.class);
 
     private final String storeName;
     private final SessionWindows windows;
@@ -68,11 +72,13 @@ class KStreamSessionWindowAggregate<K, V, T> implements KStreamAggProcessorSuppl
 
         private SessionStore<K, T> store;
         private TupleForwarder<Windowed<K>, T> tupleForwarder;
+        private StreamsMetricsImpl metrics;
 
         @SuppressWarnings("unchecked")
         @Override
         public void init(final ProcessorContext context) {
             super.init(context);
+            metrics = (StreamsMetricsImpl) context.metrics();
             store = (SessionStore<K, T>) context.getStateStore(storeName);
             tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener<K, V>(context, sendOldValues), sendOldValues);
         }
@@ -82,6 +88,11 @@ class KStreamSessionWindowAggregate<K, V, T> implements KStreamAggProcessorSuppl
             // if the key is null, we do not need proceed aggregating
             // the record with the table
             if (key == null) {
+                LOG.warn(
+                    "Skipping record due to null key. value=[{}] topic=[{}] partition=[{}] offset=[{}]",
+                    value, context().topic(), context().partition(), context().offset()
+                );
+                metrics.skippedRecordsSensor().record();
                 return;
             }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
index 6a94543..f4bd099 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
@@ -24,11 +24,15 @@ import org.apache.kafka.streams.kstream.Windows;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.WindowStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.Map;
 
 public class KStreamWindowAggregate<K, V, T, W extends Window> implements KStreamAggProcessorSupplier<K, Windowed<K>, V, T> {
+    private static final Logger LOG = LoggerFactory.getLogger(KStreamWindowAggregate.class);
 
     private final String storeName;
     private final Windows<W> windows;
@@ -61,11 +65,13 @@ public class KStreamWindowAggregate<K, V, T, W extends Window> implements KStrea
 
         private WindowStore<K, T> windowStore;
         private TupleForwarder<Windowed<K>, T> tupleForwarder;
+        private StreamsMetricsImpl metrics;
 
         @SuppressWarnings("unchecked")
         @Override
         public void init(final ProcessorContext context) {
             super.init(context);
+            metrics = (StreamsMetricsImpl) context.metrics();
 
             windowStore = (WindowStore<K, T>) context.getStateStore(storeName);
             tupleForwarder = new TupleForwarder<>(windowStore, context, new ForwardingCacheFlushListener<Windowed<K>, V>(context, sendOldValues), sendOldValues);
@@ -76,6 +82,11 @@ public class KStreamWindowAggregate<K, V, T, W extends Window> implements KStrea
             // if the key is null, we do not need proceed aggregating the record
             // the record with the table
             if (key == null) {
+                LOG.warn(
+                    "Skipping record due to null key. value=[{}] topic=[{}] partition=[{}] offset=[{}]",
+                    value, context().topic(), context().partition(), context().offset()
+                );
+                metrics.skippedRecordsSensor().record();
                 return;
             }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
index 10c3d8a..32dfba3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
@@ -23,11 +23,15 @@ import org.apache.kafka.streams.kstream.Windows;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.WindowStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.Map;
 
 public class KStreamWindowReduce<K, V, W extends Window> implements KStreamAggProcessorSupplier<K, Windowed<K>, V, V> {
+    private static final Logger LOG = LoggerFactory.getLogger(KStreamWindowReduce.class);
 
     private final String storeName;
     private final Windows<W> windows;
@@ -57,11 +61,13 @@ public class KStreamWindowReduce<K, V, W extends Window> implements KStreamAggPr
 
         private WindowStore<K, V> windowStore;
         private TupleForwarder<Windowed<K>, V> tupleForwarder;
+        private StreamsMetricsImpl metrics;
 
         @SuppressWarnings("unchecked")
         @Override
         public void init(final ProcessorContext context) {
             super.init(context);
+            metrics = (StreamsMetricsImpl) context.metrics();
             windowStore = (WindowStore<K, V>) context.getStateStore(storeName);
             tupleForwarder = new TupleForwarder<>(windowStore, context, new ForwardingCacheFlushListener<Windowed<K>, V>(context, sendOldValues), sendOldValues);
         }
@@ -71,6 +77,11 @@ public class KStreamWindowReduce<K, V, W extends Window> implements KStreamAggPr
             // if the key is null, we do not need proceed aggregating
             // the record with the table
             if (key == null) {
+                LOG.warn(
+                    "Skipping record due to null key. value=[{}] topic=[{}] partition=[{}] offset=[{}]",
+                    value, context().topic(), context().partition(), context().offset()
+                );
+                metrics.skippedRecordsSensor().record();
                 return;
             }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoin.java
index 78f3517..5a159ff 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoin.java
@@ -21,8 +21,12 @@ import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 class KTableKTableInnerJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R, V1, V2> {
+    private static final Logger LOG = LoggerFactory.getLogger(KTableKTableInnerJoin.class);
 
     private final KeyValueMapper<K, V1, K> keyValueMapper = new KeyValueMapper<K, V1, K>() {
         @Override
@@ -62,6 +66,7 @@ class KTableKTableInnerJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
     private class KTableKTableJoinProcessor extends AbstractProcessor<K, Change<V1>> {
 
         private final KTableValueGetter<K, V2> valueGetter;
+        private StreamsMetricsImpl metrics;
 
         KTableKTableJoinProcessor(final KTableValueGetter<K, V2> valueGetter) {
             this.valueGetter = valueGetter;
@@ -70,6 +75,7 @@ class KTableKTableInnerJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
         @Override
         public void init(final ProcessorContext context) {
             super.init(context);
+            metrics = (StreamsMetricsImpl) context.metrics();
             valueGetter.init(context);
         }
 
@@ -77,6 +83,11 @@ class KTableKTableInnerJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
         public void process(final K key, final Change<V1> change) {
             // we do join iff keys are equal, thus, if key is null we cannot join and just ignore the record
             if (key == null) {
+                LOG.warn(
+                    "Skipping record due to null key. change=[{}] topic=[{}] partition=[{}] offset=[{}]",
+                    change, context().topic(), context().partition(), context().offset()
+                );
+                metrics.skippedRecordsSensor().record();
                 return;
             }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
index 6b170da..a40ac5d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
@@ -20,8 +20,12 @@ import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 class KTableKTableLeftJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R, V1, V2> {
+    private static final Logger LOG = LoggerFactory.getLogger(KTableKTableLeftJoin.class);
 
     KTableKTableLeftJoin(final KTableImpl<K, ?, V1> table1,
                          final KTableImpl<K, ?, V2> table2,
@@ -55,6 +59,7 @@ class KTableKTableLeftJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
     private class KTableKTableLeftJoinProcessor extends AbstractProcessor<K, Change<V1>> {
 
         private final KTableValueGetter<K, V2> valueGetter;
+        private StreamsMetricsImpl metrics;
 
         KTableKTableLeftJoinProcessor(final KTableValueGetter<K, V2> valueGetter) {
             this.valueGetter = valueGetter;
@@ -63,6 +68,7 @@ class KTableKTableLeftJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
         @Override
         public void init(final ProcessorContext context) {
             super.init(context);
+            metrics = (StreamsMetricsImpl) context.metrics();
             valueGetter.init(context);
         }
 
@@ -70,6 +76,11 @@ class KTableKTableLeftJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
         public void process(final K key, final Change<V1> change) {
             // we do join iff keys are equal, thus, if key is null we cannot join and just ignore the record
             if (key == null) {
+                LOG.warn(
+                    "Skipping record due to null key. change=[{}] topic=[{}] partition=[{}] offset=[{}]",
+                    change, context().topic(), context().partition(), context().offset()
+                );
+                metrics.skippedRecordsSensor().record();
                 return;
             }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
index 97f00e3..377e2d9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
@@ -20,8 +20,12 @@ import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 class KTableKTableOuterJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R, V1, V2> {
+    private static final Logger LOG = LoggerFactory.getLogger(KTableKTableOuterJoin.class);
 
     KTableKTableOuterJoin(final KTableImpl<K, ?, V1> table1,
                           final KTableImpl<K, ?, V2> table2,
@@ -54,6 +58,7 @@ class KTableKTableOuterJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
     private class KTableKTableOuterJoinProcessor extends AbstractProcessor<K, Change<V1>> {
 
         private final KTableValueGetter<K, V2> valueGetter;
+        private StreamsMetricsImpl metrics;
 
         KTableKTableOuterJoinProcessor(final KTableValueGetter<K, V2> valueGetter) {
             this.valueGetter = valueGetter;
@@ -62,6 +67,7 @@ class KTableKTableOuterJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
         @Override
         public void init(final ProcessorContext context) {
             super.init(context);
+            metrics = (StreamsMetricsImpl) context.metrics();
             valueGetter.init(context);
         }
 
@@ -69,6 +75,11 @@ class KTableKTableOuterJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
         public void process(final K key, final Change<V1> change) {
             // we do join iff keys are equal, thus, if key is null we cannot join and just ignore the record
             if (key == null) {
+                LOG.warn(
+                    "Skipping record due to null key. change=[{}] topic=[{}] partition=[{}] offset=[{}]",
+                    change, context().topic(), context().partition(), context().offset()
+                );
+                metrics.skippedRecordsSensor().record();
                 return;
             }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
index 61798cd..d3916dd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
@@ -20,9 +20,12 @@ import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 class KTableKTableRightJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R, V1, V2> {
-
+    private static final Logger LOG = LoggerFactory.getLogger(KTableKTableRightJoin.class);
 
     KTableKTableRightJoin(final KTableImpl<K, ?, V1> table1,
                           final KTableImpl<K, ?, V2> table2,
@@ -55,6 +58,7 @@ class KTableKTableRightJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
     private class KTableKTableRightJoinProcessor extends AbstractProcessor<K, Change<V1>> {
 
         private final KTableValueGetter<K, V2> valueGetter;
+        private StreamsMetricsImpl metrics;
 
         KTableKTableRightJoinProcessor(final KTableValueGetter<K, V2> valueGetter) {
             this.valueGetter = valueGetter;
@@ -63,6 +67,7 @@ class KTableKTableRightJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
         @Override
         public void init(final ProcessorContext context) {
             super.init(context);
+            metrics = (StreamsMetricsImpl) context.metrics();
             valueGetter.init(context);
         }
 
@@ -70,6 +75,11 @@ class KTableKTableRightJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R,
         public void process(final K key, final Change<V1> change) {
             // we do join iff keys are equal, thus, if key is null we cannot join and just ignore the record
             if (key == null) {
+                LOG.warn(
+                    "Skipping record due to null key. change=[{}] topic=[{}] partition=[{}] offset=[{}]",
+                    change, context().topic(), context().partition(), context().offset()
+                );
+                metrics.skippedRecordsSensor().record();
                 return;
             }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
index 273e255..85ceaae 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java
@@ -20,9 +20,13 @@ import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.KeyValueStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class KTableSource<K, V> implements ProcessorSupplier<K, V> {
+    private static final Logger LOG = LoggerFactory.getLogger(KTableSource.class);
 
     public final String storeName;
 
@@ -45,11 +49,13 @@ public class KTableSource<K, V> implements ProcessorSupplier<K, V> {
 
         private KeyValueStore<K, V> store;
         private TupleForwarder<K, V> tupleForwarder;
+        private StreamsMetricsImpl metrics;
 
         @SuppressWarnings("unchecked")
         @Override
         public void init(final ProcessorContext context) {
             super.init(context);
+            metrics = (StreamsMetricsImpl) context.metrics();
             store = (KeyValueStore<K, V>) context.getStateStore(storeName);
             tupleForwarder = new TupleForwarder<>(store, context, new ForwardingCacheFlushListener<K, V>(context, sendOldValues), sendOldValues);
         }
@@ -58,6 +64,11 @@ public class KTableSource<K, V> implements ProcessorSupplier<K, V> {
         public void process(final K key, final V value) {
             // if the key is null, then ignore the record
             if (key == null) {
+                LOG.warn(
+                    "Skipping record due to null key. topic=[{}] partition=[{}] offset=[{}]",
+                    context().topic(), context().partition(), context().offset()
+                );
+                metrics.skippedRecordsSensor().record();
                 return;
             }
             final V oldValue = store.get(key);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
index 87408c6..fc3067e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
@@ -18,10 +18,10 @@ package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.internals.ThreadCache;
 
 import java.io.File;
@@ -36,7 +36,7 @@ public abstract class AbstractProcessorContext implements InternalProcessorConte
     private final TaskId taskId;
     private final String applicationId;
     private final StreamsConfig config;
-    private final StreamsMetrics metrics;
+    private final StreamsMetricsImpl metrics;
     private final Serde keySerde;
     private final ThreadCache cache;
     private final Serde valueSerde;
@@ -47,7 +47,7 @@ public abstract class AbstractProcessorContext implements InternalProcessorConte
 
     public AbstractProcessorContext(final TaskId taskId,
                                     final StreamsConfig config,
-                                    final StreamsMetrics metrics,
+                                    final StreamsMetricsImpl metrics,
                                     final StateManager stateManager,
                                     final ThreadCache cache) {
         this.taskId = taskId;
@@ -86,7 +86,7 @@ public abstract class AbstractProcessorContext implements InternalProcessorConte
     }
 
     @Override
-    public StreamsMetrics metrics() {
+    public StreamsMetricsImpl metrics() {
         return metrics;
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
index 88d9f56..6bc4121 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
@@ -17,13 +17,13 @@
 package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.processor.Cancellable;
 import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.To;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.internals.ThreadCache;
 
 import java.util.List;
@@ -33,7 +33,7 @@ public class GlobalProcessorContextImpl extends AbstractProcessorContext {
 
     public GlobalProcessorContextImpl(final StreamsConfig config,
                                       final StateManager stateMgr,
-                                      final StreamsMetrics metrics,
+                                      final StreamsMetricsImpl metrics,
                                       final ThreadCache cache) {
         super(new TaskId(-1, -1), config, metrics, stateMgr, cache);
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
index c455275..26bf493 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
@@ -63,7 +63,15 @@ public class GlobalStateUpdateTask implements GlobalStateMaintainer {
         for (final String storeName : storeNames) {
             final String sourceTopic = storeNameToTopic.get(storeName);
             final SourceNode source = topology.source(sourceTopic);
-            deserializers.put(sourceTopic, new RecordDeserializer(source, deserializationExceptionHandler, logContext));
+            deserializers.put(
+                sourceTopic,
+                new RecordDeserializer(
+                    source,
+                    deserializationExceptionHandler,
+                    logContext,
+                    processorContext.metrics().skippedRecordsSensor()
+                )
+            );
         }
         initTopology();
         processorContext.initialized();
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
index bb8bc1d..af3c7db 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
@@ -26,16 +26,15 @@ import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.errors.LockException;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.StateRestoreListener;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.internals.ThreadCache;
 import org.slf4j.Logger;
 
 import java.io.IOException;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
@@ -56,7 +55,7 @@ public class GlobalStreamThread extends Thread {
     private final StateDirectory stateDirectory;
     private final Time time;
     private final ThreadCache cache;
-    private final StreamsMetrics streamsMetrics;
+    private final StreamsMetricsImpl streamsMetrics;
     private final ProcessorTopology topology;
     private volatile StreamsException startupException;
 
@@ -186,7 +185,7 @@ public class GlobalStreamThread extends Thread {
         this.topology = topology;
         this.globalConsumer = globalConsumer;
         this.stateDirectory = stateDirectory;
-        this.streamsMetrics = new StreamsMetricsImpl(metrics, threadClientId, Collections.singletonMap("client-id", threadClientId));
+        this.streamsMetrics = new StreamsMetricsImpl(metrics, threadClientId);
         this.logPrefix = String.format("global-stream-thread [%s] ", threadClientId);
         this.logContext = new LogContext(logPrefix);
         this.log = logContext.logger(getClass());
@@ -298,6 +297,9 @@ public class GlobalStreamThread extends Thread {
             } catch (final IOException e) {
                 log.error("Failed to close state maintainer due to the following error:", e);
             }
+
+            streamsMetrics.removeOwnedSensors();
+
             setState(DEAD);
 
             log.info("Shutdown complete");
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
index 25df826..0ebaf60 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.internals.ThreadCache;
 
 /**
@@ -26,6 +27,9 @@ import org.apache.kafka.streams.state.internals.ThreadCache;
  */
 public interface InternalProcessorContext extends ProcessorContext {
 
+    @Override
+    StreamsMetricsImpl metrics();
+
     /**
      * Returns the current {@link RecordContext}
      * @return the current {@link RecordContext}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index 44a25c1..178937f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -17,7 +17,6 @@
 package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.Cancellable;
 import org.apache.kafka.streams.processor.PunctuationType;
@@ -25,6 +24,7 @@ import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.To;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.internals.ThreadCache;
 
 import java.util.List;
@@ -41,7 +41,7 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
                          final StreamsConfig config,
                          final RecordCollector collector,
                          final ProcessorStateManager stateMgr,
-                         final StreamsMetrics metrics,
+                         final StreamsMetricsImpl metrics,
                          final ThreadCache cache) {
         super(id, config, metrics, stateMgr, cache);
         this.task = task;
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
index 0411a37..0854b67 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
@@ -19,11 +19,11 @@ package org.apache.kafka.streams.processor.internals;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.Punctuator;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -106,11 +106,11 @@ public class ProcessorNode<K, V> {
         childByName.put(child.name, child);
     }
 
-    public void init(final ProcessorContext context) {
+    public void init(final InternalProcessorContext context) {
         this.context = context;
         try {
             nodeMetrics = new NodeMetrics(context.metrics(), name, context);
-            nodeMetrics.metrics.measureLatencyNs(time, initDelegate, nodeMetrics.nodeCreationSensor);
+            runAndMeasureLatency(time, initDelegate, nodeMetrics.nodeCreationSensor);
         } catch (final Exception e) {
             throw new StreamsException(String.format("failed to initialize processor %s", name), e);
         }
@@ -118,7 +118,7 @@ public class ProcessorNode<K, V> {
 
     public void close() {
         try {
-            nodeMetrics.metrics.measureLatencyNs(time, closeDelegate, nodeMetrics.nodeDestructionSensor);
+            runAndMeasureLatency(time, closeDelegate, nodeMetrics.nodeDestructionSensor);
             nodeMetrics.removeAllSensors();
         } catch (final Exception e) {
             throw new StreamsException(String.format("failed to close processor %s", name), e);
@@ -130,7 +130,7 @@ public class ProcessorNode<K, V> {
         this.key = key;
         this.value = value;
 
-        this.nodeMetrics.metrics.measureLatencyNs(time, processDelegate, nodeMetrics.nodeProcessTimeSensor);
+        runAndMeasureLatency(time, processDelegate, nodeMetrics.nodeProcessTimeSensor);
     }
 
     public void punctuate(final long timestamp, final Punctuator punctuator) {
@@ -140,7 +140,7 @@ public class ProcessorNode<K, V> {
                 punctuator.punctuate(timestamp);
             }
         };
-        this.nodeMetrics.metrics.measureLatencyNs(time, punctuateDelegate, nodeMetrics.nodePunctuateTimeSensor);
+        runAndMeasureLatency(time, punctuateDelegate, nodeMetrics.nodePunctuateTimeSensor);
     }
 
     /**
@@ -172,44 +172,53 @@ public class ProcessorNode<K, V> {
         return nodeMetrics.sourceNodeForwardSensor;
     }
 
-    Sensor sourceNodeSkippedDueToDeserializationErrorSensor() {
-        return nodeMetrics.sourceNodeSkippedDueToDeserializationError;
-    }
-
     private static final class NodeMetrics {
         private final StreamsMetricsImpl metrics;
 
         private final Sensor nodeProcessTimeSensor;
         private final Sensor nodePunctuateTimeSensor;
         private final Sensor sourceNodeForwardSensor;
-        private final Sensor sourceNodeSkippedDueToDeserializationError;
         private final Sensor nodeCreationSensor;
         private final Sensor nodeDestructionSensor;
 
-        private NodeMetrics(final StreamsMetrics metrics, final String name, final ProcessorContext context) {
-            final String scope = "processor-node";
-            final String tagKey = "task-id";
-            final String tagValue = context.taskId().toString();
-            this.metrics = (StreamsMetricsImpl) metrics;
+        private NodeMetrics(final StreamsMetricsImpl metrics, final String processorNodeName, final ProcessorContext context) {
+            this.metrics = metrics;
 
             // these are all latency metrics
             this.nodeProcessTimeSensor = metrics.addLatencyAndThroughputSensor(
-                scope, name, "process", Sensor.RecordingLevel.DEBUG, tagKey, tagValue
+                "processor-node",
+                processorNodeName,
+                "process",
+                Sensor.RecordingLevel.DEBUG,
+                "task-id", context.taskId().toString()
             );
             this.nodePunctuateTimeSensor = metrics.addLatencyAndThroughputSensor(
-                scope, name, "punctuate", Sensor.RecordingLevel.DEBUG, tagKey, tagValue
+                "processor-node",
+                processorNodeName,
+                "punctuate",
+                Sensor.RecordingLevel.DEBUG,
+                "task-id", context.taskId().toString()
             );
             this.nodeCreationSensor = metrics.addLatencyAndThroughputSensor(
-                scope, name, "create", Sensor.RecordingLevel.DEBUG, tagKey, tagValue
+                "processor-node",
+                processorNodeName,
+                "create",
+                Sensor.RecordingLevel.DEBUG,
+                "task-id", context.taskId().toString()
             );
             this.nodeDestructionSensor = metrics.addLatencyAndThroughputSensor(
-                scope, name, "destroy", Sensor.RecordingLevel.DEBUG, tagKey, tagValue
+                "processor-node",
+                processorNodeName,
+                "destroy",
+                Sensor.RecordingLevel.DEBUG,
+                "task-id", context.taskId().toString()
             );
             this.sourceNodeForwardSensor = metrics.addThroughputSensor(
-                scope, name, "forward", Sensor.RecordingLevel.DEBUG, tagKey, tagValue
-            );
-            this.sourceNodeSkippedDueToDeserializationError = metrics.addThroughputSensor(
-                scope, name, "skippedDueToDeserializationError", Sensor.RecordingLevel.DEBUG, tagKey, tagValue
+                "processor-node",
+                processorNodeName,
+                "forward",
+                Sensor.RecordingLevel.DEBUG,
+                "task-id", context.taskId().toString()
             );
         }
 
@@ -219,7 +228,17 @@ public class ProcessorNode<K, V> {
             metrics.removeSensor(sourceNodeForwardSensor);
             metrics.removeSensor(nodeCreationSensor);
             metrics.removeSensor(nodeDestructionSensor);
-            metrics.removeSensor(sourceNodeSkippedDueToDeserializationError);
+        }
+    }
+
+    private static void runAndMeasureLatency(final Time time, final Runnable action, final Sensor sensor) {
+        long startNs = -1;
+        if (sensor.shouldRecord()) {
+            startNs = time.nanoseconds();
+        }
+        action.run();
+        if (startNs != -1) {
+            sensor.record(time.nanoseconds() - startNs);
         }
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
index 0d13758..8167539 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
@@ -33,6 +33,7 @@ import org.apache.kafka.common.errors.SecurityDisabledException;
 import org.apache.kafka.common.errors.SerializationException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.errors.ProductionExceptionHandler;
@@ -51,24 +52,25 @@ public class RecordCollectorImpl implements RecordCollector {
     private final Map<TopicPartition, Long> offsets;
     private final String logPrefix;
     private final ProductionExceptionHandler productionExceptionHandler;
+    private final Sensor skippedRecordsSensor;
 
     private final static String LOG_MESSAGE = "Error sending record (key {} value {} timestamp {}) to topic {} due to {}; " +
         "No more records will be sent and no more offsets will be recorded for this task.";
     private final static String EXCEPTION_MESSAGE = "%sAbort sending since %s with a previous record (key %s value %s timestamp %d) to topic %s due to %s";
     private final static String PARAMETER_HINT = "\nYou can increase producer parameter `retries` and `retry.backoff.ms` to avoid this error.";
-    private final static String HANDLER_CONTINUED_MESSAGE = "Error sending records (key {} value {} timestamp {}) to topic {} due to {}; " +
-        "The exception handler chose to CONTINUE processing in spite of this error.";
     private volatile KafkaException sendException;
 
     public RecordCollectorImpl(final Producer<byte[], byte[]> producer,
                                final String streamTaskId,
                                final LogContext logContext,
-                               final ProductionExceptionHandler productionExceptionHandler) {
+                               final ProductionExceptionHandler productionExceptionHandler,
+                               final Sensor skippedRecordsSensor) {
         this.producer = producer;
         this.offsets = new HashMap<>();
         this.logPrefix = String.format("task [%s] ", streamTaskId);
         this.log = logContext.logger(getClass());
         this.productionExceptionHandler = productionExceptionHandler;
+        this.skippedRecordsSensor = skippedRecordsSensor;
     }
 
     @Override
@@ -183,7 +185,12 @@ public class RecordCollectorImpl implements RecordCollector {
                                 } else if (productionExceptionHandler.handle(serializedRecord, exception) == ProductionExceptionHandlerResponse.FAIL) {
                                     recordSendError(key, value, timestamp, topic, exception);
                                 } else {
-                                    log.debug(HANDLER_CONTINUED_MESSAGE, key, value, timestamp, topic, exception);
+                                    log.warn(
+                                        "Error sending records (key=[{}] value=[{}] timestamp=[{}]) to topic=[{}] and partition=[{}]; " +
+                                            "The exception handler chose to CONTINUE processing in spite of this error.",
+                                        key, value, timestamp, topic, partition, exception
+                                    );
+                                    skippedRecordsSensor.record();
                                 }
                             }
                         }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java
index 47591d1..36e2c9a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
@@ -30,13 +31,16 @@ class RecordDeserializer {
     private final SourceNode sourceNode;
     private final DeserializationExceptionHandler deserializationExceptionHandler;
     private final Logger log;
+    private final Sensor skippedRecordSensor;
 
     RecordDeserializer(final SourceNode sourceNode,
                        final DeserializationExceptionHandler deserializationExceptionHandler,
-                       final LogContext logContext) {
+                       final LogContext logContext,
+                       final Sensor skippedRecordsSensor) {
         this.sourceNode = sourceNode;
         this.deserializationExceptionHandler = deserializationExceptionHandler;
         this.log = logContext.logger(RecordDeserializer.class);
+        this.skippedRecordSensor = skippedRecordsSensor;
     }
 
     /**
@@ -79,10 +83,17 @@ class RecordDeserializer {
                     DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG + " appropriately.",
                     deserializationException);
             } else {
-                sourceNode.sourceNodeSkippedDueToDeserializationErrorSensor().record();
+                log.warn(
+                    "Skipping record due to deserialization error. topic=[{}] partition=[{}] offset=[{}]",
+                    rawRecord.topic(),
+                    rawRecord.partition(),
+                    rawRecord.offset(),
+                    deserializationException
+                );
+                skippedRecordSensor.record();
+                return null;
             }
         }
-        return null;
     }
 
     SourceNode sourceNode() {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
index 85f3b72..22ef4d6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
@@ -23,6 +23,7 @@ import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.TimestampExtractor;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.slf4j.Logger;
 
 import java.util.ArrayDeque;
@@ -49,14 +50,19 @@ public class RecordQueue {
                 final SourceNode source,
                 final TimestampExtractor timestampExtractor,
                 final DeserializationExceptionHandler deserializationExceptionHandler,
-                final ProcessorContext processorContext,
+                final InternalProcessorContext processorContext,
                 final LogContext logContext) {
         this.partition = partition;
         this.source = source;
         this.timestampExtractor = timestampExtractor;
         this.fifoQueue = new ArrayDeque<>();
         this.timeTracker = new MinTimestampTracker<>();
-        this.recordDeserializer = new RecordDeserializer(source, deserializationExceptionHandler, logContext);
+        this.recordDeserializer = new RecordDeserializer(
+            source,
+            deserializationExceptionHandler,
+            logContext,
+            processorContext.metrics().skippedRecordsSensor()
+        );
         this.processorContext = processorContext;
         this.log = logContext.logger(RecordQueue.class);
     }
@@ -90,6 +96,7 @@ public class RecordQueue {
 
             final ConsumerRecord<Object, Object> record = recordDeserializer.deserialize(processorContext, rawRecord);
             if (record == null) {
+                // this only happens if the deserializer decides to skip. It has already logged the reason.
                 continue;
             }
 
@@ -107,6 +114,11 @@ public class RecordQueue {
 
             // drop message if TS is invalid, i.e., negative
             if (timestamp < 0) {
+                log.warn(
+                    "Skipping record due to negative extracted timestamp. topic=[{}] partition=[{}] offset=[{}] extractedTimestamp=[{}] extractor=[{}]",
+                    record.topic(), record.partition(), record.offset(), timestamp, timestampExtractor.getClass().getCanonicalName()
+                );
+                ((StreamsMetricsImpl) processorContext.metrics()).skippedRecordsSensor().record();
                 continue;
             }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
index 00e5dca..0fbd6dc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
@@ -54,7 +54,7 @@ public class SinkNode<K, V> extends ProcessorNode<K, V> {
 
     @SuppressWarnings("unchecked")
     @Override
-    public void init(final ProcessorContext context) {
+    public void init(final InternalProcessorContext context) {
         super.init(context);
         this.context = context;
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
index 8df3998..4aaba9e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
@@ -65,7 +65,7 @@ public class SourceNode<K, V> extends ProcessorNode<K, V> {
 
     @SuppressWarnings("unchecked")
     @Override
-    public void init(final ProcessorContext context) {
+    public void init(final InternalProcessorContext context) {
         super.init(context);
         this.context = context;
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
index 360c4ab..ef4585a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
@@ -20,7 +20,6 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.processor.Cancellable;
 import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.processor.Punctuator;
@@ -28,6 +27,7 @@ import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.To;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.internals.ThreadCache;
 
 import java.util.Collections;
@@ -70,15 +70,18 @@ class StandbyContextImpl extends AbstractProcessorContext implements RecordColle
     StandbyContextImpl(final TaskId id,
                        final StreamsConfig config,
                        final ProcessorStateManager stateMgr,
-                       final StreamsMetrics metrics) {
-        super(id,
-              config,
-              metrics,
-              stateMgr,
-              new ThreadCache(
-                  new LogContext(String.format("stream-thread [%s] ", Thread.currentThread().getName())),
-                  0,
-                  metrics));
+                       final StreamsMetricsImpl metrics) {
+        super(
+            id,
+            config,
+            metrics,
+            stateMgr,
+            new ThreadCache(
+                new LogContext(String.format("stream-thread [%s] ", Thread.currentThread().getName())),
+                0, // NOTE(review): presumably the cache size in bytes, i.e. no caching for standby contexts - confirm
+                metrics
+            )
+        );
     }
 
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
index a048407..ddf84fd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 
 import java.util.Collection;
 import java.util.Collections;
@@ -53,7 +54,7 @@ public class StandbyTask extends AbstractTask {
                 final Consumer<byte[], byte[]> consumer,
                 final ChangelogReader changelogReader,
                 final StreamsConfig config,
-                final StreamsMetrics metrics,
+                final StreamsMetricsImpl metrics,
                 final StateDirectory stateDirectory) {
         super(id, partitions, topology, consumer, changelogReader, true, stateDirectory, config);
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index f7816d2..d9515b1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -25,10 +25,8 @@ import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.ProducerFencedException;
 import org.apache.kafka.common.metrics.Sensor;
-import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
 import org.apache.kafka.streams.errors.ProductionExceptionHandler;
 import org.apache.kafka.streams.errors.StreamsException;
@@ -38,6 +36,7 @@ import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.TimestampExtractor;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.internals.ThreadCache;
 
 import java.util.Collection;
@@ -68,16 +67,16 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
     private boolean commitOffsetNeeded = false;
     private boolean transactionInFlight = false;
     private final Time time;
-    private final TaskMetrics metrics;
+    private final TaskMetrics taskMetrics;
 
-    protected static class TaskMetrics {
+    protected static final class TaskMetrics {
         final StreamsMetricsImpl metrics;
         final Sensor taskCommitTimeSensor;
 
 
-        TaskMetrics(final TaskId id, final StreamsMetrics metrics) {
+        TaskMetrics(final TaskId id, final StreamsMetricsImpl metrics) {
             final String name = id.toString();
-            this.metrics = (StreamsMetricsImpl) metrics;
+            this.metrics = metrics;
             taskCommitTimeSensor = metrics.addLatencyAndThroughputSensor("task", name, "commit", Sensor.RecordingLevel.DEBUG);
         }
 
@@ -86,41 +85,51 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
         }
     }
 
-    /**
-     * Create {@link StreamTask} with its assigned partitions
-     *
-     * @param id              the ID of this task
-     * @param partitions      the collection of assigned {@link TopicPartition}
-     * @param topology        the instance of {@link ProcessorTopology}
-     * @param consumer        the instance of {@link Consumer}
-     * @param changelogReader the instance of {@link ChangelogReader} used for restoring state
-     * @param config          the {@link StreamsConfig} specified by the user
-     * @param metrics         the {@link StreamsMetrics} created by the thread
-     * @param stateDirectory  the {@link StateDirectory} created by the thread
-     * @param cache           the {@link ThreadCache} created by the thread
-     * @param time            the system {@link Time} of the thread
-     * @param producer        the instance of {@link Producer} used to produce records
-     */
     public StreamTask(final TaskId id,
                       final Collection<TopicPartition> partitions,
                       final ProcessorTopology topology,
                       final Consumer<byte[], byte[]> consumer,
                       final ChangelogReader changelogReader,
                       final StreamsConfig config,
-                      final StreamsMetrics metrics,
+                      final StreamsMetricsImpl metrics,
                       final StateDirectory stateDirectory,
                       final ThreadCache cache,
                       final Time time,
                       final Producer<byte[], byte[]> producer) {
+        this(id, partitions, topology, consumer, changelogReader, config, metrics, stateDirectory, cache, time, producer, null); // null recordCollector: the delegate constructor then creates its own RecordCollectorImpl
+    }
+
+    public StreamTask(final TaskId id,
+                      final Collection<TopicPartition> partitions,
+                      final ProcessorTopology topology,
+                      final Consumer<byte[], byte[]> consumer,
+                      final ChangelogReader changelogReader,
+                      final StreamsConfig config,
+                      final StreamsMetricsImpl metrics,
+                      final StateDirectory stateDirectory,
+                      final ThreadCache cache,
+                      final Time time,
+                      final Producer<byte[], byte[]> producer,
+                      final RecordCollector recordCollector) {
         super(id, partitions, topology, consumer, changelogReader, false, stateDirectory, config);
 
         this.time = time;
         this.producer = producer;
-        this.metrics = new TaskMetrics(id, metrics);
+        this.taskMetrics = new TaskMetrics(id, metrics);
 
         final ProductionExceptionHandler productionExceptionHandler = config.defaultProductionExceptionHandler();
 
-        recordCollector = createRecordCollector(logContext, productionExceptionHandler);
+        if (recordCollector == null) {
+            this.recordCollector = new RecordCollectorImpl(
+                producer,
+                id.toString(),
+                logContext,
+                productionExceptionHandler,
+                metrics.skippedRecordsSensor()
+            );
+        } else {
+            this.recordCollector = recordCollector;
+        }
         streamTimePunctuationQueue = new PunctuationQueue();
         systemTimePunctuationQueue = new PunctuationQueue();
         maxBufferedSize = config.getInt(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG);
@@ -133,7 +142,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
         final Map<TopicPartition, RecordQueue> partitionQueues = new HashMap<>();
 
         // initialize the topology with its own context
-        processorContext = new ProcessorContextImpl(id, this, config, recordCollector, stateMgr, metrics, cache);
+        processorContext = new ProcessorContextImpl(id, this, config, this.recordCollector, stateMgr, metrics, cache);
 
         final TimestampExtractor defaultTimestampExtractor = config.defaultTimestampExtractor();
         final DeserializationExceptionHandler defaultDeserializationExceptionHandler = config.defaultDeserializationExceptionHandler();
@@ -312,22 +321,20 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
      */
     // visible for testing
     void commit(final boolean startNewTransaction) {
+        final long startNs = time.nanoseconds(); // start of the interval recorded on taskCommitTimeSensor at the end of this method
         log.debug("Committing");
-        metrics.metrics.measureLatencyNs(
-            time,
-            new Runnable() {
-                @Override
-                public void run() {
-                    flushState();
-                    if (!eosEnabled) {
-                        stateMgr.checkpoint(recordCollectorOffsets());
-                    }
-                    commitOffsets(startNewTransaction);
-                }
-            },
-            metrics.taskCommitTimeSensor);
+
+        flushState();
+
+        if (!eosEnabled) {
+            stateMgr.checkpoint(recordCollectorOffsets());
+        }
+
+        commitOffsets(startNewTransaction);
 
         commitRequested = false;
+
+        taskMetrics.taskCommitTimeSensor.record(time.nanoseconds() - startNs); // elapsed nanoseconds for the steps above; not reached if any of them throws
     }
 
     @Override
@@ -489,7 +496,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
 
         try {
             partitionGroup.close();
-            metrics.removeAllSensors();
+            taskMetrics.removeAllSensors();
         } finally {
             if (eosEnabled) {
                 if (!clean) {
@@ -567,11 +574,9 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
      *
      * @param partition the partition
      * @param records   the records
-     * @return the number of added records
      */
     @SuppressWarnings("unchecked")
-    public int addRecords(final TopicPartition partition, final Iterable<ConsumerRecord<byte[], byte[]>> records) {
-        final int oldQueueSize = partitionGroup.numBuffered(partition);
+    public void addRecords(final TopicPartition partition, final Iterable<ConsumerRecord<byte[], byte[]>> records) {
         final int newQueueSize = partitionGroup.addRawRecords(partition, records);
 
         if (log.isTraceEnabled()) {
@@ -583,8 +588,6 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
         if (newQueueSize > maxBufferedSize) {
             consumer.pause(singleton(partition));
         }
-
-        return newQueueSize - oldQueueSize;
     }
 
     /**
@@ -692,10 +695,4 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
     RecordCollector recordCollector() {
         return recordCollector;
     }
-
-    // visible for testing only
-    RecordCollector createRecordCollector(final LogContext logContext,
-                                          final ProductionExceptionHandler productionExceptionHandler) {
-        return new RecordCollectorImpl(producer, id.toString(), logContext, productionExceptionHandler);
-    }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 32cafb4..39727be 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -26,27 +26,25 @@ import org.apache.kafka.clients.consumer.InvalidOffsetException;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.Avg;
 import org.apache.kafka.common.metrics.stats.Count;
 import org.apache.kafka.common.metrics.stats.Max;
-import org.apache.kafka.common.metrics.stats.Meter;
-import org.apache.kafka.common.metrics.stats.SampledStat;
-import org.apache.kafka.common.metrics.stats.Sum;
+import org.apache.kafka.common.metrics.stats.Rate;
+import org.apache.kafka.common.metrics.stats.Total;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.KafkaClientSupplier;
 import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.errors.TaskMigratedException;
 import org.apache.kafka.streams.processor.StateRestoreListener;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.TaskMetadata;
 import org.apache.kafka.streams.processor.ThreadMetadata;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.internals.ThreadCache;
 import org.slf4j.Logger;
 
@@ -54,15 +52,19 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Deque;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static java.util.Collections.singleton;
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsConventions.threadLevelSensorName;
 
 public class StreamThread extends Thread {
 
@@ -322,9 +324,8 @@ public class StreamThread extends Thread {
         final String applicationId;
         final InternalTopologyBuilder builder;
         final StreamsConfig config;
-        final StreamsMetrics streamsMetrics;
+        final StreamsMetricsThreadImpl streamsMetrics;
         final StateDirectory stateDirectory;
-        final Sensor taskCreatedSensor;
         final ChangelogReader storeChangelogReader;
         final Time time;
         final Logger log;
@@ -332,9 +333,8 @@ public class StreamThread extends Thread {
 
         AbstractTaskCreator(final InternalTopologyBuilder builder,
                             final StreamsConfig config,
-                            final StreamsMetrics streamsMetrics,
+                            final StreamsMetricsThreadImpl streamsMetrics,
                             final StateDirectory stateDirectory,
-                            final Sensor taskCreatedSensor,
                             final ChangelogReader storeChangelogReader,
                             final Time time,
                             final Logger log) {
@@ -343,7 +343,6 @@ public class StreamThread extends Thread {
             this.config = config;
             this.streamsMetrics = streamsMetrics;
             this.stateDirectory = stateDirectory;
-            this.taskCreatedSensor = taskCreatedSensor;
             this.storeChangelogReader = storeChangelogReader;
             this.time = time;
             this.log = log;
@@ -386,9 +385,8 @@ public class StreamThread extends Thread {
 
         TaskCreator(final InternalTopologyBuilder builder,
                     final StreamsConfig config,
-                    final StreamsMetrics streamsMetrics,
+                    final StreamsMetricsThreadImpl streamsMetrics,
                     final StateDirectory stateDirectory,
-                    final Sensor taskCreatedSensor,
                     final ChangelogReader storeChangelogReader,
                     final ThreadCache cache,
                     final Time time,
@@ -401,7 +399,6 @@ public class StreamThread extends Thread {
                 config,
                 streamsMetrics,
                 stateDirectory,
-                taskCreatedSensor,
                 storeChangelogReader,
                 time,
                 log);
@@ -415,7 +412,7 @@ public class StreamThread extends Thread {
         StreamTask createTask(final Consumer<byte[], byte[]> consumer,
                               final TaskId taskId,
                               final Set<TopicPartition> partitions) {
-            taskCreatedSensor.record();
+            streamsMetrics.taskCreatedSensor.record();
 
             return new StreamTask(
                 taskId,
@@ -428,8 +425,8 @@ public class StreamThread extends Thread {
                 stateDirectory,
                 cache,
                 time,
-                createProducer(taskId));
-
+                createProducer(taskId)
+            );
         }
 
         private Producer<byte[], byte[]> createProducer(final TaskId id) {
@@ -459,9 +456,8 @@ public class StreamThread extends Thread {
     static class StandbyTaskCreator extends AbstractTaskCreator<StandbyTask> {
         StandbyTaskCreator(final InternalTopologyBuilder builder,
                            final StreamsConfig config,
-                           final StreamsMetrics streamsMetrics,
+                           final StreamsMetricsThreadImpl streamsMetrics,
                            final StateDirectory stateDirectory,
-                           final Sensor taskCreatedSensor,
                            final ChangelogReader storeChangelogReader,
                            final Time time,
                            final Logger log) {
@@ -470,7 +466,6 @@ public class StreamThread extends Thread {
                 config,
                 streamsMetrics,
                 stateDirectory,
-                taskCreatedSensor,
                 storeChangelogReader,
                 time,
                 log);
@@ -480,7 +475,7 @@ public class StreamThread extends Thread {
         StandbyTask createTask(final Consumer<byte[], byte[]> consumer,
                                final TaskId taskId,
                                final Set<TopicPartition> partitions) {
-            taskCreatedSensor.record();
+            streamsMetrics.taskCreatedSensor.record();
 
             final ProcessorTopology topology = builder.build(taskId.topicGroupId);
 
@@ -505,80 +500,91 @@ public class StreamThread extends Thread {
         }
     }
 
-    /**
-     * This class extends {@link StreamsMetricsImpl(Metrics, String, String, Map)} and
-     * overrides one of its functions for efficiency
-     */
+    /**
+     * Thread-level metrics. The constructor registers the commit, poll, process and punctuate latency
+     * sensors plus the task-created and task-closed sensors in the "stream-metrics" group, and records
+     * each sensor name so that {@link #removeOwnedSensors()} can unregister them again.
+     */
     static class StreamsMetricsThreadImpl extends StreamsMetricsImpl {
-        final Sensor commitTimeSensor;
-        final Sensor pollTimeSensor;
-        final Sensor processTimeSensor;
-        final Sensor punctuateTimeSensor;
-        final Sensor taskCreatedSensor;
-        final Sensor tasksClosedSensor;
-        final Sensor skippedRecordsSensor;
-
-        StreamsMetricsThreadImpl(final Metrics metrics, final String groupName, final String prefix, final Map<String, String> tags) {
-            super(metrics, groupName, tags);
-            commitTimeSensor = metrics.sensor(prefix + ".commit-latency", Sensor.RecordingLevel.INFO);
-            commitTimeSensor.add(metrics.metricName("commit-latency-avg", this.groupName, "The average commit time in ms", this.tags), new Avg());
-            commitTimeSensor.add(metrics.metricName("commit-latency-max", this.groupName, "The maximum commit time in ms", this.tags), new Max());
-            commitTimeSensor.add(createMeter(metrics, new Count(), "commit", "commit calls"));
-
-            pollTimeSensor = metrics.sensor(prefix + ".poll-latency", Sensor.RecordingLevel.INFO);
-            pollTimeSensor.add(metrics.metricName("poll-latency-avg", this.groupName, "The average poll time in ms", this.tags), new Avg());
-            pollTimeSensor.add(metrics.metricName("poll-latency-max", this.groupName, "The maximum poll time in ms", this.tags), new Max());
-            pollTimeSensor.add(createMeter(metrics, new Count(), "poll", "record-poll calls"));
-
-            processTimeSensor = metrics.sensor(prefix + ".process-latency", Sensor.RecordingLevel.INFO);
-            processTimeSensor.add(metrics.metricName("process-latency-avg", this.groupName, "The average process time in ms", this.tags), new Avg());
-            processTimeSensor.add(metrics.metricName("process-latency-max", this.groupName, "The maximum process time in ms", this.tags), new Max());
-            processTimeSensor.add(createMeter(metrics, new Count(), "process", "process calls"));
-
-            punctuateTimeSensor = metrics.sensor(prefix + ".punctuate-latency", Sensor.RecordingLevel.INFO);
-            punctuateTimeSensor.add(metrics.metricName("punctuate-latency-avg", this.groupName, "The average punctuate time in ms", this.tags), new Avg());
-            punctuateTimeSensor.add(metrics.metricName("punctuate-latency-max", this.groupName, "The maximum punctuate time in ms", this.tags), new Max());
-            punctuateTimeSensor.add(createMeter(metrics, new Count(), "punctuate", "punctuate calls"));
-
-            taskCreatedSensor = metrics.sensor(prefix + ".task-created", Sensor.RecordingLevel.INFO);
-            taskCreatedSensor.add(createMeter(metrics, new Count(), "task-created", "newly created tasks"));
-
-            tasksClosedSensor = metrics.sensor(prefix + ".task-closed", Sensor.RecordingLevel.INFO);
-            tasksClosedSensor.add(createMeter(metrics, new Count(), "task-closed", "closed tasks"));
-
-            skippedRecordsSensor = metrics.sensor(prefix + ".skipped-records");
-            skippedRecordsSensor.add(createMeter(metrics, new Sum(), "skipped-records", "skipped records"));
 
+        private final Sensor commitTimeSensor;
+        private final Sensor pollTimeSensor;
+        private final Sensor processTimeSensor;
+        private final Sensor punctuateTimeSensor;
+        private final Sensor taskCreatedSensor;
+        private final Sensor tasksClosedSensor;
+
+        private final Deque<String> ownedSensors = new LinkedList<>();
+
+        StreamsMetricsThreadImpl(final Metrics metrics, final String threadName) {
+            super(metrics, threadName);
+            final String groupName = "stream-metrics";
+
+            // Count is a SampledStat, so adding it directly would make the "-total" metrics windowed rather than
+            // cumulative; a Meter pairs the per-second rate with a cumulative total, as the removed createMeter() did.
+            commitTimeSensor = metrics.sensor(threadLevelSensorName(threadName, "commit-latency"), Sensor.RecordingLevel.INFO);
+            commitTimeSensor.add(metrics.metricName("commit-latency-avg", groupName, "The average commit time in ms", tags()), new Avg());
+            commitTimeSensor.add(metrics.metricName("commit-latency-max", groupName, "The maximum commit time in ms", tags()), new Max());
+            commitTimeSensor.add(new org.apache.kafka.common.metrics.stats.Meter(
+                new Count(),
+                metrics.metricName("commit-rate", groupName, "The average per-second number of commit calls", tags()),
+                metrics.metricName("commit-total", groupName, "The total number of commit calls", tags())
+            ));
+            ownedSensors.push(commitTimeSensor.name());
+
+            pollTimeSensor = metrics.sensor(threadLevelSensorName(threadName, "poll-latency"), Sensor.RecordingLevel.INFO);
+            pollTimeSensor.add(metrics.metricName("poll-latency-avg", groupName, "The average poll time in ms", tags()), new Avg());
+            pollTimeSensor.add(metrics.metricName("poll-latency-max", groupName, "The maximum poll time in ms", tags()), new Max());
+            pollTimeSensor.add(new org.apache.kafka.common.metrics.stats.Meter(
+                new Count(),
+                metrics.metricName("poll-rate", groupName, "The average per-second number of record-poll calls", tags()),
+                metrics.metricName("poll-total", groupName, "The total number of record-poll calls", tags())
+            ));
+            ownedSensors.push(pollTimeSensor.name());
+
+            processTimeSensor = metrics.sensor(threadLevelSensorName(threadName, "process-latency"), Sensor.RecordingLevel.INFO);
+            processTimeSensor.add(metrics.metricName("process-latency-avg", groupName, "The average process time in ms", tags()), new Avg());
+            processTimeSensor.add(metrics.metricName("process-latency-max", groupName, "The maximum process time in ms", tags()), new Max());
+            processTimeSensor.add(new org.apache.kafka.common.metrics.stats.Meter(
+                new Count(),
+                metrics.metricName("process-rate", groupName, "The average per-second number of process calls", tags()),
+                metrics.metricName("process-total", groupName, "The total number of process calls", tags())
+            ));
+            ownedSensors.push(processTimeSensor.name());
+
+            punctuateTimeSensor = metrics.sensor(threadLevelSensorName(threadName, "punctuate-latency"), Sensor.RecordingLevel.INFO);
+            punctuateTimeSensor.add(metrics.metricName("punctuate-latency-avg", groupName, "The average punctuate time in ms", tags()), new Avg());
+            punctuateTimeSensor.add(metrics.metricName("punctuate-latency-max", groupName, "The maximum punctuate time in ms", tags()), new Max());
+            punctuateTimeSensor.add(new org.apache.kafka.common.metrics.stats.Meter(
+                new Count(),
+                metrics.metricName("punctuate-rate", groupName, "The average per-second number of punctuate calls", tags()),
+                metrics.metricName("punctuate-total", groupName, "The total number of punctuate calls", tags())
+            ));
+            ownedSensors.push(punctuateTimeSensor.name());
+
+            taskCreatedSensor = metrics.sensor(threadLevelSensorName(threadName, "task-created"), Sensor.RecordingLevel.INFO);
+            taskCreatedSensor.add(metrics.metricName("task-created-rate", groupName, "The average per-second number of newly created tasks", tags()), new Rate(TimeUnit.SECONDS, new Count()));
+            taskCreatedSensor.add(metrics.metricName("task-created-total", groupName, "The total number of newly created tasks", tags()), new Total());
+            ownedSensors.push(taskCreatedSensor.name());
+
+            tasksClosedSensor = metrics.sensor(threadLevelSensorName(threadName, "task-closed"), Sensor.RecordingLevel.INFO);
+            tasksClosedSensor.add(metrics.metricName("task-closed-rate", groupName, "The average per-second number of closed tasks", tags()), new Rate(TimeUnit.SECONDS, new Count()));
+            tasksClosedSensor.add(metrics.metricName("task-closed-total", groupName, "The total number of closed tasks", tags()), new Total());
+            ownedSensors.push(tasksClosedSensor.name());
         }
 
-        private Meter createMeter(final Metrics metrics,
-                                  final SampledStat stat,
-                                  final String baseName,
-                                  final String descriptiveName) {
-            final MetricName rateMetricName = metrics.metricName(
-                baseName + "-rate",
-                groupName,
-                String.format("The average per-second number of %s", descriptiveName),
-                tags
-            );
-            final MetricName totalMetricName = metrics.metricName(
-                baseName + "-total",
-                groupName,
-                String.format("The total number of %s", descriptiveName),
-                tags
-            );
-            return new Meter(stat, rateMetricName, totalMetricName);
-        }
-
-        void removeAllSensors() {
-            removeSensor(commitTimeSensor);
-            removeSensor(pollTimeSensor);
-            removeSensor(processTimeSensor);
-            removeSensor(punctuateTimeSensor);
-            removeSensor(taskCreatedSensor);
-            removeSensor(tasksClosedSensor);
-            removeSensor(skippedRecordsSensor);
-
+        /**
+         * Removes the sensors owned by the superclass and then every sensor registered by this class's
+         * constructor, holding the {@code ownedSensors} monitor for the duration.
+         */
+        @Override
+        public void removeOwnedSensors() {
+            synchronized (ownedSensors) {
+                super.removeOwnedSensors();
+                while (!ownedSensors.isEmpty()) {
+                    registry().removeSensor(ownedSensors.pop());
+                }
+            }
         }
     }
 
@@ -640,9 +622,8 @@ public class StreamThread extends Thread {
 
         final StreamsMetricsThreadImpl streamsMetrics = new StreamsMetricsThreadImpl(
             metrics,
-            "stream-metrics",
-            "thread." + threadClientId,
-            Collections.singletonMap("client-id", threadClientId));
+            threadClientId
+        );
 
         final ThreadCache cache = new ThreadCache(logContext, cacheSizeBytes, streamsMetrics);
 
@@ -651,7 +632,6 @@ public class StreamThread extends Thread {
             config,
             streamsMetrics,
             stateDirectory,
-            streamsMetrics.taskCreatedSensor,
             changelogReader,
             cache,
             time,
@@ -664,7 +644,6 @@ public class StreamThread extends Thread {
             config,
             streamsMetrics,
             stateDirectory,
-            streamsMetrics.taskCreatedSensor,
             changelogReader,
             time,
             log);
@@ -930,7 +909,6 @@ public class StreamThread extends Thread {
      * @param records Records, can be null
      */
     private void addRecordsToTasks(final ConsumerRecords<byte[], byte[]> records) {
-        int numAddedRecords = 0;
 
         for (final TopicPartition partition : records.partitions()) {
             final StreamTask task = taskManager.activeTask(partition);
@@ -941,9 +919,8 @@ public class StreamThread extends Thread {
                 throw new TaskMigratedException(task);
             }
 
-            numAddedRecords += task.addRecords(partition, records.records(partition));
+            task.addRecords(partition, records.records(partition));
         }
-        streamsMetrics.skippedRecordsSensor.record(records.count() - numAddedRecords, timerStartedMs);
     }
 
     /**
@@ -1188,7 +1165,7 @@ public class StreamThread extends Thread {
         } catch (final Throwable e) {
             log.error("Failed to close restore consumer due to the following error:", e);
         }
-        streamsMetrics.removeAllSensors();
+        streamsMetrics.removeOwnedSensors();
 
         setState(State.DEAD);
         log.info("Shutdown complete");
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/MockStreamsMetrics.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsConventions.java
similarity index 52%
copy from streams/src/test/java/org/apache/kafka/streams/processor/internals/MockStreamsMetrics.java
copy to streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsConventions.java
index d374136..cfe206c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/MockStreamsMetrics.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsConventions.java
@@ -14,16 +14,26 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kafka.streams.processor.internals;
+package org.apache.kafka.streams.processor.internals.metrics;
 
-import org.apache.kafka.common.metrics.Metrics;
+import java.util.LinkedHashMap;
+import java.util.Map;
 
-import java.util.Collections;
+public final class StreamsMetricsConventions {
+    private StreamsMetricsConventions() {
+    }
 
-public class MockStreamsMetrics extends StreamsMetricsImpl {
+    public static String threadLevelSensorName(final String threadName, final String sensorName) {
+        return "thread." + threadName + "." + sensorName;
+    }
 
-    public MockStreamsMetrics(final Metrics metrics) {
-        super(metrics, "mock-stream-metrics",
-            Collections.<String, String>emptyMap());
+    static Map<String, String> threadLevelTags(final String threadName, final Map<String, String> tags) {
+        if (tags.containsKey("client-id")) {
+            return tags;
+        } else {
+            final LinkedHashMap<String, String> newTags = new LinkedHashMap<>(tags);
+            newTags.put("client-id", threadName);
+            return newTags;
+        }
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
similarity index 64%
rename from streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImpl.java
rename to streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
index 83b4f12..76a1d2b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
@@ -14,49 +14,62 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kafka.streams.processor.internals;
+package org.apache.kafka.streams.processor.internals.metrics;
 
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
-import org.apache.kafka.common.metrics.MeasurableStat;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.Avg;
 import org.apache.kafka.common.metrics.stats.Count;
 import org.apache.kafka.common.metrics.stats.Max;
-import org.apache.kafka.common.metrics.stats.Meter;
-import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.metrics.stats.Rate;
+import org.apache.kafka.common.metrics.stats.Total;
 import org.apache.kafka.streams.StreamsMetrics;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Deque;
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.Map;
 import java.util.Objects;
+import java.util.concurrent.TimeUnit;
 
-public class StreamsMetricsImpl implements StreamsMetrics {
-    private static final Logger log = LoggerFactory.getLogger(StreamsMetricsImpl.class);
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsConventions.threadLevelSensorName;
 
-    final Metrics metrics;
-    final String groupName;
-    final Map<String, String> tags;
+public class StreamsMetricsImpl implements StreamsMetrics {
+    private final Metrics metrics;
+    private final Map<String, String> tags;
     private final Map<Sensor, Sensor> parentSensors;
+    private final Deque<String> ownedSensors = new LinkedList<>();
+    private final Sensor skippedRecordsSensor;
 
-    public StreamsMetricsImpl(final Metrics metrics, final String groupName, final Map<String, String> tags) {
+    public StreamsMetricsImpl(final Metrics metrics, final String threadName) {
         Objects.requireNonNull(metrics, "Metrics cannot be null");
 
         this.metrics = metrics;
-        this.groupName = groupName;
-        this.tags = tags;
+        this.tags = StreamsMetricsConventions.threadLevelTags(threadName, Collections.<String, String>emptyMap());
         this.parentSensors = new HashMap<>();
+
+        skippedRecordsSensor = metrics.sensor(threadLevelSensorName(threadName, "skipped-records"), Sensor.RecordingLevel.INFO);
+        skippedRecordsSensor.add(metrics.metricName("skipped-records-rate", "stream-metrics", "The average per-second number of skipped records", tags), new Rate(TimeUnit.SECONDS, new Count()));
+        skippedRecordsSensor.add(metrics.metricName("skipped-records-total", "stream-metrics", "The total number of skipped records", tags), new Total());
+        ownedSensors.push(skippedRecordsSensor.name());
     }
 
-    public Metrics registry() {
+    public final Metrics registry() {
         return metrics;
     }
 
+    protected final Map<String, String> tags() {
+        return tags;
+    }
+
+    public final Sensor skippedRecordsSensor() {
+        return skippedRecordsSensor;
+    }
+
     @Override
     public Sensor addSensor(final String name, final Sensor.RecordingLevel recordingLevel) {
         return metrics.sensor(name, recordingLevel);
@@ -131,11 +144,13 @@ public class StreamsMetricsImpl implements StreamsMetrics {
 
         // first add the global operation metrics if not yet, with the global tags only
         final Sensor parent = metrics.sensor(sensorName(operationName, null), recordingLevel);
-        addLatencyAndThroughputMetrics(scopeName, parent, operationName, allTagMap);
+        addLatencyMetrics(scopeName, parent, operationName, allTagMap);
+        addThroughputMetrics(scopeName, parent, operationName, allTagMap);
 
         // add the operation metrics with additional tags
         final Sensor sensor = metrics.sensor(sensorName(operationName, entityName), recordingLevel, parent);
-        addLatencyAndThroughputMetrics(scopeName, sensor, operationName, tagMap);
+        addLatencyMetrics(scopeName, sensor, operationName, tagMap);
+        addThroughputMetrics(scopeName, sensor, operationName, tagMap);
 
         parentSensors.put(sensor, parent);
 
@@ -167,64 +182,44 @@ public class StreamsMetricsImpl implements StreamsMetrics {
         return sensor;
     }
 
-    private void addLatencyAndThroughputMetrics(final String scopeName,
-                                                final Sensor sensor,
-                                                final String opName,
-                                                final Map<String, String> tags) {
-        maybeAddMetric(sensor, metrics.metricName(opName + "-latency-avg", groupNameFromScope(scopeName),
-            "The average latency of " + opName + " operation.", tags), new Avg());
-        maybeAddMetric(sensor, metrics.metricName(opName + "-latency-max", groupNameFromScope(scopeName),
-            "The max latency of " + opName + " operation.", tags), new Max());
-        addThroughputMetrics(scopeName, sensor, opName, tags);
-    }
-
-    private void addThroughputMetrics(final String scopeName,
-                                      final Sensor sensor,
-                                      final String opName,
-                                      final Map<String, String> tags) {
-        final MetricName rateMetricName = metrics.metricName(opName + "-rate", groupNameFromScope(scopeName),
-            "The average number of occurrence of " + opName + " operation per second.", tags);
-        final MetricName totalMetricName = metrics.metricName(opName + "-total", groupNameFromScope(scopeName),
-            "The total number of occurrence of " + opName + " operations.", tags);
-        if (!metrics.metrics().containsKey(rateMetricName) && !metrics.metrics().containsKey(totalMetricName)) {
-            sensor.add(new Meter(new Count(), rateMetricName, totalMetricName));
-        } else {
-            log.trace("Trying to add metric twice: {} {}", rateMetricName, totalMetricName);
-        }
-    }
-
-    /**
-     * Register a metric on the sensor if it isn't already there.
-     *
-     * @param sensor The sensor on which to register the metric
-     * @param name   The name of the metric
-     * @param stat   The metric to track
-     * @throws IllegalArgumentException if the same metric name is already in use elsewhere in the metrics
-     */
-    public void maybeAddMetric(final Sensor sensor, final MetricName name, final MeasurableStat stat) {
-        sensor.add(name, stat);
-    }
-
-    /**
-     * Helper function. Measure the latency of an action. This is equivalent to
-     * startTs = time.nanoseconds()
-     * action.run()
-     * endTs = time.nanoseconds()
-     * sensor.record(endTs - startTs)
-     *
-     * @param time   Time object.
-     * @param action Action to run.
-     * @param sensor Sensor to record value.
-     */
-    void measureLatencyNs(final Time time, final Runnable action, final Sensor sensor) {
-        long startNs = -1;
-        if (sensor.shouldRecord()) {
-            startNs = time.nanoseconds();
-        }
-        action.run();
-        if (startNs != -1) {
-            recordLatency(sensor, startNs, time.nanoseconds());
-        }
+    private void addLatencyMetrics(final String scopeName, final Sensor sensor, final String opName, final Map<String, String> tags) {
+        sensor.add(
+            metrics.metricName(
+                opName + "-latency-avg",
+                groupNameFromScope(scopeName),
+                "The average latency of " + opName + " operation.", tags),
+            new Avg()
+        );
+        sensor.add(
+            metrics.metricName(
+                opName + "-latency-max",
+                groupNameFromScope(scopeName),
+                "The max latency of " + opName + " operation.",
+                tags
+            ),
+            new Max()
+        );
+    }
+
+    private void addThroughputMetrics(final String scopeName, final Sensor sensor, final String opName, final Map<String, String> tags) {
+        sensor.add(
+            metrics.metricName(
+                opName + "-rate",
+                groupNameFromScope(scopeName),
+                "The average number of occurrence of " + opName + " operation per second.",
+                tags
+            ),
+            new Rate(TimeUnit.SECONDS, new Count())
+        );
+        sensor.add(
+            metrics.metricName(
+                opName + "-total",
+                groupNameFromScope(scopeName),
+                "The total number of occurrence of " + opName + " operations.",
+                tags
+            ),
+            new Count()
+        );
     }
 
     /**
@@ -239,6 +234,13 @@ public class StreamsMetricsImpl implements StreamsMetrics {
         if (parent != null) {
             metrics.removeSensor(parent.name());
         }
+    }
 
+    public void removeOwnedSensors() {
+        synchronized (ownedSensors) {
+            while (!ownedSensors.isEmpty()) {
+                metrics.removeSensor(ownedSensors.pop());
+            }
+        }
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
index 01ee6b1..de62a2d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
@@ -23,7 +23,7 @@ import org.apache.kafka.common.metrics.stats.Min;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsMetrics;
-import org.apache.kafka.streams.processor.internals.StreamsMetricsImpl;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -374,21 +374,21 @@ class NamedCache {
 
             // add parent
             final Sensor parent = this.metrics.registry().sensor(opName, Sensor.RecordingLevel.DEBUG);
-            ((StreamsMetricsImpl) metrics).maybeAddMetric(parent, this.metrics.registry().metricName(opName + "-avg", groupName,
-                "The average cache hit ratio.", allMetricTags), new Avg());
-            ((StreamsMetricsImpl) metrics).maybeAddMetric(parent, this.metrics.registry().metricName(opName + "-min", groupName,
-                "The minimum cache hit ratio.", allMetricTags), new Min());
-            ((StreamsMetricsImpl) metrics).maybeAddMetric(parent, this.metrics.registry().metricName(opName + "-max", groupName,
-                "The maximum cache hit ratio.", allMetricTags), new Max());
+            parent.add(this.metrics.registry().metricName(opName + "-avg", groupName,
+                    "The average cache hit ratio.", allMetricTags), new Avg());
+            parent.add(this.metrics.registry().metricName(opName + "-min", groupName,
+                    "The minimum cache hit ratio.", allMetricTags), new Min());
+            parent.add(this.metrics.registry().metricName(opName + "-max", groupName,
+                    "The maximum cache hit ratio.", allMetricTags), new Max());
 
             // add child
             hitRatioSensor = this.metrics.registry().sensor(opName, Sensor.RecordingLevel.DEBUG, parent);
-            ((StreamsMetricsImpl) metrics).maybeAddMetric(hitRatioSensor, this.metrics.registry().metricName(opName + "-avg", groupName,
-                "The average cache hit ratio.", metricTags), new Avg());
-            ((StreamsMetricsImpl) metrics).maybeAddMetric(hitRatioSensor, this.metrics.registry().metricName(opName + "-min", groupName,
-                "The minimum cache hit ratio.", metricTags), new Min());
-            ((StreamsMetricsImpl) metrics).maybeAddMetric(hitRatioSensor, this.metrics.registry().metricName(opName + "-max", groupName,
-                "The maximum cache hit ratio.", metricTags), new Max());
+            hitRatioSensor.add(this.metrics.registry().metricName(opName + "-avg", groupName,
+                    "The average cache hit ratio.", metricTags), new Avg());
+            hitRatioSensor.add(this.metrics.registry().metricName(opName + "-min", groupName,
+                    "The minimum cache hit ratio.", metricTags), new Min());
+            hitRatioSensor.add(this.metrics.registry().metricName(opName + "-max", groupName,
+                    "The maximum cache hit ratio.", metricTags), new Max());
 
         }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
index 2b87b30..c74d0dc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
@@ -16,6 +16,8 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.errors.InvalidTopicException;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
@@ -37,6 +39,7 @@ import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.Windows;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
+import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.SessionStore;
 import org.apache.kafka.streams.state.WindowStore;
@@ -55,9 +58,12 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.kafka.test.StreamsTestUtils.getMetricByName;
 import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.hasItem;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNull;
 
 public class KGroupedStreamImplTest {
@@ -558,6 +564,19 @@ public class KGroupedStreamImplTest {
         assertThat(count.get("3"), equalTo(2L));
     }
 
+    @Test
+    public void shouldLogAndMeasureSkipsInAggregate() {
+        groupedStream.count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("count").withKeySerde(Serdes.String()));
+        final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
+        processData();
+        LogCaptureAppender.unregister(appender);
+
+        final Map<MetricName, ? extends Metric> metrics = driver.context().metrics().metrics();
+        assertEquals(1.0, getMetricByName(metrics, "skipped-records-total", "stream-metrics").metricValue());
+        assertNotEquals(0.0, getMetricByName(metrics, "skipped-records-rate", "stream-metrics").metricValue());
+        assertThat(appender.getMessages(), hasItem("Skipping record due to null key or value. key=[3] value=[null] topic=[topic] partition=[-1] offset=[-1]"));
+    }
+
 
     @SuppressWarnings("unchecked")
     @Test
@@ -577,6 +596,26 @@ public class KGroupedStreamImplTest {
         assertThat(reduced.get("3"), equalTo("E+F"));
     }
 
+    @Test
+    public void shouldLogAndMeasureSkipsInReduce() {
+        groupedStream.reduce(
+            MockReducer.STRING_ADDER,
+            Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("reduce")
+                .withKeySerde(Serdes.String())
+                .withValueSerde(Serdes.String())
+        );
+
+        final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
+        processData();
+        LogCaptureAppender.unregister(appender);
+
+        final Map<MetricName, ? extends Metric> metrics = driver.context().metrics().metrics();
+        assertEquals(1.0, getMetricByName(metrics, "skipped-records-total", "stream-metrics").metricValue());
+        assertNotEquals(0.0, getMetricByName(metrics, "skipped-records-rate", "stream-metrics").metricValue());
+        assertThat(appender.getMessages(), hasItem("Skipping record due to null key or value. key=[3] value=[null] topic=[topic] partition=[-1] offset=[-1]"));
+    }
+
+
     @SuppressWarnings("unchecked")
     @Test
     public void shouldAggregateAndMaterializeResults() {
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
index 83fee9b..4d2bce5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
@@ -24,7 +24,9 @@ import org.apache.kafka.streams.StreamsBuilderTest;
 import org.apache.kafka.streams.kstream.JoinWindows;
 import org.apache.kafka.streams.kstream.Joined;
 import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
+import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
 import org.apache.kafka.test.InternalMockProcessorContext;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockProcessorSupplier;
@@ -40,7 +42,10 @@ import java.util.Collection;
 import java.util.HashSet;
 import java.util.Set;
 
+import static org.apache.kafka.test.StreamsTestUtils.getMetricByName;
+import static org.hamcrest.CoreMatchers.hasItem;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
 
 public class KStreamKStreamJoinTest {
 
@@ -61,6 +66,35 @@ public class KStreamKStreamJoinTest {
     }
 
     @Test
+    public void shouldLogAndMeterOnSkippedRecordsWithNullValue() {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final KStream<String, Integer> left = builder.stream("left", Consumed.with(stringSerde, intSerde));
+        final KStream<String, Integer> right = builder.stream("right", Consumed.with(stringSerde, intSerde));
+
+        left.join(
+            right,
+            new ValueJoiner<Integer, Integer, Integer>() {
+                @Override
+                public Integer apply(final Integer value1, final Integer value2) {
+                    return value1 + value2;
+                }
+            },
+            JoinWindows.of(100),
+            Joined.with(stringSerde, intSerde, intSerde)
+        );
+
+        final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
+        driver.setUp(builder, stateDir);
+        driver.process("left", "A", null);
+        LogCaptureAppender.unregister(appender);
+
+        assertThat(appender.getMessages(), hasItem("Skipping record due to null key or value. key=[A] value=[null] topic=[left] partition=[-1] offset=[-1]"));
+
+        assertEquals(1.0, getMetricByName(driver.context().metrics().metrics(), "skipped-records-total", "stream-metrics").metricValue());
+    }
+
+    @Test
     public void testJoin() {
         final StreamsBuilder builder = new StreamsBuilder();
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
index 851808c..1800385 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
@@ -23,6 +23,7 @@ import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsBuilderTest;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockValueJoiner;
@@ -37,6 +38,9 @@ import java.util.Collection;
 import java.util.HashSet;
 import java.util.Set;
 
+import static org.apache.kafka.test.StreamsTestUtils.getMetricByName;
+import static org.hamcrest.CoreMatchers.hasItem;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 
 public class KStreamKTableJoinTest {
@@ -181,4 +185,23 @@ public class KStreamKTableJoinTest {
         processor.checkAndClearProcessResult("2:XX2+Y2", "3:XX3+Y3");
     }
 
+    @Test
+    public void shouldLogAndMeterWhenSkippingNullLeftKey() {
+        final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
+        driver.process(streamTopic, null, "A");
+        LogCaptureAppender.unregister(appender);
+
+        assertEquals(1.0, getMetricByName(driver.context().metrics().metrics(), "skipped-records-total", "stream-metrics").metricValue());
+        assertThat(appender.getMessages(), hasItem("Skipping record due to null key or value. key=[null] value=[A] topic=[streamTopic] partition=[-1] offset=[-1]"));
+    }
+
+    @Test
+    public void shouldLogAndMeterWhenSkippingNullLeftValue() {
+        final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
+        driver.process(streamTopic, 1, null);
+        LogCaptureAppender.unregister(appender);
+
+        assertEquals(1.0, getMetricByName(driver.context().metrics().metrics(), "skipped-records-total", "stream-metrics").metricValue());
+        assertThat(appender.getMessages(), hasItem("Skipping record due to null key or value. key=[1] value=[null] topic=[streamTopic] partition=[-1] offset=[-1]"));
+    }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
index 2acf859..301d448 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
@@ -27,6 +27,8 @@ import org.apache.kafka.streams.kstream.SessionWindows;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
+import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.SessionStore;
 import org.apache.kafka.streams.state.internals.RocksDBSessionStoreSupplier;
@@ -44,6 +46,9 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
+import static org.apache.kafka.test.StreamsTestUtils.getMetricByName;
+import static org.hamcrest.CoreMatchers.hasItem;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -300,4 +305,16 @@ public class KStreamSessionWindowAggregateProcessorTest {
 
     }
 
+    @Test
+    public void shouldLogAndMeterWhenSkippingNullKey() {
+        initStore(false);
+        processor.init(context);
+        context.setRecordContext(new ProcessorRecordContext(-1, -2, -3, "topic"));
+        final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
+        processor.process(null, "1");
+        LogCaptureAppender.unregister(appender);
+
+        assertEquals(1.0, getMetricByName(context.metrics().metrics(), "skipped-records-total", "stream-metrics").metricValue());
+        assertThat(appender.getMessages(), hasItem("Skipping record due to null key. value=[1] topic=[topic] partition=[-3] offset=[-2]"));
+    }
 }
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
index 3d107c2..7082251 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
@@ -18,15 +18,20 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.Serialized;
 import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
+import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
+import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.test.InternalMockProcessorContext;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockAggregator;
@@ -39,6 +44,9 @@ import org.junit.Test;
 
 import java.io.File;
 
+import static org.apache.kafka.test.StreamsTestUtils.getMetricByName;
+import static org.hamcrest.CoreMatchers.hasItem;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 
 public class KStreamWindowAggregateTest {
@@ -292,4 +300,29 @@ public class KStreamWindowAggregateTest {
         );
     }
 
+    @Test
+    public void shouldLogAndMeterWhenSkippingNullKey() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final String topic = "topic";
+
+        final KStream<String, String> stream1 = builder.stream(topic, Consumed.with(strSerde, strSerde));
+        stream1.groupByKey(Serialized.with(strSerde, strSerde))
+            .windowedBy(TimeWindows.of(10).advanceBy(5))
+            .aggregate(
+                MockInitializer.STRING_INIT,
+                MockAggregator.<String, String>toStringInstance("+"),
+                Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic1-Canonicalized").withValueSerde(strSerde)
+            );
+
+        driver.setUp(builder, stateDir);
+
+        setRecordContext(0, topic);
+        final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
+        driver.process(topic, null, "1");
+        driver.flushState();
+        LogCaptureAppender.unregister(appender);
+
+        assertEquals(1.0, getMetricByName(driver.context().metrics().metrics(), "skipped-records-total", "stream-metrics").metricValue());
+        assertThat(appender.getMessages(), hasItem("Skipping record due to null key. value=[1] topic=[topic] partition=[-1] offset=[-1]"));
+    }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduceTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduceTest.java
new file mode 100644
index 0000000..ee8ea14
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduceTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.Consumed;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.kstream.Reducer;
+import org.apache.kafka.streams.kstream.Serialized;
+import org.apache.kafka.streams.kstream.TimeWindows;
+import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
+import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.test.TestUtils;
+import org.junit.Test;
+
+import static org.apache.kafka.test.StreamsTestUtils.getMetricByName;
+import static org.hamcrest.CoreMatchers.hasItem;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertEquals;
+
+public class KStreamWindowReduceTest { // checks the skipped-record metric and log line of the windowed reduce for a null-key record
+    @Test
+    public void shouldLogAndMeterOnNullKey() {
+        final KStreamTestDriver driver = new KStreamTestDriver(); // NOTE(review): this local driver is never closed in the test — confirm no cleanup is required
+
+        final StreamsBuilder builder = new StreamsBuilder();
+        builder
+            .stream("TOPIC", Consumed.with(Serdes.String(), Serdes.String()))
+            .groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
+            .windowedBy(TimeWindows.of(500L))
+            .reduce(new Reducer<String>() {
+                @Override
+                public String apply(final String value1, final String value2) {
+                    return value1 + "+" + value2; // joins the two values with "+"; the only record sent has a null key and is expected to be skipped
+                }
+            });
+
+        driver.setUp(builder, TestUtils.tempDirectory(), 0);
+
+        final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(); // start capturing log output just before the record is processed
+        driver.process("TOPIC", null, "asdf"); // null key: the record this test expects to be skipped
+        driver.flushState();
+        LogCaptureAppender.unregister(appender); // stop capturing before asserting on the collected messages
+
+        assertEquals(1.0, getMetricByName(driver.context().metrics().metrics(), "skipped-records-total", "stream-metrics").metricValue());
+        assertThat(appender.getMessages(), hasItem("Skipping record due to null key. value=[asdf] topic=[TOPIC] partition=[-1] offset=[-1]"));
+    }
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java
index 98d3e52..9f5603b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java
@@ -23,6 +23,9 @@ import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsBuilderTest;
 import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.processor.MockProcessorContext;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockValueJoiner;
@@ -37,6 +40,9 @@ import java.util.Collection;
 import java.util.HashSet;
 import java.util.Set;
 
+import static org.apache.kafka.test.StreamsTestUtils.getMetricByName;
+import static org.hamcrest.CoreMatchers.hasItem;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
@@ -337,6 +343,27 @@ public class KTableKTableInnerJoinTest {
 
     }
 
+    @Test
+    public void shouldLogAndMeterSkippedRecordsDueToNullLeftKey() { // a null-key change sent to the inner-join processor must be metered and logged
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final Processor<String, Change<String>> join = new KTableKTableInnerJoin<>(
+            (KTableImpl<String, String, String>) builder.table("left", Consumed.with(stringSerde, stringSerde)),
+            (KTableImpl<String, String, String>) builder.table("right", Consumed.with(stringSerde, stringSerde)),
+            null // NOTE(review): presumably the value joiner, never reached because the record is skipped — confirm
+        ).get();
+
+        final MockProcessorContext context = new MockProcessorContext();
+        context.setRecordMetadata("left", -1, -2, -3); // distinct values so the expected log line can tell partition (-1) and offset (-2) apart
+        join.init(context);
+        final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(); // start capturing log output just before the record is processed
+        join.process(null, new Change<>("new", "old")); // null key: the change this test expects to be skipped
+        LogCaptureAppender.unregister(appender); // stop capturing before asserting on the collected messages
+
+        assertEquals(1.0, getMetricByName(context.metrics().metrics(), "skipped-records-total", "stream-metrics").metricValue());
+        assertThat(appender.getMessages(), hasItem("Skipping record due to null key. change=[(new<-old)] topic=[left] partition=[-1] offset=[-2]"));
+    }
+
     private KeyValue<Integer, String> kv(final Integer key, final String value) {
         return new KeyValue<>(key, value);
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
index 7261ae0..6331b57 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
@@ -26,6 +26,9 @@ import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.Serialized;
 import org.apache.kafka.streams.kstream.ValueMapper;
+import org.apache.kafka.streams.processor.MockProcessorContext;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockReducer;
@@ -43,6 +46,9 @@ import java.util.Locale;
 import java.util.Random;
 import java.util.Set;
 
+import static org.apache.kafka.test.StreamsTestUtils.getMetricByName;
+import static org.hamcrest.CoreMatchers.hasItem;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
@@ -398,6 +404,27 @@ public class KTableKTableLeftJoinTest {
         }
     }
 
+    @Test
+    public void shouldLogAndMeterSkippedRecordsDueToNullLeftKey() { // a null-key change sent to the left-join processor must be metered and logged
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final Processor<String, Change<String>> join = new KTableKTableLeftJoin<>(
+            (KTableImpl<String, String, String>) builder.table("left", Consumed.with(stringSerde, stringSerde)),
+            (KTableImpl<String, String, String>) builder.table("right", Consumed.with(stringSerde, stringSerde)),
+            null // NOTE(review): presumably the value joiner, never reached because the record is skipped — confirm
+        ).get();
+
+        final MockProcessorContext context = new MockProcessorContext();
+        context.setRecordMetadata("left", -1, -2, -3); // distinct values so the expected log line can tell partition (-1) and offset (-2) apart
+        join.init(context);
+        final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(); // start capturing log output just before the record is processed
+        join.process(null, new Change<>("new", "old")); // null key: the change this test expects to be skipped
+        LogCaptureAppender.unregister(appender); // stop capturing before asserting on the collected messages
+
+        assertEquals(1.0, getMetricByName(context.metrics().metrics(), "skipped-records-total", "stream-metrics").metricValue());
+        assertThat(appender.getMessages(), hasItem("Skipping record due to null key. change=[(new<-old)] topic=[left] partition=[-1] offset=[-2]"));
+    }
+
     private KeyValue<Integer, String> kv(final Integer key, final String value) {
         return new KeyValue<>(key, value);
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
index e094591..16694d8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
@@ -23,6 +23,9 @@ import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsBuilderTest;
 import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.processor.MockProcessorContext;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockValueJoiner;
@@ -37,6 +40,9 @@ import java.util.Collection;
 import java.util.HashSet;
 import java.util.Set;
 
+import static org.apache.kafka.test.StreamsTestUtils.getMetricByName;
+import static org.hamcrest.CoreMatchers.hasItem;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
@@ -342,6 +348,27 @@ public class KTableKTableOuterJoinTest {
         proc.checkAndClearProcessResult("1:(null<-XX1+null)", "2:(null+YY2<-XX2+YY2)");
     }
 
+    @Test
+    public void shouldLogAndMeterSkippedRecordsDueToNullLeftKey() { // a null-key change sent to the outer-join processor must be metered and logged
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final Processor<String, Change<String>> join = new KTableKTableOuterJoin<>(
+            (KTableImpl<String, String, String>) builder.table("left", Consumed.with(stringSerde, stringSerde)),
+            (KTableImpl<String, String, String>) builder.table("right", Consumed.with(stringSerde, stringSerde)),
+            null // NOTE(review): presumably the value joiner, never reached because the record is skipped — confirm
+        ).get();
+
+        final MockProcessorContext context = new MockProcessorContext();
+        context.setRecordMetadata("left", -1, -2, -3); // distinct values so the expected log line can tell partition (-1) and offset (-2) apart
+        join.init(context);
+        final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(); // start capturing log output just before the record is processed
+        join.process(null, new Change<>("new", "old")); // null key: the change this test expects to be skipped
+        LogCaptureAppender.unregister(appender); // stop capturing before asserting on the collected messages
+
+        assertEquals(1.0, getMetricByName(context.metrics().metrics(), "skipped-records-total", "stream-metrics").metricValue());
+        assertThat(appender.getMessages(), hasItem("Skipping record due to null key. change=[(new<-old)] topic=[left] partition=[-1] offset=[-2]"));
+    }
+
     private KeyValue<Integer, String> kv(final Integer key, final String value) {
         return new KeyValue<>(key, value);
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoinTest.java
new file mode 100644
index 0000000..67adf2b
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoinTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.Consumed;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.processor.MockProcessorContext;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
+import org.junit.Test;
+
+import static org.apache.kafka.test.StreamsTestUtils.getMetricByName;
+import static org.hamcrest.CoreMatchers.hasItem;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertEquals;
+
+public class KTableKTableRightJoinTest { // checks the skipped-record metric and log line of the right-join processor for a null-key change
+    @Test
+    public void shouldLogAndMeterSkippedRecordsDueToNullLeftKey() {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final Processor<String, Change<String>> join = new KTableKTableRightJoin<>(
+            (KTableImpl<String, String, String>) builder.table("left", Consumed.with(Serdes.String(), Serdes.String())),
+            (KTableImpl<String, String, String>) builder.table("right", Consumed.with(Serdes.String(), Serdes.String())),
+            null // NOTE(review): presumably the value joiner, never reached because the record is skipped — confirm
+        ).get();
+
+        final MockProcessorContext context = new MockProcessorContext();
+        context.setRecordMetadata("left", -1, -2, -3); // distinct values so the expected log line can tell partition (-1) and offset (-2) apart
+        join.init(context);
+        final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(); // start capturing log output just before the record is processed
+        join.process(null, new Change<>("new", "old")); // null key: the change this test expects to be skipped
+        LogCaptureAppender.unregister(appender); // stop capturing before asserting on the collected messages
+
+        assertEquals(1.0, getMetricByName(context.metrics().metrics(), "skipped-records-total", "stream-metrics").metricValue());
+        assertThat(appender.getMessages(), hasItem("Skipping record due to null key. change=[(new<-old)] topic=[left] partition=[-1] offset=[-2]"));
+    }
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
index 6ee1b1b..97c9c7f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.TestUtils;
@@ -31,8 +32,11 @@ import org.junit.Test;
 
 import java.io.File;
 
+import static org.apache.kafka.test.StreamsTestUtils.getMetricByName;
+import static org.hamcrest.CoreMatchers.hasItem;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
 public class KTableSourceTest {
@@ -74,6 +78,22 @@ public class KTableSourceTest {
     }
 
     @Test
+    public void kTableShouldLogAndMeterOnSkippedRecords() { // a null-key record fed to a table source must bump skipped-records-total and be logged
+        final StreamsBuilder streamsBuilder = new StreamsBuilder();
+        final String topic = "topic";
+        streamsBuilder.table(topic, Consumed.with(stringSerde, intSerde));
+
+        final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(); // registered before setUp, so log output from setUp is captured as well
+        driver.setUp(streamsBuilder, stateDir);
+        driver.process(topic, null, "value"); // null key: the record this test expects to be skipped
+        driver.flushState();
+        LogCaptureAppender.unregister(appender); // stop capturing before asserting on the collected messages
+
+        assertEquals(1.0, getMetricByName(driver.context().metrics().metrics(), "skipped-records-total", "stream-metrics").metricValue());
+        assertThat(appender.getMessages(), hasItem("Skipping record due to null key. topic=[topic] partition=[-1] offset=[-1]"));
+    }
+
+    @Test
     public void testValueGetter() {
         final StreamsBuilder builder = new StreamsBuilder();
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
index 924bc52..5637dab 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
@@ -178,7 +178,8 @@ public class GlobalStateTaskTest {
             context,
             stateMgr,
             new LogAndContinueExceptionHandler(),
-            logContext);
+            logContext
+        );
         final byte[] key = new LongSerializer().serialize(topic2, 1L);
         final byte[] recordValue = new IntegerSerializer().serialize(topic2, 10);
 
@@ -192,7 +193,8 @@ public class GlobalStateTaskTest {
             context,
             stateMgr,
             new LogAndContinueExceptionHandler(),
-            logContext);
+            logContext
+        );
         final byte[] key = new IntegerSerializer().serialize(topic2, 1);
         final byte[] recordValue = new LongSerializer().serialize(topic2, 10L);
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/MockStreamsMetrics.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/MockStreamsMetrics.java
index d374136..bd35530 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/MockStreamsMetrics.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/MockStreamsMetrics.java
@@ -17,13 +17,11 @@
 package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.common.metrics.Metrics;
-
-import java.util.Collections;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 
 public class MockStreamsMetrics extends StreamsMetricsImpl {
 
     public MockStreamsMetrics(final Metrics metrics) {
-        super(metrics, "mock-stream-metrics",
-            Collections.<String, String>emptyMap());
+        super(metrics, "test");
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
index 5a182de..1d2f613 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
@@ -25,6 +25,7 @@ import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
 import org.apache.kafka.streams.processor.TimestampExtractor;
+import org.apache.kafka.test.InternalMockProcessorContext;
 import org.apache.kafka.test.MockSourceNode;
 import org.apache.kafka.test.MockTimestampExtractor;
 import org.junit.Test;
@@ -49,15 +50,17 @@ public class PartitionGroupTest {
         new MockSourceNode<>(topics, intDeserializer, intDeserializer),
         timestampExtractor,
         new LogAndContinueExceptionHandler(),
-        null,
-        logContext);
+        new InternalMockProcessorContext(),
+        logContext
+    );
     private final RecordQueue queue2 = new RecordQueue(
         partition2,
         new MockSourceNode<>(topics, intDeserializer, intDeserializer),
         timestampExtractor,
         new LogAndContinueExceptionHandler(),
-        null,
-        logContext);
+        new InternalMockProcessorContext(),
+        logContext
+    );
 
     private final byte[] recordValue = intSerializer.serialize(null, 10);
     private final byte[] recordKey = intSerializer.serialize(null, 1);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
index 146f3ff..1409d68 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
@@ -25,6 +25,7 @@ import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.state.StateSerdes;
 import org.apache.kafka.test.InternalMockProcessorContext;
+import org.apache.kafka.test.StreamsTestUtils;
 import org.junit.Test;
 
 import java.util.Collections;
@@ -93,19 +94,6 @@ public class ProcessorNodeTest {
         }
     }
 
-    private void testSpecificMetrics(final Metrics metrics,
-                                     @SuppressWarnings("SameParameterValue") final String groupName,
-                                     final String opName,
-                                     final Map<String, String> metricTags) {
-        assertNotNull(metrics.metrics().get(metrics.metricName(opName + "-latency-avg", groupName,
-            "The average latency of " + opName + " operation.", metricTags)));
-        assertNotNull(metrics.metrics().get(metrics.metricName(opName + "-latency-max", groupName,
-            "The max latency of " + opName + " operation.", metricTags)));
-        assertNotNull(metrics.metrics().get(metrics.metricName(opName + "-rate", groupName,
-            "The average number of occurrence of " + opName + " operation per second.", metricTags)));
-
-    }
-
     @Test
     public void testMetrics() {
         final StateSerdes anyStateSerde = StateSerdes.withBuiltinTypes("anyName", Bytes.class, Bytes.class);
@@ -113,7 +101,13 @@ public class ProcessorNodeTest {
         final Metrics metrics = new Metrics();
         final InternalMockProcessorContext context = new InternalMockProcessorContext(
             anyStateSerde,
-            new RecordCollectorImpl(null, null, new LogContext("processnode-test "), new DefaultProductionExceptionHandler()),
+            new RecordCollectorImpl(
+                null,
+                null,
+                new LogContext("processnode-test "),
+                new DefaultProductionExceptionHandler(),
+                metrics.sensor("skipped-records")
+            ),
             metrics
         );
         final ProcessorNode<Object, Object> node = new ProcessorNode<>("name", new NoOpProcessor(), Collections.<String>emptySet());
@@ -125,6 +119,7 @@ public class ProcessorNodeTest {
         final Map<String, String> metricTags = new LinkedHashMap<>();
         metricTags.put("processor-node-id", node.name());
         metricTags.put("task-id", context.taskId().toString());
+        metricTags.put("client-id", "mock");
 
 
         for (final String operation : latencyOperations) {
@@ -133,7 +128,10 @@ public class ProcessorNodeTest {
         assertNotNull(metrics.getSensor(throughputOperation));
 
         for (final String opName : latencyOperations) {
-            testSpecificMetrics(metrics, groupName, opName, metricTags);
+            StreamsTestUtils.getMetricByNameFilterByTags(metrics.metrics(), opName + "-latency-avg", groupName, metricTags);
+            StreamsTestUtils.getMetricByNameFilterByTags(metrics.metrics(), opName + "-latency-max", groupName, metricTags);
+            StreamsTestUtils.getMetricByNameFilterByTags(metrics.metrics(), opName + "-rate", groupName, metricTags);
+            StreamsTestUtils.getMetricByNameFilterByTags(metrics.metrics(), opName + "-total", groupName, metricTags);
         }
         assertNotNull(metrics.metrics().get(metrics.metricName(throughputOperation + "-rate", groupName,
             "The average number of occurrence of " + throughputOperation + " operation per second.", metricTags)));
@@ -141,7 +139,10 @@ public class ProcessorNodeTest {
         // test "all"
         metricTags.put("processor-node-id", "all");
         for (final String opName : latencyOperations) {
-            testSpecificMetrics(metrics, groupName, opName, metricTags);
+            StreamsTestUtils.getMetricByNameFilterByTags(metrics.metrics(), opName + "-latency-avg", groupName, metricTags);
+            StreamsTestUtils.getMetricByNameFilterByTags(metrics.metrics(), opName + "-latency-max", groupName, metricTags);
+            StreamsTestUtils.getMetricByNameFilterByTags(metrics.metrics(), opName + "-rate", groupName, metricTags);
+            StreamsTestUtils.getMetricByNameFilterByTags(metrics.metrics(), opName + "-total", groupName, metricTags);
         }
         assertNotNull(metrics.metrics().get(metrics.metricName(throughputOperation + "-rate", groupName,
             "The average number of occurrence of " + throughputOperation + " operation per second.", metricTags)));
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
index d877450..8a2f171 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
@@ -23,9 +23,13 @@ import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Sum;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.LogContext;
@@ -33,6 +37,7 @@ import org.apache.kafka.streams.errors.AlwaysContinueProductionExceptionHandler;
 import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.StreamPartitioner;
+import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
 import org.junit.Test;
 
 import java.util.Arrays;
@@ -42,6 +47,7 @@ import java.util.Map;
 import java.util.concurrent.Future;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 public class RecordCollectorTest {
@@ -75,7 +81,9 @@ public class RecordCollectorTest {
             new MockProducer<>(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer),
             "RecordCollectorTest-TestSpecificPartition",
             new LogContext("RecordCollectorTest-TestSpecificPartition "),
-            new DefaultProductionExceptionHandler());
+            new DefaultProductionExceptionHandler(),
+            new Metrics().sensor("skipped-records")
+        );
 
         collector.send("topic1", "999", "0", 0, null, stringSerializer, stringSerializer);
         collector.send("topic1", "999", "0", 0, null, stringSerializer, stringSerializer);
@@ -109,7 +117,9 @@ public class RecordCollectorTest {
             new MockProducer<>(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer),
             "RecordCollectorTest-TestStreamPartitioner",
             new LogContext("RecordCollectorTest-TestStreamPartitioner "),
-            new DefaultProductionExceptionHandler());
+            new DefaultProductionExceptionHandler(),
+            new Metrics().sensor("skipped-records")
+        );
 
         collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
         collector.send("topic1", "9", "0", null, stringSerializer, stringSerializer, streamPartitioner);
@@ -142,7 +152,8 @@ public class RecordCollectorTest {
             },
             "test",
             logContext,
-            new DefaultProductionExceptionHandler());
+            new DefaultProductionExceptionHandler(),
+            new Metrics().sensor("skipped-records"));
 
         collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
     }
@@ -160,7 +171,9 @@ public class RecordCollectorTest {
             },
             "test",
             logContext,
-            new DefaultProductionExceptionHandler());
+            new DefaultProductionExceptionHandler(),
+            new Metrics().sensor("skipped-records"));
+
         collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
 
         try {
@@ -182,7 +195,9 @@ public class RecordCollectorTest {
             },
             "test",
             logContext,
-            new AlwaysContinueProductionExceptionHandler());
+            new AlwaysContinueProductionExceptionHandler(),
+            new Metrics().sensor("skipped-records"));
+
         collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
 
         collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
@@ -190,6 +205,32 @@ public class RecordCollectorTest {
 
     @SuppressWarnings("unchecked")
     @Test
+    public void shouldRecordSkippedMetricAndLogWarningIfSendFailsWithContinueExceptionHandler() { // a failed send under the CONTINUE handler must be metered and logged
+        final Metrics metrics = new Metrics();
+        final Sensor sensor = metrics.sensor("skipped-records");
+        final LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister();
+        final MetricName metricName = new MetricName("name", "group", "description", Collections.<String, String>emptyMap());
+        sensor.add(metricName, new Sum()); // Sum totals whatever the collector records on the sensor, so the test can read it back
+        final RecordCollector collector = new RecordCollectorImpl(
+            new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
+                @Override
+                public synchronized Future<RecordMetadata> send(final ProducerRecord record, final Callback callback) {
+                    callback.onCompletion(null, new Exception()); // complete every send with an error
+                    return null;
+                }
+            },
+            "test",
+            logContext,
+            new AlwaysContinueProductionExceptionHandler(),
+            sensor);
+        collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
+        LogCaptureAppender.unregister(logCaptureAppender); // detach before asserting so a failed assertion cannot leave the appender registered
+        assertEquals(1.0, metrics.metrics().get(metricName).metricValue());
+        assertTrue(logCaptureAppender.getMessages().contains("test Error sending records (key=[3] value=[0] timestamp=[null]) to topic=[topic1] and partition=[0]; The exception handler chose to CONTINUE processing in spite of this error."));
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
     public void shouldThrowStreamsExceptionOnFlushIfASendFailedWithDefaultExceptionHandler() {
         final RecordCollector collector = new RecordCollectorImpl(
             new MockProducer(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
@@ -201,7 +242,9 @@ public class RecordCollectorTest {
             },
             "test",
             logContext,
-            new DefaultProductionExceptionHandler());
+            new DefaultProductionExceptionHandler(),
+            new Metrics().sensor("skipped-records"));
+
         collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
 
         try {
@@ -223,7 +266,9 @@ public class RecordCollectorTest {
             },
             "test",
             logContext,
-            new AlwaysContinueProductionExceptionHandler());
+            new AlwaysContinueProductionExceptionHandler(),
+            new Metrics().sensor("skipped-records"));
+
         collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
 
         collector.flush();
@@ -242,7 +287,9 @@ public class RecordCollectorTest {
             },
             "test",
             logContext,
-            new DefaultProductionExceptionHandler());
+            new DefaultProductionExceptionHandler(),
+            new Metrics().sensor("skipped-records"));
+
         collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
 
         try {
@@ -264,7 +311,9 @@ public class RecordCollectorTest {
             },
             "test",
             logContext,
-            new AlwaysContinueProductionExceptionHandler());
+            new AlwaysContinueProductionExceptionHandler(),
+            new Metrics().sensor("skipped-records"));
+
         collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
 
         collector.close();
@@ -283,7 +332,8 @@ public class RecordCollectorTest {
             },
             "test",
             logContext,
-            new DefaultProductionExceptionHandler());
+            new DefaultProductionExceptionHandler(),
+            new Metrics().sensor("skipped-records"));
         collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
     }
 
@@ -300,7 +350,8 @@ public class RecordCollectorTest {
             },
             "test",
             logContext,
-            new AlwaysContinueProductionExceptionHandler());
+            new AlwaysContinueProductionExceptionHandler(),
+            new Metrics().sensor("skipped-records"));
         collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java
index 7e90466..de8e17b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordDeserializerTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.utils.LogContext;
 import org.junit.Test;
@@ -44,7 +45,15 @@ public class RecordDeserializerTest {
     @Test
     public void shouldReturnNewConsumerRecordWithDeserializedValueWhenNoExceptions() {
         final RecordDeserializer recordDeserializer = new RecordDeserializer(
-            new TheSourceNode(false, false, "key", "value"), null, new LogContext());
+            new TheSourceNode(
+                false,
+                false,
+                "key", "value"
+            ),
+            null,
+            new LogContext(),
+            new Metrics().sensor("skipped-records")
+        );
         final ConsumerRecord<Object, Object> record = recordDeserializer.deserialize(null, rawRecord);
         assertEquals(rawRecord.topic(), record.topic());
         assertEquals(rawRecord.partition(), record.partition());
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
index 48be292..3ed9e3b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java
@@ -18,6 +18,8 @@ package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.IntegerDeserializer;
@@ -54,8 +56,18 @@ public class RecordQueueTest {
     private final TimestampExtractor timestampExtractor = new MockTimestampExtractor();
     private final String[] topics = {"topic"};
 
-    final InternalMockProcessorContext context = new InternalMockProcessorContext(StateSerdes.withBuiltinTypes("anyName", Bytes.class, Bytes.class),
-        new RecordCollectorImpl(null, null, new LogContext("record-queue-test "), new DefaultProductionExceptionHandler()));
+    private final Sensor skippedRecordsSensor = new Metrics().sensor("skipped-records");
+
+    final InternalMockProcessorContext context = new InternalMockProcessorContext(
+        StateSerdes.withBuiltinTypes("anyName", Bytes.class, Bytes.class),
+        new RecordCollectorImpl(
+            null,
+            null,
+            new LogContext("record-queue-test "),
+            new DefaultProductionExceptionHandler(),
+            skippedRecordsSensor
+        )
+    );
     private final MockSourceNode mockSourceNodeWithMetrics = new MockSourceNode<>(topics, intDeserializer, intDeserializer);
     private final RecordQueue queue = new RecordQueue(
         new TopicPartition(topics[0], 1),
@@ -230,8 +242,9 @@ public class RecordQueueTest {
             new MockSourceNode<>(topics, intDeserializer, intDeserializer),
             new FailOnInvalidTimestamp(),
             new LogAndContinueExceptionHandler(),
-            null,
+            new InternalMockProcessorContext(),
             new LogContext());
+
         queue.addRawRecords(records);
     }
 
@@ -245,7 +258,7 @@ public class RecordQueueTest {
             new MockSourceNode<>(topics, intDeserializer, intDeserializer),
             new LogAndSkipOnInvalidTimestamp(),
             new LogAndContinueExceptionHandler(),
-            null,
+            new InternalMockProcessorContext(),
             new LogContext());
         queue.addRawRecords(records);
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java
index d005bbd..0013167 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.clients.producer.MockProducer;
+import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Bytes;
@@ -43,7 +44,8 @@ public class SinkNodeTest {
             new MockProducer<>(true, anySerializer, anySerializer),
             null,
             new LogContext("sinknode-test "),
-            new DefaultProductionExceptionHandler()
+            new DefaultProductionExceptionHandler(),
+            new Metrics().sensor("skipped-records")
         )
     );
     private final SinkNode sink = new SinkNode<>("anyNodeName", "any-output-topic", anySerializer, anySerializer, null);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 8d81b1f..28e0b46 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -24,6 +24,7 @@ import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.metrics.KafkaMetric;
 import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.Deserializer;
@@ -34,14 +35,13 @@ import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.StreamsMetrics;
-import org.apache.kafka.streams.errors.ProductionExceptionHandler;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.StateRestoreListener;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
 import org.apache.kafka.test.MockProcessorNode;
 import org.apache.kafka.test.MockSourceNode;
@@ -125,7 +125,8 @@ public class StreamTaskTest {
     private final byte[] recordValue = intSerializer.serialize(null, 10);
     private final byte[] recordKey = intSerializer.serialize(null, 1);
     private final Metrics metrics = new Metrics();
-    private final StreamsMetrics streamsMetrics = new MockStreamsMetrics(metrics);
+    private final Sensor skippedRecordsSensor = metrics.sensor("skipped-records");
+    private final StreamsMetricsImpl streamsMetrics = new MockStreamsMetrics(metrics);
     private final TaskId taskId00 = new TaskId(0, 0);
     private final MockTime time = new MockTime();
     private final File baseDir = TestUtils.tempDirectory();
@@ -246,7 +247,7 @@ public class StreamTaskTest {
             String.format(nameFormat, "commit"),
             "stream-task-metrics",
             String.format(descriptionFormat, "commit"),
-            mkMap(mkEntry("task-id", taskId))
+            mkMap(mkEntry("task-id", taskId), mkEntry("client-id", "test"))
         ));
     }
 
@@ -647,21 +648,26 @@ public class StreamTaskTest {
     @Test
     public void shouldFlushRecordCollectorOnFlushState() {
         final AtomicBoolean flushed = new AtomicBoolean(false);
-        final StreamsMetrics streamsMetrics = new MockStreamsMetrics(new Metrics());
-        final StreamTask streamTask = new StreamTask(taskId00, partitions, topology, consumer,
-            changelogReader, createConfig(false), streamsMetrics, stateDirectory, null, time, producer) {
-
-            @Override
-            RecordCollector createRecordCollector(final LogContext logContext,
-                                                  final ProductionExceptionHandler exHandler) {
-                return new NoOpRecordCollector() {
-                    @Override
-                    public void flush() {
-                        flushed.set(true);
-                    }
-                };
+        final StreamsMetricsImpl streamsMetrics = new MockStreamsMetrics(new Metrics());
+        final StreamTask streamTask = new StreamTask(
+            taskId00,
+            partitions,
+            topology,
+            consumer,
+            changelogReader,
+            createConfig(false),
+            streamsMetrics,
+            stateDirectory,
+            null,
+            time,
+            producer,
+            new NoOpRecordCollector() {
+                @Override
+                public void flush() {
+                    flushed.set(true);
+                }
             }
-        };
+        );
         streamTask.flushState();
         assertTrue(flushed.get());
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index b95507d..36a1bce 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -31,11 +31,13 @@ import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
 import org.apache.kafka.streams.errors.TaskMigratedException;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.internals.ConsumedInternal;
@@ -51,6 +53,7 @@ import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.TaskMetadata;
 import org.apache.kafka.streams.processor.ThreadMetadata;
+import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.test.MockClientSupplier;
 import org.apache.kafka.test.MockStateRestoreListener;
@@ -79,6 +82,7 @@ import static org.apache.kafka.common.utils.Utils.mkProperties;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertThat;
@@ -287,7 +291,7 @@ public class StreamThreadTest {
         final Consumer<byte[], byte[]> consumer = EasyMock.createNiceMock(Consumer.class);
         final TaskManager taskManager = mockTaskManagerCommit(consumer, 1, 1);
 
-        final StreamThread.StreamsMetricsThreadImpl streamsMetrics = new StreamThread.StreamsMetricsThreadImpl(metrics, "", "", Collections.<String, String>emptyMap());
+        final StreamThread.StreamsMetricsThreadImpl streamsMetrics = new StreamThread.StreamsMetricsThreadImpl(metrics, "");
         final StreamThread thread = new StreamThread(
             mockTime,
             config,
@@ -319,7 +323,7 @@ public class StreamThreadTest {
         final Consumer<byte[], byte[]> consumer = EasyMock.createNiceMock(Consumer.class);
         final TaskManager taskManager = mockTaskManagerCommit(consumer, 1, 0);
 
-        final StreamThread.StreamsMetricsThreadImpl streamsMetrics = new StreamThread.StreamsMetricsThreadImpl(metrics, "", "", Collections.<String, String>emptyMap());
+        final StreamThread.StreamsMetricsThreadImpl streamsMetrics = new StreamThread.StreamsMetricsThreadImpl(metrics, "");
         final StreamThread thread = new StreamThread(
             mockTime,
             config,
@@ -351,7 +355,7 @@ public class StreamThreadTest {
         final Consumer<byte[], byte[]> consumer = EasyMock.createNiceMock(Consumer.class);
         final TaskManager taskManager = mockTaskManagerCommit(consumer, 2, 1);
 
-        final StreamThread.StreamsMetricsThreadImpl streamsMetrics = new StreamThread.StreamsMetricsThreadImpl(metrics, "", "", Collections.<String, String>emptyMap());
+        final StreamThread.StreamsMetricsThreadImpl streamsMetrics = new StreamThread.StreamsMetricsThreadImpl(metrics, "");
         final StreamThread thread = new StreamThread(
             mockTime,
             config,
@@ -497,12 +501,7 @@ public class StreamThreadTest {
         EasyMock.expectLastCall();
         EasyMock.replay(taskManager, consumer);
 
-        final StreamThread.StreamsMetricsThreadImpl streamsMetrics = new StreamThread.StreamsMetricsThreadImpl(
-            metrics,
-            "",
-            "",
-            Collections.<String, String>emptyMap()
-        );
+        final StreamThread.StreamsMetricsThreadImpl streamsMetrics = new StreamThread.StreamsMetricsThreadImpl(metrics, "");
         final StreamThread thread = new StreamThread(
             mockTime,
             config,
@@ -537,12 +536,7 @@ public class StreamThreadTest {
         EasyMock.expectLastCall();
         EasyMock.replay(taskManager, consumer);
 
-        final StreamThread.StreamsMetricsThreadImpl streamsMetrics = new StreamThread.StreamsMetricsThreadImpl(
-            metrics,
-            "",
-            "",
-            Collections.<String, String>emptyMap()
-        );
+        final StreamThread.StreamsMetricsThreadImpl streamsMetrics = new StreamThread.StreamsMetricsThreadImpl(metrics, "");
         final StreamThread thread = new StreamThread(
             mockTime,
             config,
@@ -568,11 +562,7 @@ public class StreamThreadTest {
         EasyMock.expectLastCall();
         EasyMock.replay(taskManager, consumer);
 
-        final StreamThread.StreamsMetricsThreadImpl streamsMetrics = new StreamThread.StreamsMetricsThreadImpl(
-            metrics,
-            "",
-            "",
-            Collections.<String, String>emptyMap());
+        final StreamThread.StreamsMetricsThreadImpl streamsMetrics = new StreamThread.StreamsMetricsThreadImpl(metrics, "");
         final StreamThread thread = new StreamThread(
             mockTime,
             config,
@@ -783,7 +773,8 @@ public class StreamThreadTest {
         final MockConsumer<byte[], byte[]> restoreConsumer = clientSupplier.restoreConsumer;
         restoreConsumer.updatePartitions(
             "stream-thread-test-count-one-changelog",
-            singletonList(new PartitionInfo("stream-thread-test-count-one-changelog",
+            singletonList(
+                new PartitionInfo("stream-thread-test-count-one-changelog",
                 0,
                 null,
                 new Node[0],
@@ -1127,7 +1118,54 @@ public class StreamThreadTest {
     }
 
     @Test
+    public void shouldRecordSkippedMetricForDeserializationException() {
+        final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
+
+        internalTopologyBuilder.addSource(null, "source1", null, null, null, topic1);
+
+        final Properties config = configProps(false);
+        config.setProperty(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class.getName());
+        config.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
+        final StreamThread thread = createStreamThread(clientId, new StreamsConfig(config), false);
+
+        thread.setState(StreamThread.State.RUNNING);
+        thread.setState(StreamThread.State.PARTITIONS_REVOKED);
+
+        final Set<TopicPartition> assignedPartitions = Collections.singleton(t1p1);
+        thread.taskManager().setAssignmentMetadata(
+            Collections.singletonMap(
+                new TaskId(0, t1p1.partition()),
+                assignedPartitions),
+            Collections.<TaskId, Set<TopicPartition>>emptyMap());
+
+        final MockConsumer<byte[], byte[]> mockConsumer = (MockConsumer<byte[], byte[]>) thread.consumer;
+        mockConsumer.assign(Collections.singleton(t1p1));
+        mockConsumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L));
+        thread.rebalanceListener.onPartitionsAssigned(assignedPartitions);
+        thread.runOnce(-1);
+
+        final MetricName skippedTotalMetric = metrics.metricName("skipped-records-total", "stream-metrics", Collections.singletonMap("client-id", thread.getName()));
+        final MetricName skippedRateMetric = metrics.metricName("skipped-records-rate", "stream-metrics", Collections.singletonMap("client-id", thread.getName()));
+        assertEquals(0.0, metrics.metric(skippedTotalMetric).metricValue());
+        assertEquals(0.0, metrics.metric(skippedRateMetric).metricValue());
+
+        long offset = -1;
+        mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(), t1p1.partition(), ++offset, -1, TimestampType.CREATE_TIME, -1, -1, -1, new byte[0], "I am not an integer.".getBytes()));
+        mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(), t1p1.partition(), ++offset, -1, TimestampType.CREATE_TIME, -1, -1, -1, new byte[0], "I am not an integer.".getBytes()));
+        thread.runOnce(-1);
+        assertEquals(2.0, metrics.metric(skippedTotalMetric).metricValue());
+        assertNotEquals(0.0, metrics.metric(skippedRateMetric).metricValue());
+
+        LogCaptureAppender.unregister(appender);
+        final List<String> strings = appender.getMessages();
+        assertTrue(strings.contains("task [0_1] Skipping record due to deserialization error. topic=[topic1] partition=[1] offset=[0]"));
+        assertTrue(strings.contains("task [0_1] Skipping record due to deserialization error. topic=[topic1] partition=[1] offset=[1]"));
+    }
+
+    @Test
     public void shouldReportSkippedRecordsForInvalidTimestamps() {
+        final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
+
         internalTopologyBuilder.addSource(null, "source1", null, null, null, topic1);
 
         final Properties config = configProps(false);
@@ -1151,13 +1189,16 @@ public class StreamThreadTest {
         thread.runOnce(-1);
 
         final MetricName skippedTotalMetric = metrics.metricName("skipped-records-total", "stream-metrics", Collections.singletonMap("client-id", thread.getName()));
+        final MetricName skippedRateMetric = metrics.metricName("skipped-records-rate", "stream-metrics", Collections.singletonMap("client-id", thread.getName()));
         assertEquals(0.0, metrics.metric(skippedTotalMetric).metricValue());
+        assertEquals(0.0, metrics.metric(skippedRateMetric).metricValue());
 
         long offset = -1;
         mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(), t1p1.partition(), ++offset, -1, TimestampType.CREATE_TIME, -1, -1, -1, new byte[0], new byte[0]));
         mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(), t1p1.partition(), ++offset, -1, TimestampType.CREATE_TIME, -1, -1, -1, new byte[0], new byte[0]));
         thread.runOnce(-1);
         assertEquals(2.0, metrics.metric(skippedTotalMetric).metricValue());
+        assertNotEquals(0.0, metrics.metric(skippedRateMetric).metricValue());
 
         mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(), t1p1.partition(), ++offset, -1, TimestampType.CREATE_TIME, -1, -1, -1, new byte[0], new byte[0]));
         mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(), t1p1.partition(), ++offset, -1, TimestampType.CREATE_TIME, -1, -1, -1, new byte[0], new byte[0]));
@@ -1165,11 +1206,46 @@ public class StreamThreadTest {
         mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(), t1p1.partition(), ++offset, -1, TimestampType.CREATE_TIME, -1, -1, -1, new byte[0], new byte[0]));
         thread.runOnce(-1);
         assertEquals(6.0, metrics.metric(skippedTotalMetric).metricValue());
+        assertNotEquals(0.0, metrics.metric(skippedRateMetric).metricValue());
 
         mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(), t1p1.partition(), ++offset, 1, TimestampType.CREATE_TIME, -1, -1, -1, new byte[0], new byte[0]));
         mockConsumer.addRecord(new ConsumerRecord<>(t1p1.topic(), t1p1.partition(), ++offset, 1, TimestampType.CREATE_TIME, -1, -1, -1, new byte[0], new byte[0]));
         thread.runOnce(-1);
         assertEquals(6.0, metrics.metric(skippedTotalMetric).metricValue());
+        assertNotEquals(0.0, metrics.metric(skippedRateMetric).metricValue());
+
+        LogCaptureAppender.unregister(appender);
+        final List<String> strings = appender.getMessages();
+        assertTrue(strings.contains(
+            "task [0_1] Skipping record due to negative extracted timestamp. " +
+                "topic=[topic1] partition=[1] offset=[0] extractedTimestamp=[-1] " +
+                "extractor=[org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp]"
+        ));
+        assertTrue(strings.contains(
+            "task [0_1] Skipping record due to negative extracted timestamp. " +
+                "topic=[topic1] partition=[1] offset=[1] extractedTimestamp=[-1] " +
+                "extractor=[org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp]"
+        ));
+        assertTrue(strings.contains(
+            "task [0_1] Skipping record due to negative extracted timestamp. " +
+                "topic=[topic1] partition=[1] offset=[2] extractedTimestamp=[-1] " +
+                "extractor=[org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp]"
+        ));
+        assertTrue(strings.contains(
+            "task [0_1] Skipping record due to negative extracted timestamp. " +
+                "topic=[topic1] partition=[1] offset=[3] extractedTimestamp=[-1] " +
+                "extractor=[org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp]"
+        ));
+        assertTrue(strings.contains(
+            "task [0_1] Skipping record due to negative extracted timestamp. " +
+                "topic=[topic1] partition=[1] offset=[4] extractedTimestamp=[-1] " +
+                "extractor=[org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp]"
+        ));
+        assertTrue(strings.contains(
+            "task [0_1] Skipping record due to negative extracted timestamp. " +
+                "topic=[topic1] partition=[1] offset=[5] extractedTimestamp=[-1] " +
+                "extractor=[org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp]"
+        ));
     }
 
     private void assertThreadMetadataHasEmptyTasksWithState(final ThreadMetadata metadata,
@@ -1178,4 +1254,7 @@ public class StreamThreadTest {
         assertTrue(metadata.activeTasks().isEmpty());
         assertTrue(metadata.standbyTasks().isEmpty());
     }
+
+
+
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImplTest.java
index ee8abd0..b065e2c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImplTest.java
@@ -17,43 +17,33 @@
 package org.apache.kafka.streams.processor.internals;
 
 
-import org.apache.kafka.common.Metric;
-import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.junit.Test;
 
-import java.util.HashMap;
-import java.util.Map;
-
 import static org.junit.Assert.assertEquals;
 
 public class StreamsMetricsImplTest {
 
     @Test(expected = NullPointerException.class)
     public void testNullMetrics() {
-        final String groupName = "doesNotMatter";
-        final Map<String, String> tags = new HashMap<>();
-        new StreamsMetricsImpl(null, groupName, tags);
+        new StreamsMetricsImpl(null, "");
     }
 
     @Test(expected = NullPointerException.class)
     public void testRemoveNullSensor() {
-        final String groupName = "doesNotMatter";
-        final Map<String, String> tags = new HashMap<>();
-        final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(new Metrics(), groupName, tags);
+        final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(new Metrics(), "");
         streamsMetrics.removeSensor(null);
     }
 
     @Test
     public void testRemoveSensor() {
-        final String groupName = "doesNotMatter";
         final String sensorName = "sensor1";
         final String scope = "scope";
         final String entity = "entity";
         final String operation = "put";
-        final Map<String, String> tags = new HashMap<>();
-        final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(new Metrics(), groupName, tags);
+        final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(new Metrics(), "");
 
         final Sensor sensor1 = streamsMetrics.addSensor(sensorName, Sensor.RecordingLevel.DEBUG);
         streamsMetrics.removeSensor(sensor1);
@@ -70,44 +60,40 @@ public class StreamsMetricsImplTest {
 
     @Test
     public void testLatencyMetrics() {
-        final String groupName = "doesNotMatter";
+        final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(new Metrics(), "");
+        final int defaultMetrics = streamsMetrics.metrics().size();
+
         final String scope = "scope";
         final String entity = "entity";
         final String operation = "put";
-        final Map<String, String> tags = new HashMap<>();
-        final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(new Metrics(), groupName, tags);
 
         final Sensor sensor1 = streamsMetrics.addLatencyAndThroughputSensor(scope, entity, operation, Sensor.RecordingLevel.DEBUG);
 
-        Map<MetricName, ? extends Metric> metrics = streamsMetrics.metrics();
         // 2 meters and 4 non-meter metrics plus a common metric that keeps track of total registered metrics in Metrics() constructor
         final int meterMetricsCount = 2; // Each Meter is a combination of a Rate and a Total
         final int otherMetricsCount = 4;
-        assertEquals(meterMetricsCount * 2 + otherMetricsCount + 1, metrics.size());
+        assertEquals(defaultMetrics + meterMetricsCount * 2 + otherMetricsCount, streamsMetrics.metrics().size());
 
         streamsMetrics.removeSensor(sensor1);
-        metrics = streamsMetrics.metrics();
-        assertEquals(metrics.size(), 1);
+        assertEquals(defaultMetrics, streamsMetrics.metrics().size());
     }
 
     @Test
     public void testThroughputMetrics() {
-        final String groupName = "doesNotMatter";
+        final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(new Metrics(), "");
+        final int defaultMetrics = streamsMetrics.metrics().size();
+
         final String scope = "scope";
         final String entity = "entity";
         final String operation = "put";
-        final Map<String, String> tags = new HashMap<>();
-        final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(new Metrics(), groupName, tags);
 
         final Sensor sensor1 = streamsMetrics.addThroughputSensor(scope, entity, operation, Sensor.RecordingLevel.DEBUG);
 
-        Map<MetricName, ? extends Metric> metrics = streamsMetrics.metrics();
         final int meterMetricsCount = 2; // Each Meter is a combination of a Rate and a Total
         // 2 meter metrics plus a common metric that keeps track of total registered metrics in Metrics() constructor
-        assertEquals(meterMetricsCount * 2 + 1, metrics.size());
+        assertEquals(defaultMetrics + meterMetricsCount * 2, streamsMetrics.metrics().size());
 
         streamsMetrics.removeSensor(sensor1);
-        metrics = streamsMetrics.metrics();
-        assertEquals(metrics.size(), 1);
+        assertEquals(defaultMetrics, streamsMetrics.metrics().size());
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/testutil/LogCaptureAppender.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/testutil/LogCaptureAppender.java
new file mode 100644
index 0000000..b6f5769
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/testutil/LogCaptureAppender.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals.testutil;
+
+
+import org.apache.log4j.AppenderSkeleton;
+import org.apache.log4j.Logger;
+import org.apache.log4j.spi.LoggingEvent;
+
+import java.util.LinkedList;
+import java.util.List;
+
+public class LogCaptureAppender extends AppenderSkeleton { // log4j appender that keeps every appended event in memory
+    private final LinkedList<LoggingEvent> events = new LinkedList<>(); // captured events; accessed only while holding this list's monitor
+    // Creates a new appender, adds it to the log4j root logger, and returns it.
+    public static LogCaptureAppender createAndRegister() {
+        final LogCaptureAppender logCaptureAppender = new LogCaptureAppender();
+        Logger.getRootLogger().addAppender(logCaptureAppender);
+        return logCaptureAppender;
+    }
+    // Removes the given appender from the log4j root logger.
+    public static void unregister(final LogCaptureAppender logCaptureAppender) {
+        Logger.getRootLogger().removeAppender(logCaptureAppender);
+    }
+    // Stores the event at the end of the captured-event list.
+    @Override
+    protected void append(final LoggingEvent event) {
+        synchronized (events) {
+            events.add(event);
+        }
+    }
+    // Returns a newly built list with the rendered message of each captured event, in list order.
+    public List<String> getMessages() {
+        final LinkedList<String> result = new LinkedList<>();
+        synchronized (events) { // same monitor as append(), so the list is not modified while it is iterated
+            for (final LoggingEvent event : events) {
+                result.add(event.getRenderedMessage());
+            }
+        }
+        return result;
+    }
+    // Empty implementation: closing this appender does nothing, and captured events are not cleared.
+    @Override
+    public void close() {
+        // no-op
+    }
+    // Always returns false.
+    @Override
+    public boolean requiresLayout() {
+        return false;
+    }
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
index ff1efc9..33591c6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.state;
 
 import org.apache.kafka.clients.producer.MockProducer;
 import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serdes;
@@ -186,7 +187,13 @@ public class KeyValueStoreTestDriver<K, V> {
         final ByteArraySerializer rawSerializer = new ByteArraySerializer();
         final Producer<byte[], byte[]> producer = new MockProducer<>(true, rawSerializer, rawSerializer);
 
-        final RecordCollector recordCollector = new RecordCollectorImpl(producer, "KeyValueStoreTestDriver", new LogContext("KeyValueStoreTestDriver "), new DefaultProductionExceptionHandler()) {
+        final RecordCollector recordCollector = new RecordCollectorImpl(
+            producer,
+            "KeyValueStoreTestDriver",
+            new LogContext("KeyValueStoreTestDriver "),
+            new DefaultProductionExceptionHandler(),
+            new Metrics().sensor("skipped-records")
+        ) {
             @Override
             public <K1, V1> void send(final String topic,
                                       final K1 key,
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueBytesStoreTest.java
index 8880007..a5e0d79 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueBytesStoreTest.java
@@ -40,10 +40,11 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
@@ -53,7 +54,11 @@ import static org.junit.Assert.assertTrue;
 public class MeteredKeyValueBytesStoreTest {
 
     private final TaskId taskId = new TaskId(0, 0);
-    private final Map<String, String> tags = new HashMap<>();
+    private final Map<String, String> tags = mkMap(
+        mkEntry("client-id", "test"),
+        mkEntry("task-id", taskId.toString()),
+        mkEntry("scope-id", "metered")
+    );
     @Mock(type = MockType.NICE)
     private KeyValueStore<Bytes, byte[]> inner;
     @Mock(type = MockType.NICE)
@@ -68,13 +73,13 @@ public class MeteredKeyValueBytesStoreTest {
 
     @Before
     public void before() {
-        metered = new MeteredKeyValueBytesStore<>(inner,
-                                                  "scope",
-                                                  new MockTime(),
-                                                  Serdes.String(),
-                                                  Serdes.String());
-        tags.put("task-id", taskId.toString());
-        tags.put("scope-id", "metered");
+        metered = new MeteredKeyValueBytesStore<>(
+            inner,
+            "scope",
+            new MockTime(),
+            Serdes.String(),
+            Serdes.String()
+        );
         metrics.config().recordLevel(Sensor.RecordingLevel.DEBUG);
         EasyMock.expect(context.metrics()).andReturn(new MockStreamsMetrics(metrics));
         EasyMock.expect(context.taskId()).andReturn(taskId);
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
index 256e5ff..3bd190a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
@@ -42,9 +42,10 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.Map;
 
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
@@ -54,7 +55,11 @@ import static org.junit.Assert.assertTrue;
 public class MeteredSessionStoreTest {
 
     private final TaskId taskId = new TaskId(0, 0);
-    private final Map<String, String> tags = new HashMap<>();
+    private final Map<String, String> tags = mkMap(
+        mkEntry("client-id", "test"),
+        mkEntry("task-id", taskId.toString()),
+        mkEntry("scope-id", "metered")
+    );
     private final Metrics metrics = new Metrics();
     private MeteredSessionStore<String, String> metered;
     @Mock(type = MockType.NICE)
@@ -74,8 +79,6 @@ public class MeteredSessionStoreTest {
                                             Serdes.String(),
                                             Serdes.String(),
                                             new MockTime());
-        tags.put("task-id", taskId.toString());
-        tags.put("scope-id", "metered");
         metrics.config().recordLevel(Sensor.RecordingLevel.DEBUG);
         EasyMock.expect(context.metrics()).andReturn(new MockStreamsMetrics(metrics));
         EasyMock.expect(context.taskId()).andReturn(taskId);
@@ -96,7 +99,7 @@ public class MeteredSessionStoreTest {
         metered.put(new Windowed<>(key, new SessionWindow(0, 0)), key);
 
         final KafkaMetric metric = metric("put-rate");
-        assertTrue(metric.value() > 0);
+        assertTrue(((Double) metric.metricValue()) > 0);
         EasyMock.verify(inner);
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
index 4fd7f30..eab523e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
@@ -18,99 +18,69 @@ package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
-import org.apache.kafka.streams.StreamsMetrics;
+import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.processor.internals.RecordCollector;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.test.InternalMockProcessorContext;
 import org.apache.kafka.test.NoOpRecordCollector;
+import org.apache.kafka.test.StreamsTestUtils;
 import org.apache.kafka.test.TestUtils;
 import org.easymock.EasyMock;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.util.Collections;
-import java.util.HashSet;
 import java.util.Map;
-import java.util.Set;
 
+import static java.util.Collections.singletonMap;
+import static org.apache.kafka.test.StreamsTestUtils.getMetricByNameFilterByTags;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
 
 public class MeteredWindowStoreTest {
     private InternalMockProcessorContext context;
     @SuppressWarnings("unchecked")
     private final WindowStore<Bytes, byte[]> innerStoreMock = EasyMock.createNiceMock(WindowStore.class);
-    private final MeteredWindowStore<String, String> store = new MeteredWindowStore<>(innerStoreMock, "scope", new MockTime(), Serdes.String(), new SerdeThatDoesntHandleNull());
-    private final Set<String> latencyRecorded = new HashSet<>();
-    private final Set<String> throughputRecorded = new HashSet<>();
+    private final MeteredWindowStore<String, String> store = new MeteredWindowStore<>(
+        innerStoreMock,
+        "scope",
+        new MockTime(),
+        Serdes.String(),
+        new SerdeThatDoesntHandleNull()
+    );
+
+    {
+        EasyMock.expect(innerStoreMock.name()).andReturn("mocked-store").anyTimes();
+    }
 
     @Before
-    public void setUp() throws Exception {
-        final Metrics metrics = new Metrics();
-        final StreamsMetrics streamsMetrics = new StreamsMetrics() {
-
-            @Override
-            public Map<MetricName, ? extends Metric> metrics() {
-                return Collections.unmodifiableMap(metrics.metrics());
-            }
-
-            @Override
-            public Sensor addLatencyAndThroughputSensor(String scopeName, String entityName, String operationName, Sensor.RecordingLevel recordLevel, String... tags) {
-                return metrics.sensor(operationName);
-            }
-
-            @Override
-            public void recordLatency(final Sensor sensor, final long startNs, final long endNs) {
-                latencyRecorded.add(sensor.name());
-            }
-
-            @Override
-            public Sensor addThroughputSensor(String scopeName, String entityName, String operationName, Sensor.RecordingLevel recordLevel, String... tags) {
-                return metrics.sensor(operationName);
-            }
-
-            @Override
-            public void recordThroughput(Sensor sensor, long value) {
-                throughputRecorded.add(sensor.name());
-            }
-
-            @Override
-            public void removeSensor(Sensor sensor) {
-                metrics.removeSensor(sensor.name());
-            }
-
-            @Override
-            public Sensor addSensor(String name, Sensor.RecordingLevel recordLevel) {
-                return metrics.sensor(name);
-            }
-
-            @Override
-            public Sensor addSensor(String name, Sensor.RecordingLevel recordLevel, Sensor... parents) {
-                return metrics.sensor(name);
-            }
-
-        };
+    public void setUp() {
+        final Metrics metrics = new Metrics(new MetricConfig().recordLevel(Sensor.RecordingLevel.DEBUG));
+        final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, "test");
 
         context = new InternalMockProcessorContext(
             TestUtils.tempDirectory(),
             Serdes.String(),
             Serdes.Long(),
-            new NoOpRecordCollector(),
-            new ThreadCache(new LogContext("testCache "), 0, streamsMetrics)) {
-
-            @Override
-            public StreamsMetrics metrics() {
-                return streamsMetrics;
-            }
-        };
-        EasyMock.expect(innerStoreMock.name()).andReturn("store").anyTimes();
+            streamsMetrics,
+            new StreamsConfig(StreamsTestUtils.minimalStreamsConfig()),
+            new RecordCollector.Supplier() {
+                @Override
+                public RecordCollector recordCollector() {
+                    return new NoOpRecordCollector();
+                }
+            },
+            new ThreadCache(new LogContext("testCache "), 0, streamsMetrics)
+        );
     }
 
     @After
@@ -124,7 +94,9 @@ public class MeteredWindowStoreTest {
         EasyMock.expectLastCall();
         EasyMock.replay(innerStoreMock);
         store.init(context, store);
-        assertTrue(latencyRecorded.contains("restore"));
+        final Map<MetricName, ? extends Metric> metrics = context.metrics().metrics();
+        assertEquals(1.0, getMetricByNameFilterByTags(metrics, "restore-total", "stream-scope-metrics", singletonMap("scope-id", "all")).metricValue());
+        assertEquals(1.0, getMetricByNameFilterByTags(metrics, "restore-total", "stream-scope-metrics", singletonMap("scope-id", "mocked-store")).metricValue());
     }
 
     @Test
@@ -136,7 +108,9 @@ public class MeteredWindowStoreTest {
 
         store.init(context, store);
         store.put("a", "a");
-        assertTrue(latencyRecorded.contains("put"));
+        final Map<MetricName, ? extends Metric> metrics = context.metrics().metrics();
+        assertEquals(1.0, getMetricByNameFilterByTags(metrics, "put-total", "stream-scope-metrics", singletonMap("scope-id", "all")).metricValue());
+        assertEquals(1.0, getMetricByNameFilterByTags(metrics, "put-total", "stream-scope-metrics", singletonMap("scope-id", "mocked-store")).metricValue());
         EasyMock.verify(innerStoreMock);
     }
 
@@ -147,7 +121,9 @@ public class MeteredWindowStoreTest {
 
         store.init(context, store);
         store.fetch("a", 1, 1).close(); // recorded on close;
-        assertTrue(latencyRecorded.contains("fetch"));
+        final Map<MetricName, ? extends Metric> metrics = context.metrics().metrics();
+        assertEquals(1.0, getMetricByNameFilterByTags(metrics, "fetch-total", "stream-scope-metrics", singletonMap("scope-id", "all")).metricValue());
+        assertEquals(1.0, getMetricByNameFilterByTags(metrics, "fetch-total", "stream-scope-metrics", singletonMap("scope-id", "mocked-store")).metricValue());
         EasyMock.verify(innerStoreMock);
     }
 
@@ -158,7 +134,9 @@ public class MeteredWindowStoreTest {
 
         store.init(context, store);
         store.fetch("a", "b", 1, 1).close(); // recorded on close;
-        assertTrue(latencyRecorded.contains("fetch"));
+        final Map<MetricName, ? extends Metric> metrics = context.metrics().metrics();
+        assertEquals(1.0, getMetricByNameFilterByTags(metrics, "fetch-total", "stream-scope-metrics", singletonMap("scope-id", "all")).metricValue());
+        assertEquals(1.0, getMetricByNameFilterByTags(metrics, "fetch-total", "stream-scope-metrics", singletonMap("scope-id", "mocked-store")).metricValue());
         EasyMock.verify(innerStoreMock);
     }
 
@@ -171,7 +149,9 @@ public class MeteredWindowStoreTest {
 
         store.init(context, store);
         store.flush();
-        assertTrue(latencyRecorded.contains("flush"));
+        final Map<MetricName, ? extends Metric> metrics = context.metrics().metrics();
+        assertEquals(1.0, getMetricByNameFilterByTags(metrics, "flush-total", "stream-scope-metrics", singletonMap("scope-id", "all")).metricValue());
+        assertEquals(1.0, getMetricByNameFilterByTags(metrics, "flush-total", "stream-scope-metrics", singletonMap("scope-id", "mocked-store")).metricValue());
         EasyMock.verify(innerStoreMock);
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java
index 1bc3423..6b410dc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java
@@ -16,6 +16,8 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.KafkaMetric;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
@@ -31,6 +33,7 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.kafka.test.StreamsTestUtils.getMetricByNameFilterByTags;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -73,33 +76,25 @@ public class NamedCacheTest {
         }
     }
 
-    private void testSpecificMetrics(final String groupName, final String entityName, final String opName,
-                                     final Map<String, String> metricTags) {
-        assertNotNull(streamMetrics.registry().metrics().get(streamMetrics.registry().metricName(opName + "-avg",
-                groupName, "The average cache hit ratio of " + entityName, metricTags)));
-        assertNotNull(streamMetrics.registry().metrics().get(streamMetrics.registry().metricName(opName + "-min",
-                groupName, "The minimum cache hit ratio of " + entityName, metricTags)));
-        assertNotNull(streamMetrics.registry().metrics().get(streamMetrics.registry().metricName(opName + "-max",
-                groupName, "The maximum cache hit ratio of " + entityName, metricTags)));
-    }
     @Test
     public void testMetrics() {
-        final String scope = "record-cache";
-        final String entityName = cache.name();
-        final String opName = "hitRatio";
-        final String tagKey = "record-cache-id";
-        final String tagValue = underlyingStoreName;
-        final String groupName = "stream-" + scope + "-metrics";
         final Map<String, String> metricTags = new LinkedHashMap<>();
-        metricTags.put(tagKey, tagValue);
+        metricTags.put("record-cache-id", underlyingStoreName);
         metricTags.put("task-id", taskIDString);
+        metricTags.put("client-id", "test");
 
-        assertNotNull(streamMetrics.registry().getSensor(opName));
-        testSpecificMetrics(groupName, entityName, opName, metricTags);
+        assertNotNull(streamMetrics.registry().getSensor("hitRatio"));
+        final Map<MetricName, KafkaMetric> metrics1 = streamMetrics.registry().metrics();
+        getMetricByNameFilterByTags(metrics1, "hitRatio-avg", "stream-record-cache-metrics", metricTags);
+        getMetricByNameFilterByTags(metrics1, "hitRatio-min", "stream-record-cache-metrics", metricTags);
+        getMetricByNameFilterByTags(metrics1, "hitRatio-max", "stream-record-cache-metrics", metricTags);
 
         // test "all"
-        metricTags.put(tagKey, "all");
-        testSpecificMetrics(groupName, entityName, opName, metricTags);
+        metricTags.put("record-cache-id", "all");
+        final Map<MetricName, KafkaMetric> metrics = streamMetrics.registry().metrics();
+        getMetricByNameFilterByTags(metrics, "hitRatio-avg", "stream-record-cache-metrics", metricTags);
+        getMetricByNameFilterByTags(metrics, "hitRatio-min", "stream-record-cache-metrics", metricTags);
+        getMetricByNameFilterByTags(metrics, "hitRatio-max", "stream-record-cache-metrics", metricTags);
     }
 
     @Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
index 554169e..bf556ad 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
@@ -79,7 +79,13 @@ public class RocksDBWindowStoreTest {
     private final ThreadCache cache = new ThreadCache(new LogContext("TestCache "), DEFAULT_CACHE_SIZE_BYTES, new MockStreamsMetrics(new Metrics()));
 
     private final Producer<byte[], byte[]> producer = new MockProducer<>(true, Serdes.ByteArray().serializer(), Serdes.ByteArray().serializer());
-    private final RecordCollector recordCollector = new RecordCollectorImpl(producer, "RocksDBWindowStoreTestTask", new LogContext("RocksDBWindowStoreTestTask "), new DefaultProductionExceptionHandler()) {
+    private final RecordCollector recordCollector = new RecordCollectorImpl(
+        producer,
+        "RocksDBWindowStoreTestTask",
+        new LogContext("RocksDBWindowStoreTestTask "),
+        new DefaultProductionExceptionHandler(),
+        new Metrics().sensor("skipped-records")
+    ) {
         @Override
         public <K1, V1> void send(final String topic,
                                   final K1 key,
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
index dcc2305..21b5c5c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StoreChangeLoggerTest.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.streams.state.internals;
 
 
+import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler;
@@ -40,7 +41,7 @@ public class StoreChangeLoggerTest {
     private final Map<Integer, String> logged = new HashMap<>();
 
     private final InternalMockProcessorContext context = new InternalMockProcessorContext(StateSerdes.withBuiltinTypes(topic, Integer.class, String.class),
-        new RecordCollectorImpl(null, "StoreChangeLoggerTest", new LogContext("StoreChangeLoggerTest "), new DefaultProductionExceptionHandler()) {
+        new RecordCollectorImpl(null, "StoreChangeLoggerTest", new LogContext("StoreChangeLoggerTest "), new DefaultProductionExceptionHandler(), new Metrics().sensor("skipped-records")) {
             @Override
             public <K1, V1> void send(final String topic,
                                       final K1 key,
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
index a9c47b5..dc04536 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
@@ -178,6 +178,7 @@ public class StreamThreadStateStoreProviderTest {
                                          final MockClientSupplier clientSupplier,
                                          final ProcessorTopology topology,
                                          final TaskId taskId) {
+        final Metrics metrics = new Metrics();
         return new StreamTask(
             taskId,
             Collections.singletonList(new TopicPartition(topicName, taskId.partition)),
@@ -185,7 +186,7 @@ public class StreamThreadStateStoreProviderTest {
             clientSupplier.consumer,
             new StoreChangelogReader(clientSupplier.restoreConsumer, new MockStateRestoreListener(), new LogContext("test-stream-task ")),
             streamsConfig,
-            new MockStreamsMetrics(new Metrics()),
+            new MockStreamsMetrics(metrics),
             stateDirectory,
             null,
             new MockTime(),
diff --git a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
index 74bb5d1..57e3efb 100644
--- a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
@@ -31,12 +31,12 @@ import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.To;
 import org.apache.kafka.streams.processor.internals.AbstractProcessorContext;
 import org.apache.kafka.streams.processor.internals.CompositeRestoreListener;
-import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.processor.internals.RecordCollector;
 import org.apache.kafka.streams.processor.internals.ToInternal;
 import org.apache.kafka.streams.processor.internals.WrappedBatchingStateRestoreCallback;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.StateSerdes;
 import org.apache.kafka.streams.state.internals.ThreadCache;
 
@@ -61,9 +61,20 @@ public class InternalMockProcessorContext extends AbstractProcessorContext imple
     private Serde<?> valSerde;
     private long timestamp = -1L;
 
+    public InternalMockProcessorContext() {
+        this(null,
+            null,
+            null,
+            new StreamsMetricsImpl(new Metrics(), "mock"),
+            new StreamsConfig(StreamsTestUtils.minimalStreamsConfig()),
+            null,
+            null
+        );
+    }
+
     public InternalMockProcessorContext(final File stateDir,
                                         final StreamsConfig config) {
-        this(stateDir, null, null, new Metrics(), config, null, null);
+        this(stateDir, null, null, new StreamsMetricsImpl(new Metrics(), "mock"), config, null, null);
     }
 
     public InternalMockProcessorContext(final StateSerdes<?, ?> serdes,
@@ -74,12 +85,19 @@ public class InternalMockProcessorContext extends AbstractProcessorContext imple
     public InternalMockProcessorContext(final StateSerdes<?, ?> serdes,
                                         final RecordCollector collector,
                                         final Metrics metrics) {
-        this(null, serdes.keySerde(), serdes.valueSerde(), metrics, new StreamsConfig(StreamsTestUtils.minimalStreamsConfig()), new RecordCollector.Supplier() {
-            @Override
-            public RecordCollector recordCollector() {
-                return collector;
-            }
-        }, null);
+        this(
+            null,
+            serdes.keySerde(),
+            serdes.valueSerde(),
+            new StreamsMetricsImpl(metrics, "mock"),
+            new StreamsConfig(StreamsTestUtils.minimalStreamsConfig()), new RecordCollector.Supplier() {
+                @Override
+                public RecordCollector recordCollector() {
+                    return collector;
+                }
+            },
+            null
+        );
     }
 
     public InternalMockProcessorContext(final File stateDir,
@@ -87,30 +105,37 @@ public class InternalMockProcessorContext extends AbstractProcessorContext imple
                                         final Serde<?> valSerde,
                                         final RecordCollector collector,
                                         final ThreadCache cache) {
-        this(stateDir, keySerde, valSerde, new Metrics(), new StreamsConfig(StreamsTestUtils.minimalStreamsConfig()), new RecordCollector.Supplier() {
-            @Override
-            public RecordCollector recordCollector() {
-                return collector;
-            }
-        }, cache);
+        this(stateDir,
+            keySerde,
+            valSerde,
+            new StreamsMetricsImpl(new Metrics(), "mock"),
+            new StreamsConfig(StreamsTestUtils.minimalStreamsConfig()),
+            new RecordCollector.Supplier() {
+                @Override
+                public RecordCollector recordCollector() {
+                    return collector;
+                }
+            },
+            cache
+        );
     }
 
-    private InternalMockProcessorContext(final File stateDir,
-                                         final Serde<?> keySerde,
-                                         final Serde<?> valSerde,
-                                         final Metrics metrics,
-                                         final StreamsConfig config,
-                                         final RecordCollector.Supplier collectorSupplier,
-                                         final ThreadCache cache) {
+    public InternalMockProcessorContext(final File stateDir,
+                                        final Serde<?> keySerde,
+                                        final Serde<?> valSerde,
+                                        final StreamsMetricsImpl metrics,
+                                        final StreamsConfig config,
+                                        final RecordCollector.Supplier collectorSupplier,
+                                        final ThreadCache cache) {
         super(new TaskId(0, 0),
-              config,
-              new MockStreamsMetrics(metrics),
-              null,
-              cache);
+            config,
+            metrics,
+            null,
+            cache);
         this.stateDir = stateDir;
         this.keySerde = keySerde;
         this.valSerde = valSerde;
-        this.metrics = metrics;
+        this.metrics = metrics.registry();
         this.recordCollectorSupplier = collectorSupplier;
     }
 
@@ -169,7 +194,8 @@ public class InternalMockProcessorContext extends AbstractProcessorContext imple
         return storeMap.get(name);
     }
 
-    @Override public Cancellable schedule(long interval, PunctuationType type, Punctuator callback) {
+    @Override
+    public Cancellable schedule(long interval, PunctuationType type, Punctuator callback) {
         throw new UnsupportedOperationException("schedule() not supported.");
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
index eb137db..7313414 100644
--- a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
@@ -173,7 +173,7 @@ public class KStreamTestDriver extends ExternalResource {
         final ProcessorNode currNode = sourceNodeByTopicName(topicName);
 
         if (currNode != null) {
-            context.setRecordContext(createRecordContext(context.timestamp()));
+            context.setRecordContext(createRecordContext(topicName, context.timestamp()));
             context.setCurrentNode(currNode);
             try {
                 context.forward(key, value);
@@ -203,7 +203,7 @@ public class KStreamTestDriver extends ExternalResource {
         final ProcessorNode prevNode = context.currentNode();
         for (final ProcessorNode processor : topology.processors()) {
             if (processor.processor() != null) {
-                context.setRecordContext(createRecordContext(timestamp));
+                context.setRecordContext(createRecordContext(context.topic(), timestamp));
                 context.setCurrentNode(processor);
                 try {
                     processor.processor().punctuate(timestamp);
@@ -277,13 +277,13 @@ public class KStreamTestDriver extends ExternalResource {
         }
     }
 
-    private ProcessorRecordContext createRecordContext(final long timestamp) {
-        return new ProcessorRecordContext(timestamp, -1, -1, "topic");
+    private ProcessorRecordContext createRecordContext(final String topicName, final long timestamp) {
+        return new ProcessorRecordContext(timestamp, -1, -1, topicName);
     }
 
     private class MockRecordCollector extends RecordCollectorImpl {
         MockRecordCollector() {
-            super(null, "KStreamTestDriver", new LogContext("KStreamTestDriver "), new DefaultProductionExceptionHandler());
+            super(null, "KStreamTestDriver", new LogContext("KStreamTestDriver "), new DefaultProductionExceptionHandler(), new Metrics().sensor("skipped-records"));
         }
 
         @Override
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java
index 38b0e7d..a526bfd 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorNode.java
@@ -16,9 +16,9 @@
  */
 package org.apache.kafka.test;
 
-import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.processor.Punctuator;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
 
 import java.util.Collections;
@@ -49,7 +49,7 @@ public class MockProcessorNode<K, V> extends ProcessorNode<K, V> {
     }
 
     @Override
-    public void init(final ProcessorContext context) {
+    public void init(final InternalProcessorContext context) {
         super.init(context);
         initialized = true;
     }
diff --git a/streams/src/test/java/org/apache/kafka/test/MockSourceNode.java b/streams/src/test/java/org/apache/kafka/test/MockSourceNode.java
index 8df4e6c..1644daa 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockSourceNode.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockSourceNode.java
@@ -18,7 +18,7 @@ package org.apache.kafka.test;
 
 
 import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.SourceNode;
 
 import java.util.ArrayList;
@@ -48,7 +48,7 @@ public class MockSourceNode<K, V> extends SourceNode<K, V> {
     }
 
     @Override
-    public void init(final ProcessorContext context) {
+    public void init(final InternalProcessorContext context) {
         super.init(context);
         initialized = true;
     }
diff --git a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
index fecf7e4..afd2bb2 100644
--- a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
@@ -33,7 +33,6 @@ import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
 import org.apache.kafka.streams.processor.StateStore;
@@ -50,6 +49,7 @@ import org.apache.kafka.streams.processor.internals.ProcessorTopology;
 import org.apache.kafka.streams.processor.internals.StateDirectory;
 import org.apache.kafka.streams.processor.internals.StoreChangelogReader;
 import org.apache.kafka.streams.processor.internals.StreamTask;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.internals.ThreadCache;
 
@@ -192,7 +192,7 @@ public class ProcessorTopologyTestDriver {
         consumer.assign(offsetsByTopicPartition.keySet());
 
         final StateDirectory stateDirectory = new StateDirectory(config, Time.SYSTEM);
-        final StreamsMetrics streamsMetrics = new MockStreamsMetrics(new Metrics());
+        final StreamsMetricsImpl streamsMetrics = new MockStreamsMetrics(new Metrics());
         final ThreadCache cache = new ThreadCache(new LogContext("mock "), 1024 * 1024, streamsMetrics);
 
         if (globalTopology != null) {
diff --git a/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java b/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java
index 67304e1..a19b55c 100644
--- a/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java
@@ -17,6 +17,8 @@
 package org.apache.kafka.test;
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
@@ -25,13 +27,15 @@ import org.apache.kafka.streams.kstream.Windowed;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 import java.util.UUID;
 
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
 
-public class StreamsTestUtils {
+public final class StreamsTestUtils {
+    private StreamsTestUtils() {}
 
     public static Properties getStreamsConfig(final String applicationId,
                                               final String bootstrapServers,
@@ -87,4 +91,63 @@ public class StreamsTestUtils {
         assertThat(actual.key.key(), equalTo(expectedKey.key()));
         assertThat(actual.value, equalTo(expectedValue.getBytes()));
     }
+
+    public static Metric getMetricByName(final Map<MetricName, ? extends Metric> metrics,
+                                         final String name,
+                                         final String group) {
+        Metric metric = null;
+        for (final Map.Entry<MetricName, ? extends Metric> entry : metrics.entrySet()) {
+            if (entry.getKey().name().equals(name) && entry.getKey().group().equals(group)) {
+                if (metric == null) {
+                    metric = entry.getValue();
+                } else {
+                    throw new IllegalStateException(
+                        "Found two metrics with name=[" + name + "]: \n" +
+                            metric.metricName().toString() +
+                            " AND \n" +
+                            entry.getKey().toString()
+                    );
+                }
+            }
+        }
+        if (metric == null) {
+            throw new IllegalStateException("Didn't find metric with name=[" + name + "]");
+        } else {
+            return metric;
+        }
+    }
+
+    public static Metric getMetricByNameFilterByTags(final Map<MetricName, ? extends Metric> metrics,
+                                                     final String name,
+                                                     final String group,
+                                                     final Map<String, String> filterTags) {
+        Metric metric = null;
+        for (final Map.Entry<MetricName, ? extends Metric> entry : metrics.entrySet()) {
+            if (entry.getKey().name().equals(name) && entry.getKey().group().equals(group)) {
+                boolean filtersMatch = true;
+                for (final Map.Entry<String, String> filter : filterTags.entrySet()) {
+                    if (!filter.getValue().equals(entry.getKey().tags().get(filter.getKey()))) {
+                        filtersMatch = false;
+                    }
+                }
+                if (filtersMatch) {
+                    if (metric == null) {
+                        metric = entry.getValue();
+                    } else {
+                        throw new IllegalStateException(
+                            "Found two metrics with name=[" + name + "] and tags=[" + filterTags + "]: \n" +
+                                metric.metricName().toString() +
+                                " AND \n" +
+                                entry.getKey().toString()
+                        );
+                    }
+                }
+            }
+        }
+        if (metric == null) {
+            throw new IllegalStateException("Didn't find metric with name=[" + name + "] and tags=[" + filterTags + "]");
+        } else {
+            return metric;
+        }
+    }
 }
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index c03bf1a..bde70b4 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -23,6 +23,8 @@ import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.clients.producer.MockProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.annotation.InterfaceStability;
@@ -53,7 +55,7 @@ import org.apache.kafka.streams.processor.internals.ProcessorTopology;
 import org.apache.kafka.streams.processor.internals.StateDirectory;
 import org.apache.kafka.streams.processor.internals.StoreChangelogReader;
 import org.apache.kafka.streams.processor.internals.StreamTask;
-import org.apache.kafka.streams.processor.internals.StreamsMetricsImpl;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.SessionStore;
 import org.apache.kafka.streams.state.WindowStore;
@@ -178,8 +180,9 @@ public class TopologyTestDriver implements Closeable {
     private final GlobalStateManager globalStateManager;
 
     private final StateDirectory stateDirectory;
+    private final Metrics metrics;
     private final ProcessorTopology processorTopology;
-    
+
     private final MockProducer<byte[], byte[]> producer;
 
     private final Set<String> internalTopics = new HashSet<>();
@@ -232,10 +235,11 @@ public class TopologyTestDriver implements Closeable {
 
         final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
         stateDirectory = new StateDirectory(streamsConfig, mockTime);
-        final StreamsMetrics streamsMetrics = new StreamsMetricsImpl(
-            new Metrics(),
-            "topology-test-driver-stream-metrics",
-            Collections.<String, String>emptyMap());
+        metrics = new Metrics();
+        final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(
+            metrics,
+            "topology-test-driver-virtual-thread"
+        );
         final ThreadCache cache = new ThreadCache(
             new LogContext("topology-test-driver "),
             Math.max(0, streamsConfig.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG)),
@@ -290,7 +294,8 @@ public class TopologyTestDriver implements Closeable {
                 globalProcessorContext,
                 globalStateManager,
                 new LogAndContinueExceptionHandler(),
-                new LogContext());
+                new LogContext()
+            );
             globalStateTask.initialize();
         } else {
             globalStateManager = null;
@@ -321,6 +326,15 @@ public class TopologyTestDriver implements Closeable {
     }
 
     /**
+     * Get read-only handle on global metrics registry.
+     *
+     * @return Map of all metrics.
+     */
+    public Map<MetricName, ? extends Metric> metrics() {
+        return Collections.unmodifiableMap(metrics.metrics());
+    }
+
+    /**
      * Send an input message with the given key, value, and timestamp on the specified topic to the topology and then
      * commit the messages.
      *
@@ -498,7 +512,7 @@ public class TopologyTestDriver implements Closeable {
         final V value = valueDeserializer.deserialize(record.topic(), record.value());
         return new ProducerRecord<>(record.topic(), record.partition(), record.timestamp(), key, value);
     }
-    
+
     /**
      * Get all {@link StateStore StateStores} from the topology.
      * The stores can be a "regular" or global stores.
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
index 88801f7..15b2da6 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java
@@ -27,11 +27,10 @@ import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.kstream.Transformer;
 import org.apache.kafka.streams.kstream.ValueTransformer;
 import org.apache.kafka.streams.processor.internals.RecordCollector;
-import org.apache.kafka.streams.processor.internals.StreamsMetricsImpl;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
 
 import java.io.File;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
@@ -194,7 +193,7 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
         this.taskId = taskId;
         this.config = streamsConfig;
         this.stateDir = stateDir;
-        this.metrics = new StreamsMetricsImpl(new Metrics(), "mock-processor-context", Collections.<String, String>emptyMap());
+        this.metrics = new StreamsMetricsImpl(new Metrics(), "mock-processor-context-virtual-thread");
     }
 
     @Override

-- 
To stop receiving notification emails like this one, please contact
guozhang@apache.org.