You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2019/04/22 17:16:03 UTC

[kafka] branch trunk updated: MINOR: Add unit test for SerDe auto-configuration (#6610)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 172fbb2  MINOR: Add unit test for SerDe auto-configuration (#6610)
172fbb2 is described below

commit 172fbb2dd55144e2e44777174f970b56768e1777
Author: Matthias J. Sax <mj...@apache.org>
AuthorDate: Mon Apr 22 10:15:46 2019 -0700

    MINOR: Add unit test for SerDe auto-configuration (#6610)
    
    Reviewers: Guozhang Wang <gu...@confluent.io>, Ted Yu <yu...@gmail.com>
---
 .../org/apache/kafka/streams/KafkaStreams.java     |   8 +-
 .../streams/processor/internals/SourceNode.java    |   4 +-
 .../org/apache/kafka/streams/KafkaStreamsTest.java | 137 +++++++++++++++++++++
 .../state/internals/MeteredKeyValueStoreTest.java  |  51 +++++++-
 .../state/internals/MeteredSessionStoreTest.java   |  49 ++++++++
 .../internals/MeteredTimestampWindowStoreTest.java |  77 +++++++++++-
 .../MeteredTimestampedKeyValueStoreTest.java       |  52 +++++++-
 .../state/internals/MeteredWindowStoreTest.java    |  75 ++++++++++-
 8 files changed, 439 insertions(+), 14 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index e201bcd..d8bb0c1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -647,11 +647,11 @@ public class KafkaStreams implements AutoCloseable {
             }
         }
         for (final SourceNode sn : sources) {
-            if (sn.getKeyDeSerializer() != null) {
-                sn.getKeyDeSerializer().configure(config.originals(), true);
+            if (sn.getKeyDeserializer() != null) {
+                sn.getKeyDeserializer().configure(config.originals(), true);
             }
-            if (sn.getValueDeSerializer() != null) {
-                sn.getValueDeSerializer().configure(config.originals(), false);
+            if (sn.getValueDeserializer() != null) {
+                sn.getValueDeserializer().configure(config.originals(), false);
             }
         }
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
index bcd6475..f80749b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
@@ -52,11 +52,11 @@ public class SourceNode<K, V> extends ProcessorNode<K, V> {
         this(name, topics, null, keyDeserializer, valDeserializer);
     }
 
-    public Deserializer getKeyDeSerializer() {
+    public Deserializer getKeyDeserializer() {
         return keyDeserializer;
     }
 
-    public Deserializer getValueDeSerializer() {
+    public Deserializer getValueDeserializer() {
         return valDeserializer;
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index 3e55f29..9ce7c84 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -25,7 +25,10 @@ import org.apache.kafka.common.Node;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.network.Selectable;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Utils;
@@ -33,7 +36,9 @@ import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Grouped;
 import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Produced;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.ThreadMetadata;
 import org.apache.kafka.streams.processor.internals.GlobalStreamThread;
@@ -74,6 +79,13 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 
 import static java.util.Arrays.asList;
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.eq;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.easymock.EasyMock.mock;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -726,6 +738,131 @@ public class KafkaStreamsTest {
     }
 
     @SuppressWarnings("unchecked")
+    @Test
+    public void shouldInitializeUserSerdes() {
+        // Every (de)serializer mock expects one configure() call: eq(true) on key-side mocks, eq(false) on
+        // value-side mocks. The Serde mocks only hand those (de)serializers out, any number of times.
+        // Serdes for stream(): only the deserializers are expected to be configured.
+        final Deserializer mockSourceKeyDeserializer = mock(Deserializer.class);
+        mockSourceKeyDeserializer.configure(anyObject(), eq(true));
+        expectLastCall();
+        final Deserializer mockSourceValueDeserializer = mock(Deserializer.class);
+        mockSourceValueDeserializer.configure(anyObject(), eq(false));
+        expectLastCall();
+
+        final Serde mockSourceKeySerde = mock(Serde.class);
+        final Serde mockSourceValueSerde = mock(Serde.class);
+        expect(mockSourceKeySerde.deserializer()).andReturn(mockSourceKeyDeserializer).anyTimes();
+        expect(mockSourceValueSerde.deserializer()).andReturn(mockSourceValueDeserializer).anyTimes();
+
+        // Serdes for through(): both the serializers and the deserializers are expected to be configured.
+        final Serializer mockThroughKeySerializer = mock(Serializer.class);
+        mockThroughKeySerializer.configure(anyObject(), eq(true));
+        expectLastCall();
+        final Serializer mockThroughValueSerializer = mock(Serializer.class);
+        mockThroughValueSerializer.configure(anyObject(), eq(false));
+        expectLastCall();
+        final Deserializer mockThroughKeyDeserializer = mock(Deserializer.class);
+        mockThroughKeyDeserializer.configure(anyObject(), eq(true));
+        expectLastCall();
+        final Deserializer mockThroughValueDeserializer = mock(Deserializer.class);
+        mockThroughValueDeserializer.configure(anyObject(), eq(false));
+        expectLastCall();
+
+        final Serde mockThroughKeySerde = mock(Serde.class);
+        final Serde mockThroughValueSerde = mock(Serde.class);
+        expect(mockThroughKeySerde.serializer()).andReturn(mockThroughKeySerializer).anyTimes();
+        expect(mockThroughValueSerde.serializer()).andReturn(mockThroughValueSerializer).anyTimes();
+        expect(mockThroughKeySerde.deserializer()).andReturn(mockThroughKeyDeserializer).anyTimes();
+        expect(mockThroughValueSerde.deserializer()).andReturn(mockThroughValueDeserializer).anyTimes();
+
+        // Serdes for groupByKey(): both the serializers and the deserializers are expected to be configured.
+        final Serializer mockGroupedKeySerializer = mock(Serializer.class);
+        mockGroupedKeySerializer.configure(anyObject(), eq(true));
+        expectLastCall();
+        final Serializer mockGroupedValueSerializer = mock(Serializer.class);
+        mockGroupedValueSerializer.configure(anyObject(), eq(false));
+        expectLastCall();
+        final Deserializer mockGroupedKeyDeserializer = mock(Deserializer.class);
+        mockGroupedKeyDeserializer.configure(anyObject(), eq(true));
+        expectLastCall();
+        final Deserializer mockGroupedValueDeserializer = mock(Deserializer.class);
+        mockGroupedValueDeserializer.configure(anyObject(), eq(false));
+        expectLastCall();
+
+        final Serde mockGroupedKeySerde = mock(Serde.class);
+        final Serde mockGroupedValueSerde = mock(Serde.class);
+        expect(mockGroupedKeySerde.serializer()).andReturn(mockGroupedKeySerializer).anyTimes();
+        expect(mockGroupedValueSerde.serializer()).andReturn(mockGroupedValueSerializer).anyTimes();
+        expect(mockGroupedKeySerde.deserializer()).andReturn(mockGroupedKeyDeserializer).anyTimes();
+        expect(mockGroupedValueSerde.deserializer()).andReturn(mockGroupedValueDeserializer).anyTimes();
+
+        // Serdes for to(): only the serializers are expected to be configured.
+        final Serializer mockOutputKeySerializer = mock(Serializer.class);
+        mockOutputKeySerializer.configure(anyObject(), eq(true));
+        expectLastCall();
+        final Serializer mockOutputValueSerializer = mock(Serializer.class);
+        mockOutputValueSerializer.configure(anyObject(), eq(false));
+        expectLastCall();
+
+        final Serde mockOutputKeySerde = mock(Serde.class);
+        final Serde mockOutputValueSerde = mock(Serde.class);
+        expect(mockOutputKeySerde.serializer()).andReturn(mockOutputKeySerializer).anyTimes();
+        expect(mockOutputValueSerde.serializer()).andReturn(mockOutputValueSerializer).anyTimes();
+
+        // Serdes for globalTable(): only the deserializers are expected to be configured.
+        final Deserializer mockGlobalKeyDeserializer = mock(Deserializer.class);
+        mockGlobalKeyDeserializer.configure(anyObject(), eq(true));
+        expectLastCall();
+        final Deserializer mockGlobalValueDeserializer = mock(Deserializer.class);
+        mockGlobalValueDeserializer.configure(anyObject(), eq(false));
+        expectLastCall();
+
+        final Serde mockGlobalKeySerde = mock(Serde.class);
+        final Serde mockGlobalValueSerde = mock(Serde.class);
+        expect(mockGlobalKeySerde.deserializer()).andReturn(mockGlobalKeyDeserializer).anyTimes();
+        expect(mockGlobalValueSerde.deserializer()).andReturn(mockGlobalValueDeserializer).anyTimes();
+
+        // Topology that passes each mocked serde pair to exactly one operator.
+        builder
+            .stream("anyTopic", Consumed.with(mockSourceKeySerde, mockSourceValueSerde))
+            .through("anyOtherTopic", Produced.with(mockThroughKeySerde, mockThroughValueSerde))
+            .selectKey(KeyValue::pair)
+            .groupByKey(Grouped.with(mockGroupedKeySerde, mockGroupedValueSerde))
+            .count()
+            .toStream()
+            .to("anyOutput", Produced.with(mockOutputKeySerde, mockOutputValueSerde));
+        builder.globalTable("anyGlobal", Consumed.with(mockGlobalKeySerde, mockGlobalValueSerde));
+
+        replay(
+            mockSourceKeyDeserializer, mockSourceValueDeserializer, mockSourceKeySerde, mockSourceValueSerde,
+            mockThroughKeySerializer, mockThroughKeyDeserializer, mockThroughKeySerde,
+            mockThroughValueSerializer, mockThroughValueDeserializer, mockThroughValueSerde,
+            mockGroupedKeySerializer, mockGroupedKeyDeserializer, mockGroupedKeySerde,
+            mockGroupedValueSerializer, mockGroupedValueDeserializer, mockGroupedValueSerde,
+            mockOutputKeySerializer, mockOutputValueSerializer, mockOutputKeySerde, mockOutputValueSerde,
+            mockGlobalKeyDeserializer, mockGlobalValueDeserializer, mockGlobalKeySerde, mockGlobalValueSerde);
+
+        // Only construct and close the instance; all configure() calls are expected to have happened by then.
+        KafkaStreams kafkaStreams = null;
+        try {
+            kafkaStreams = new KafkaStreams(builder.build(), props);
+        } finally {
+            if (kafkaStreams != null) {
+                kafkaStreams.close();
+            }
+        }
+
+        // The Serde mocks are left out on purpose: their accessors were stubbed with anyTimes().
+        verify(
+            mockSourceKeyDeserializer, mockSourceValueDeserializer,
+            mockThroughKeySerializer, mockThroughValueSerializer, mockThroughKeyDeserializer, mockThroughValueDeserializer,
+            mockGroupedKeySerializer, mockGroupedValueSerializer, mockGroupedKeyDeserializer, mockGroupedValueDeserializer,
+            mockOutputKeySerializer, mockOutputValueSerializer,
+            mockGlobalKeyDeserializer, mockGlobalValueDeserializer);
+    }
+
+    @SuppressWarnings("unchecked")
     private Topology getStatefulTopology(final String inputTopic,
                                          final String outputTopic,
                                          final String globalTopicName,
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
index 51c14ba..bab9d6b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.metrics.JmxReporter;
 import org.apache.kafka.common.metrics.KafkaMetric;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.MockTime;
@@ -39,8 +40,8 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 
 import java.util.Collections;
-import java.util.List;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import static org.apache.kafka.common.utils.Utils.mkEntry;
@@ -52,6 +53,7 @@ import static org.easymock.EasyMock.expect;
 import static org.easymock.EasyMock.expectLastCall;
 import static org.easymock.EasyMock.mock;
 import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.reset;
 import static org.easymock.EasyMock.verify;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -102,6 +104,53 @@ public class MeteredKeyValueStoreTest {
         metered.init(context, metered);
     }
 
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldGetSerdesFromConfigWithoutUserSerdes() {
+        metered = new MeteredKeyValueStore<>(
+            inner,
+            "scope",
+            new MockTime(),
+            null, // no user-supplied key serde
+            null  // no user-supplied value serde
+        );
+        final Serde mockSerde = mock(Serde.class);
+        replay(mockSerde); // replayed without any recorded expectations
+        expect(context.keySerde()).andReturn(mockSerde);
+        expect(context.valueSerde()).andReturn(mockSerde);
+        // NOTE(review): init() is defined outside this hunk; presumably it replays `context` and initializes the store - confirm.
+        init();
+        verify(context, mockSerde); // keySerde() and valueSerde() must each have been requested from the context
+    }
+
+    @Test
+    public void shouldConfigureUserSerdes() {
+        final Serde<String> mockKeySerde = mock(Serde.class);
+        mockKeySerde.configure(anyObject(), eq(true)); // record: key serde is configured with `true`
+        expectLastCall();
+
+        final Serde<String> mockValueSerde = mock(Serde.class);
+        mockValueSerde.configure(anyObject(), eq(false)); // record: value serde is configured with `false`
+        expectLastCall();
+
+        replay(mockKeySerde, mockValueSerde);
+        // Hand both mocks to the store as the user-supplied serdes.
+        metered = new MeteredKeyValueStore<>(
+            inner,
+            "scope",
+            new MockTime(),
+            mockKeySerde,
+            mockValueSerde
+        );
+        // Re-record `context` from scratch: only metrics() and taskId() are stubbed below.
+        reset(context);
+        expect(context.metrics()).andReturn(new MockStreamsMetrics(metrics)).anyTimes();
+        expect(context.taskId()).andReturn(taskId).anyTimes();
+        // NOTE(review): init() is defined outside this hunk; presumably it replays `context` before verify() runs - confirm.
+        init();
+        verify(context, mockKeySerde, mockValueSerde);
+    }
+
     @Test
     public void testMetrics() {
         init();
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
index e5eb9e0..09c3371 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.metrics.JmxReporter;
 import org.apache.kafka.common.metrics.KafkaMetric;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.MockTime;
@@ -54,6 +55,7 @@ import static org.easymock.EasyMock.expect;
 import static org.easymock.EasyMock.expectLastCall;
 import static org.easymock.EasyMock.mock;
 import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.reset;
 import static org.easymock.EasyMock.verify;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -101,6 +103,53 @@ public class MeteredSessionStoreTest {
         metered.init(context, metered);
     }
 
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldGetSerdesFromConfigWithoutUserSerdes() {
+        metered = new MeteredSessionStore<>(
+            inner,
+            "scope",
+            null, // no user-supplied key serde
+            null, // no user-supplied value serde
+            new MockTime()
+        );
+        final Serde mockSerde = mock(Serde.class);
+        replay(mockSerde); // replayed without any recorded expectations
+        expect(context.keySerde()).andReturn(mockSerde);
+        expect(context.valueSerde()).andReturn(mockSerde);
+        // NOTE(review): init() is defined outside this hunk; presumably it replays `context` and initializes the store - confirm.
+        init();
+        verify(context, mockSerde); // keySerde() and valueSerde() must each have been requested from the context
+    }
+
+    @Test
+    public void shouldConfigureUserSerdes() {
+        final Serde<String> mockKeySerde = mock(Serde.class);
+        mockKeySerde.configure(anyObject(), eq(true)); // record: key serde is configured with `true`
+        expectLastCall();
+
+        final Serde<String> mockValueSerde = mock(Serde.class);
+        mockValueSerde.configure(anyObject(), eq(false)); // record: value serde is configured with `false`
+        expectLastCall();
+
+        replay(mockKeySerde, mockValueSerde);
+        // Hand both mocks to the store as the user-supplied serdes.
+        metered = new MeteredSessionStore<>(
+            inner,
+            "scope",
+            mockKeySerde,
+            mockValueSerde,
+            new MockTime()
+        );
+        // Re-record `context` from scratch: only metrics() and taskId() are stubbed below.
+        reset(context);
+        expect(context.metrics()).andReturn(new MockStreamsMetrics(metrics)).anyTimes();
+        expect(context.taskId()).andReturn(taskId).anyTimes();
+        // NOTE(review): init() is defined outside this hunk; presumably it replays `context` before verify() runs - confirm.
+        init();
+        verify(context, mockKeySerde, mockValueSerde);
+    }
+
     @Test
     public void testMetrics() {
         init();
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampWindowStoreTest.java
index a3522f3..7cb9148 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampWindowStoreTest.java
@@ -19,12 +19,14 @@ package org.apache.kafka.streams.state.internals;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.test.InternalMockProcessorContext;
 import org.apache.kafka.test.NoOpRecordCollector;
@@ -34,13 +36,19 @@ import org.easymock.EasyMock;
 import org.junit.Before;
 import org.junit.Test;
 
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.eq;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.easymock.EasyMock.mock;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
 import static org.junit.Assert.assertNull;
 
 public class MeteredTimestampWindowStoreTest {
     private InternalMockProcessorContext context;
     @SuppressWarnings("unchecked")
     private final WindowStore<Bytes, byte[]> innerStoreMock = EasyMock.createNiceMock(WindowStore.class);
-    private final MeteredTimestampedWindowStore<String, String> store = new MeteredTimestampedWindowStore<>(
+    private MeteredTimestampedWindowStore<String, String> store = new MeteredTimestampedWindowStore<>(
         innerStoreMock,
         10L, // any size
         "scope",
@@ -70,6 +78,73 @@ public class MeteredTimestampWindowStoreTest {
     }
 
     @Test
+    public void shouldGetSerdesFromConfigWithoutUserSerdes() {
+        // Both serde arguments are null; going by the test name, the store should then use the context's serdes.
+        store = new MeteredTimestampedWindowStore<>(
+            innerStoreMock,
+            10L, // any size
+            "scope",
+            new MockTime(),
+            null,
+            null
+        );
+        final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, "test");
+        final Serde mockSerde = mock(Serde.class);
+
+        // Must be assigned to the field: init() below reads `context`, so a discarded instance would never be used.
+        context = new InternalMockProcessorContext(
+            TestUtils.tempDirectory(),
+            mockSerde,
+            mockSerde,
+            streamsMetrics,
+            new StreamsConfig(StreamsTestUtils.getStreamsConfig()),
+            NoOpRecordCollector::new,
+            new ThreadCache(new LogContext("testCache "), 0, streamsMetrics)
+        );
+        replay(mockSerde, innerStoreMock);
+
+        store.init(context, store);
+        // No expectations were recorded on mockSerde, so this only guards against unexpected calls on it.
+        verify(mockSerde);
+    }
+
+    @Test
+    public void shouldConfigureUserSerdes() {
+        // Each user-supplied serde expects one configure() call: `true` for the key serde, `false` for the value serde.
+        final Serde<String> mockKeySerde = mock(Serde.class);
+        mockKeySerde.configure(anyObject(), eq(true));
+        expectLastCall();
+        final Serde<ValueAndTimestamp<String>> mockValueSerde = mock(Serde.class);
+        mockValueSerde.configure(anyObject(), eq(false));
+        expectLastCall();
+
+        store = new MeteredTimestampedWindowStore<>(
+            innerStoreMock,
+            10L, // any size
+            "scope",
+            new MockTime(),
+            mockKeySerde,
+            mockValueSerde
+        );
+
+        final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, "test");
+        final Serde mockSerde = mock(Serde.class);
+        // Must be assigned to the field: init() below reads `context`, so a discarded instance would never be used.
+        context = new InternalMockProcessorContext(
+            TestUtils.tempDirectory(),
+            mockSerde,
+            mockSerde,
+            streamsMetrics,
+            new StreamsConfig(StreamsTestUtils.getStreamsConfig()),
+            NoOpRecordCollector::new,
+            new ThreadCache(new LogContext("testCache "), 0, streamsMetrics)
+        );
+        replay(mockKeySerde, mockValueSerde, innerStoreMock);
+
+        store.init(context, store);
+        verify(mockKeySerde, mockValueSerde);
+    }
+
+    @Test
     public void shouldCloseUnderlyingStore() {
         innerStoreMock.close();
         EasyMock.expectLastCall();
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
index 587d369..858b399 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.metrics.JmxReporter;
 import org.apache.kafka.common.metrics.KafkaMetric;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.MockTime;
@@ -40,8 +41,8 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 
 import java.util.Collections;
-import java.util.List;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import static org.apache.kafka.common.utils.Utils.mkEntry;
@@ -53,6 +54,7 @@ import static org.easymock.EasyMock.expect;
 import static org.easymock.EasyMock.expectLastCall;
 import static org.easymock.EasyMock.mock;
 import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.reset;
 import static org.easymock.EasyMock.verify;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -76,7 +78,6 @@ public class MeteredTimestampedKeyValueStoreTest {
     private MeteredTimestampedKeyValueStore<String, String> metered;
     private final String key = "key";
     private final Bytes keyBytes = Bytes.wrap(key.getBytes());
-    private final String value = "value";
     private final ValueAndTimestamp<String> valueAndTimestamp = ValueAndTimestamp.make("value", 97L);
     // timestamp is 97 what is ASCII of 'a'
     private final byte[] valueAndTimestampBytes = "\0\0\0\0\0\0\0avalue".getBytes();
@@ -104,6 +105,53 @@ public class MeteredTimestampedKeyValueStoreTest {
         metered.init(context, metered);
     }
 
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldGetSerdesFromConfigWithoutUserSerdes() {
+        metered = new MeteredTimestampedKeyValueStore<>(
+            inner,
+            "scope",
+            new MockTime(),
+            null, // no user-supplied key serde
+            null  // no user-supplied value serde
+        );
+        final Serde mockSerde = mock(Serde.class);
+        replay(mockSerde); // replayed without any recorded expectations
+        expect(context.keySerde()).andReturn(mockSerde);
+        expect(context.valueSerde()).andReturn(mockSerde);
+        // NOTE(review): init() is defined outside this hunk; presumably it replays `context` and initializes the store - confirm.
+        init();
+        verify(context, mockSerde); // keySerde() and valueSerde() must each have been requested from the context
+    }
+
+    @Test
+    public void shouldConfigureUserSerdes() {
+        final Serde<String> mockKeySerde = mock(Serde.class);
+        mockKeySerde.configure(anyObject(), eq(true)); // record: key serde is configured with `true`
+        expectLastCall();
+
+        final Serde<ValueAndTimestamp<String>> mockValueSerde = mock(Serde.class);
+        mockValueSerde.configure(anyObject(), eq(false)); // record: value serde is configured with `false`
+        expectLastCall();
+
+        replay(mockKeySerde, mockValueSerde);
+        // Hand both mocks to the store as the user-supplied serdes.
+        metered = new MeteredTimestampedKeyValueStore<>(
+            inner,
+            "scope",
+            new MockTime(),
+            mockKeySerde,
+            mockValueSerde
+        );
+        // Re-record `context` from scratch: only metrics() and taskId() are stubbed below.
+        reset(context);
+        expect(context.metrics()).andReturn(new MockStreamsMetrics(metrics)).anyTimes();
+        expect(context.taskId()).andReturn(taskId).anyTimes();
+        // NOTE(review): init() is defined outside this hunk; presumably it replays `context` before verify() runs - confirm.
+        init();
+        verify(context, mockKeySerde, mockValueSerde);
+    }
+
     @Test
     public void testMetrics() {
         init();
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
index c0ed7f6..749fd10 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
@@ -22,12 +22,12 @@ import org.apache.kafka.common.metrics.JmxReporter;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.test.InternalMockProcessorContext;
@@ -59,7 +59,7 @@ public class MeteredWindowStoreTest {
     private InternalMockProcessorContext context;
     @SuppressWarnings("unchecked")
     private final WindowStore<Bytes, byte[]> innerStoreMock = createNiceMock(WindowStore.class);
-    private final MeteredWindowStore<String, String> store = new MeteredWindowStore<>(
+    private MeteredWindowStore<String, String> store = new MeteredWindowStore<>(
         innerStoreMock,
         10L, // any size
         "scope",
@@ -89,6 +89,73 @@ public class MeteredWindowStoreTest {
     }
 
     @Test
+    public void shouldGetSerdesFromConfigWithoutUserSerdes() {
+        // Both serde arguments are null; going by the test name, the store should then use the context's serdes.
+        store = new MeteredWindowStore<>(
+            innerStoreMock,
+            10L, // any size
+            "scope",
+            new MockTime(),
+            null,
+            null
+        );
+        final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, "test");
+        final Serde mockSerde = mock(Serde.class);
+
+        // Must be assigned to the field: init() below reads `context`, so a discarded instance would never be used.
+        context = new InternalMockProcessorContext(
+            TestUtils.tempDirectory(),
+            mockSerde,
+            mockSerde,
+            streamsMetrics,
+            new StreamsConfig(StreamsTestUtils.getStreamsConfig()),
+            NoOpRecordCollector::new,
+            new ThreadCache(new LogContext("testCache "), 0, streamsMetrics)
+        );
+        replay(mockSerde, innerStoreMock);
+
+        store.init(context, store);
+        // No expectations were recorded on mockSerde, so this only guards against unexpected calls on it.
+        verify(mockSerde);
+    }
+
+    @Test
+    public void shouldConfigureUserSerdes() {
+        // Each user-supplied serde expects one configure() call: `true` for the key serde, `false` for the value serde.
+        final Serde<String> mockKeySerde = mock(Serde.class);
+        mockKeySerde.configure(anyObject(), eq(true));
+        expectLastCall();
+        final Serde<String> mockValueSerde = mock(Serde.class);
+        mockValueSerde.configure(anyObject(), eq(false));
+        expectLastCall();
+
+        store = new MeteredWindowStore<>(
+            innerStoreMock,
+            10L, // any size
+            "scope",
+            new MockTime(),
+            mockKeySerde,
+            mockValueSerde
+        );
+
+        final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, "test");
+        final Serde mockSerde = mock(Serde.class);
+        // Must be assigned to the field: init() below reads `context`, so a discarded instance would never be used.
+        context = new InternalMockProcessorContext(
+            TestUtils.tempDirectory(),
+            mockSerde,
+            mockSerde,
+            streamsMetrics,
+            new StreamsConfig(StreamsTestUtils.getStreamsConfig()),
+            NoOpRecordCollector::new,
+            new ThreadCache(new LogContext("testCache "), 0, streamsMetrics)
+        );
+        replay(mockKeySerde, mockValueSerde, innerStoreMock);
+
+        store.init(context, store);
+        verify(mockKeySerde, mockValueSerde);
+    }
+
+    @Test
     public void testMetrics() {
         replay(innerStoreMock);
         store.init(context, store);
@@ -128,7 +195,7 @@ public class MeteredWindowStoreTest {
 
     @Test
     public void shouldRecordFetchLatency() {
-        expect(innerStoreMock.fetch(Bytes.wrap("a".getBytes()), 1, 1)).andReturn(KeyValueIterators.<byte[]>emptyWindowStoreIterator());
+        expect(innerStoreMock.fetch(Bytes.wrap("a".getBytes()), 1, 1)).andReturn(KeyValueIterators.emptyWindowStoreIterator());
         replay(innerStoreMock);
 
         store.init(context, store);
@@ -141,7 +208,7 @@ public class MeteredWindowStoreTest {
 
     @Test
     public void shouldRecordFetchRangeLatency() {
-        expect(innerStoreMock.fetch(Bytes.wrap("a".getBytes()), Bytes.wrap("b".getBytes()), 1, 1)).andReturn(KeyValueIterators.<Windowed<Bytes>, byte[]>emptyIterator());
+        expect(innerStoreMock.fetch(Bytes.wrap("a".getBytes()), Bytes.wrap("b".getBytes()), 1, 1)).andReturn(KeyValueIterators.emptyIterator());
         replay(innerStoreMock);
 
         store.init(context, store);