You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2019/04/19 04:41:39 UTC

[kafka] branch kafka-3729-pr-from-ted created (now 07ef649)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a change to branch kafka-3729-pr-from-ted
in repository https://gitbox.apache.org/repos/asf/kafka.git.


      at 07ef649  Add unit tests

This branch includes the following new commits:

     new 07ef649  Add unit tests

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[kafka] 01/01: Add unit tests

Posted by mj...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch kafka-3729-pr-from-ted
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 07ef6497789c28b19c3ba2e77c08afd57cd5f19b
Author: Matthias J. Sax <ma...@confluent.io>
AuthorDate: Thu Apr 18 21:40:02 2019 -0700

    Add unit tests
---
 .../org/apache/kafka/streams/KafkaStreams.java     |   8 +-
 .../streams/processor/internals/SourceNode.java    |   4 +-
 .../org/apache/kafka/streams/KafkaStreamsTest.java | 137 +++++++++++++++++++++
 .../state/internals/MeteredKeyValueStoreTest.java  |  51 +++++++-
 .../state/internals/MeteredSessionStoreTest.java   |  49 ++++++++
 .../internals/MeteredTimestampWindowStoreTest.java |  77 +++++++++++-
 .../MeteredTimestampedKeyValueStoreTest.java       |  52 +++++++-
 .../state/internals/MeteredWindowStoreTest.java    |  75 ++++++++++-
 8 files changed, 439 insertions(+), 14 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index bb7ca8a..34c8589 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -647,11 +647,11 @@ public class KafkaStreams implements AutoCloseable {
             }
         }
         for (final SourceNode sn : sources) {
-            if (sn.getKeyDeSerializer() != null) {
-                sn.getKeyDeSerializer().configure(config.originals(), true);
+            if (sn.getKeyDeserializer() != null) {
+                sn.getKeyDeserializer().configure(config.originals(), true);
             }
-            if (sn.getValueDeSerializer() != null) {
-                sn.getValueDeSerializer().configure(config.originals(), false);
+            if (sn.getValueDeserializer() != null) {
+                sn.getValueDeserializer().configure(config.originals(), false);
             }
         }
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
index bcd6475..f80749b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
@@ -52,11 +52,11 @@ public class SourceNode<K, V> extends ProcessorNode<K, V> {
         this(name, topics, null, keyDeserializer, valDeserializer);
     }
 
-    public Deserializer getKeyDeSerializer() {
+    public Deserializer getKeyDeserializer() {
         return keyDeserializer;
     }
 
-    public Deserializer getValueDeSerializer() {
+    public Deserializer getValueDeserializer() {
         return valDeserializer;
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index 6b8b5b5..80e268d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -25,7 +25,10 @@ import org.apache.kafka.common.Node;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.network.Selectable;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Utils;
@@ -33,7 +36,9 @@ import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Grouped;
 import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Produced;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.ThreadMetadata;
 import org.apache.kafka.streams.processor.internals.GlobalStreamThread;
@@ -74,6 +79,13 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 
 import static java.util.Arrays.asList;
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.eq;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.easymock.EasyMock.mock;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -662,6 +674,131 @@ public class KafkaStreamsTest {
     }
 
     @SuppressWarnings("unchecked")
+    @Test
+    public void shouldInitializedUserSerdes() {
+        final Deserializer mockSourceKeyDeserialzer = mock(Deserializer.class);
+        mockSourceKeyDeserialzer.configure(anyObject(), eq(true));
+        expectLastCall();
+        final Deserializer mockSourceValueDeserialzer = mock(Deserializer.class);
+        mockSourceValueDeserialzer.configure(anyObject(), eq(false));
+        expectLastCall();
+
+        final Serde mockSourceKeySerde = mock(Serde.class);
+        final Serde mockSourceValueSerde = mock(Serde.class);
+        expect(mockSourceKeySerde.deserializer()).andReturn(mockSourceKeyDeserialzer).anyTimes();
+        expect(mockSourceValueSerde.deserializer()).andReturn(mockSourceValueDeserialzer).anyTimes();
+
+
+
+        final Serializer mockThroughKeySerializer = mock(Serializer.class);
+        mockThroughKeySerializer.configure(anyObject(), eq(true));
+        expectLastCall();
+        final Serializer mockThroughValueSerializer = mock(Serializer.class);
+        mockThroughValueSerializer.configure(anyObject(), eq(false));
+        expectLastCall();
+        final Deserializer mockThroughKeyDeserializer = mock(Deserializer.class);
+        mockThroughKeyDeserializer.configure(anyObject(), eq(true));
+        expectLastCall();
+        final Deserializer mockThroughValueDeserializer = mock(Deserializer.class);
+        mockThroughValueDeserializer.configure(anyObject(), eq(false));
+        expectLastCall();
+
+        final Serde mockThroughKeySerde = mock(Serde.class);
+        final Serde mockThroughValueSerde = mock(Serde.class);
+        expect(mockThroughKeySerde.serializer()).andReturn(mockThroughKeySerializer).anyTimes();
+        expect(mockThroughValueSerde.serializer()).andReturn(mockThroughValueSerializer).anyTimes();
+        expect(mockThroughKeySerde.deserializer()).andReturn(mockThroughKeyDeserializer).anyTimes();
+        expect(mockThroughValueSerde.deserializer()).andReturn(mockThroughValueDeserializer).anyTimes();
+
+
+
+        final Serializer mockGroupedKeySerializer = mock(Serializer.class);
+        mockGroupedKeySerializer.configure(anyObject(), eq(true));
+        expectLastCall();
+        final Serializer mockGroupedValueSerializer = mock(Serializer.class);
+        mockGroupedValueSerializer.configure(anyObject(), eq(false));
+        expectLastCall();
+        final Deserializer mockGroupedKeyDeserializer = mock(Deserializer.class);
+        mockGroupedKeyDeserializer.configure(anyObject(), eq(true));
+        expectLastCall();
+        final Deserializer mockGroupedValueDeserializer = mock(Deserializer.class);
+        mockGroupedValueDeserializer.configure(anyObject(), eq(false));
+        expectLastCall();
+
+        final Serde mockGroupedKeySerde = mock(Serde.class);
+        final Serde mockGroupedValueSerde = mock(Serde.class);
+        expect(mockGroupedKeySerde.serializer()).andReturn(mockGroupedKeySerializer).anyTimes();
+        expect(mockGroupedValueSerde.serializer()).andReturn(mockGroupedValueSerializer).anyTimes();
+        expect(mockGroupedKeySerde.deserializer()).andReturn(mockGroupedKeyDeserializer).anyTimes();
+        expect(mockGroupedValueSerde.deserializer()).andReturn(mockGroupedValueDeserializer).anyTimes();
+
+
+
+        final Serializer mockOutputKeySerializer = mock(Serializer.class);
+        mockOutputKeySerializer.configure(anyObject(), eq(true));
+        expectLastCall();
+        final Serializer mockOutputValueSerializer = mock(Serializer.class);
+        mockOutputValueSerializer.configure(anyObject(), eq(false));
+        expectLastCall();
+
+        final Serde mockOutputKeySerde = mock(Serde.class);
+        final Serde mockOutputValueSerde = mock(Serde.class);
+        expect(mockOutputKeySerde.serializer()).andReturn(mockOutputKeySerializer).anyTimes();
+        expect(mockOutputValueSerde.serializer()).andReturn(mockOutputValueSerializer).anyTimes();
+
+
+
+        final Deserializer mockGlobalKeyDeserializer = mock(Deserializer.class);
+        mockGlobalKeyDeserializer.configure(anyObject(), eq(true));
+        expectLastCall();
+        final Deserializer mockGlobalValueDeserializer = mock(Deserializer.class);
+        mockGlobalValueDeserializer.configure(anyObject(), eq(false));
+        expectLastCall();
+
+        final Serde mockGlobalKeySerde = mock(Serde.class);
+        final Serde mockGlobalValueSerde = mock(Serde.class);
+        expect(mockGlobalKeySerde.deserializer()).andReturn(mockGlobalKeyDeserializer).anyTimes();
+        expect(mockGlobalValueSerde.deserializer()).andReturn(mockGlobalValueDeserializer).anyTimes();
+
+
+
+        builder
+            .stream("anyTopic", Consumed.with(mockSourceKeySerde, mockSourceValueSerde))
+            .through("anyOtherTopic", Produced.with(mockThroughKeySerde, mockThroughValueSerde))
+            .selectKey(KeyValue::pair)
+            .groupByKey(Grouped.with(mockGroupedKeySerde, mockGroupedValueSerde))
+            .count()
+            .toStream()
+            .to("anyOutput", Produced.with(mockOutputKeySerde, mockOutputValueSerde));
+        builder.globalTable("anyGlobal", Consumed.with(mockGlobalKeySerde, mockGlobalValueSerde));
+
+        replay(
+            mockSourceKeyDeserialzer, mockSourceValueDeserialzer, mockSourceKeySerde, mockSourceValueSerde,
+            mockThroughKeySerializer, mockThroughKeyDeserializer, mockThroughKeySerde,
+            mockThroughValueSerializer, mockThroughValueDeserializer, mockThroughValueSerde,
+            mockGroupedKeySerializer, mockGroupedKeyDeserializer, mockGroupedKeySerde,
+            mockGroupedValueSerializer, mockGroupedValueDeserializer, mockGroupedValueSerde,
+            mockOutputKeySerializer, mockOutputValueSerializer, mockOutputKeySerde, mockOutputValueSerde,
+            mockGlobalKeyDeserializer, mockGlobalValueDeserializer, mockGlobalKeySerde, mockGlobalValueSerde);
+
+        KafkaStreams kafkaStreams = null;
+        try {
+            kafkaStreams = new KafkaStreams(builder.build(), props);
+        } finally {
+            if (kafkaStreams != null) {
+                kafkaStreams.close();
+            }
+        }
+
+        verify(
+            mockSourceKeyDeserialzer, mockSourceValueDeserialzer,
+            mockThroughKeySerializer, mockThroughValueSerializer, mockThroughKeyDeserializer, mockThroughValueDeserializer,
+            mockGroupedKeySerializer, mockGroupedValueSerializer, mockGroupedKeyDeserializer, mockGroupedValueDeserializer,
+            mockOutputKeySerializer, mockOutputValueSerializer,
+            mockGlobalKeyDeserializer, mockGlobalValueDeserializer);
+    }
+
+    @SuppressWarnings("unchecked")
     private Topology getStatefulTopology(final String inputTopic,
                                          final String outputTopic,
                                          final String globalTopicName,
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
index 5ef7ce7..07ea6da 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.metrics.JmxReporter;
 import org.apache.kafka.common.metrics.KafkaMetric;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.MockTime;
@@ -39,8 +40,8 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 
 import java.util.Collections;
-import java.util.List;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import static org.apache.kafka.common.utils.Utils.mkEntry;
@@ -52,6 +53,7 @@ import static org.easymock.EasyMock.expect;
 import static org.easymock.EasyMock.expectLastCall;
 import static org.easymock.EasyMock.mock;
 import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.reset;
 import static org.easymock.EasyMock.verify;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -101,6 +103,53 @@ public class MeteredKeyValueStoreTest {
         metered.init(context, metered);
     }
 
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldGetSerdesFromConfigIfNoUserSerdes() {
+        metered = new MeteredKeyValueStore<>(
+            inner,
+            "scope",
+            new MockTime(),
+            null,
+            null
+        );
+        final Serde mockSerde = mock(Serde.class);
+        replay(mockSerde);
+        expect(context.keySerde()).andReturn(mockSerde);
+        expect(context.valueSerde()).andReturn(mockSerde);
+
+        init();
+        verify(context, mockSerde);
+    }
+
+    @Test
+    public void shouldConfigureUserSerdes() {
+        final Serde<String> mockKeySerde = mock(Serde.class);
+        mockKeySerde.configure(anyObject(), eq(true));
+        expectLastCall();
+
+        final Serde<String> mockValueSerde = mock(Serde.class);
+        mockValueSerde.configure(anyObject(), eq(false));
+        expectLastCall();
+
+        replay(mockKeySerde, mockValueSerde);
+
+        metered = new MeteredKeyValueStore<>(
+            inner,
+            "scope",
+            new MockTime(),
+            mockKeySerde,
+            mockValueSerde
+        );
+
+        reset(context);
+        expect(context.metrics()).andReturn(new MockStreamsMetrics(metrics)).anyTimes();
+        expect(context.taskId()).andReturn(taskId).anyTimes();
+
+        init();
+        verify(context, mockKeySerde, mockValueSerde);
+    }
+
     @Test
     public void testMetrics() {
         init();
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
index 9a74ed7..72c84be 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.metrics.JmxReporter;
 import org.apache.kafka.common.metrics.KafkaMetric;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.MockTime;
@@ -54,6 +55,7 @@ import static org.easymock.EasyMock.expect;
 import static org.easymock.EasyMock.expectLastCall;
 import static org.easymock.EasyMock.mock;
 import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.reset;
 import static org.easymock.EasyMock.verify;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -100,6 +102,53 @@ public class MeteredSessionStoreTest {
         metered.init(context, metered);
     }
 
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldGetSerdesFromConfigIfNoUserSerdes() {
+        metered = new MeteredSessionStore<>(
+            inner,
+            "scope",
+            null,
+            null,
+            new MockTime()
+        );
+        final Serde mockSerde = mock(Serde.class);
+        replay(mockSerde);
+        expect(context.keySerde()).andReturn(mockSerde);
+        expect(context.valueSerde()).andReturn(mockSerde);
+
+        init();
+        verify(context, mockSerde);
+    }
+
+    @Test
+    public void shouldConfigureUserSerdes() {
+        final Serde<String> mockKeySerde = mock(Serde.class);
+        mockKeySerde.configure(anyObject(), eq(true));
+        expectLastCall();
+
+        final Serde<String> mockValueSerde = mock(Serde.class);
+        mockValueSerde.configure(anyObject(), eq(false));
+        expectLastCall();
+
+        replay(mockKeySerde, mockValueSerde);
+
+        metered = new MeteredSessionStore<>(
+            inner,
+            "scope",
+            mockKeySerde,
+            mockValueSerde,
+            new MockTime()
+        );
+
+        reset(context);
+        expect(context.metrics()).andReturn(new MockStreamsMetrics(metrics)).anyTimes();
+        expect(context.taskId()).andReturn(taskId).anyTimes();
+
+        init();
+        verify(context, mockKeySerde, mockValueSerde);
+    }
+
     @Test
     public void testMetrics() {
         init();
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampWindowStoreTest.java
index a3522f3..5e9f274 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampWindowStoreTest.java
@@ -19,12 +19,14 @@ package org.apache.kafka.streams.state.internals;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.test.InternalMockProcessorContext;
 import org.apache.kafka.test.NoOpRecordCollector;
@@ -34,13 +36,19 @@ import org.easymock.EasyMock;
 import org.junit.Before;
 import org.junit.Test;
 
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.eq;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.easymock.EasyMock.mock;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
 import static org.junit.Assert.assertNull;
 
 public class MeteredTimestampWindowStoreTest {
     private InternalMockProcessorContext context;
     @SuppressWarnings("unchecked")
     private final WindowStore<Bytes, byte[]> innerStoreMock = EasyMock.createNiceMock(WindowStore.class);
-    private final MeteredTimestampedWindowStore<String, String> store = new MeteredTimestampedWindowStore<>(
+    private MeteredTimestampedWindowStore<String, String> store = new MeteredTimestampedWindowStore<>(
         innerStoreMock,
         10L, // any size
         "scope",
@@ -70,6 +78,73 @@ public class MeteredTimestampWindowStoreTest {
     }
 
     @Test
+    public void shouldGetSerdesFromConfigIfNoUserSerdes() {
+        store = new MeteredTimestampedWindowStore<>(
+            innerStoreMock,
+            10L, // any size
+            "scope",
+            new MockTime(),
+            null,
+            null
+        );
+
+        final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, "test");
+        final Serde mockSerde = mock(Serde.class);
+
+        new InternalMockProcessorContext(
+            TestUtils.tempDirectory(),
+            mockSerde,
+            mockSerde,
+            streamsMetrics,
+            new StreamsConfig(StreamsTestUtils.getStreamsConfig()),
+            NoOpRecordCollector::new,
+            new ThreadCache(new LogContext("testCache "), 0, streamsMetrics)
+        );
+        replay(mockSerde, innerStoreMock);
+
+        store.init(context, store);
+
+        verify(mockSerde);
+    }
+
+    @Test
+    public void shouldConfigureUserSerdes() {
+        final Serde<String> mockKeySerde = mock(Serde.class);
+        mockKeySerde.configure(anyObject(), eq(true));
+        expectLastCall();
+
+        final Serde<ValueAndTimestamp<String>> mockValueSerde = mock(Serde.class);
+        mockValueSerde.configure(anyObject(), eq(false));
+        expectLastCall();
+
+        store = new MeteredTimestampedWindowStore<>(
+            innerStoreMock,
+            10L, // any size
+            "scope",
+            new MockTime(),
+            mockKeySerde,
+            mockValueSerde
+        );
+
+        final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, "test");
+        final Serde mockSerde = mock(Serde.class);
+
+        new InternalMockProcessorContext(
+            TestUtils.tempDirectory(),
+            mockSerde,
+            mockSerde,
+            streamsMetrics,
+            new StreamsConfig(StreamsTestUtils.getStreamsConfig()),
+            NoOpRecordCollector::new,
+            new ThreadCache(new LogContext("testCache "), 0, streamsMetrics)
+        );
+        replay(mockKeySerde, mockValueSerde, innerStoreMock);
+
+        store.init(context, store);
+        verify(mockKeySerde, mockValueSerde);
+    }
+
+    @Test
     public void shouldCloseUnderlyingStore() {
         innerStoreMock.close();
         EasyMock.expectLastCall();
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
index 587d369..a17b03a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.metrics.JmxReporter;
 import org.apache.kafka.common.metrics.KafkaMetric;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.MockTime;
@@ -40,8 +41,8 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 
 import java.util.Collections;
-import java.util.List;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import static org.apache.kafka.common.utils.Utils.mkEntry;
@@ -53,6 +54,7 @@ import static org.easymock.EasyMock.expect;
 import static org.easymock.EasyMock.expectLastCall;
 import static org.easymock.EasyMock.mock;
 import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.reset;
 import static org.easymock.EasyMock.verify;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -76,7 +78,6 @@ public class MeteredTimestampedKeyValueStoreTest {
     private MeteredTimestampedKeyValueStore<String, String> metered;
     private final String key = "key";
     private final Bytes keyBytes = Bytes.wrap(key.getBytes());
-    private final String value = "value";
     private final ValueAndTimestamp<String> valueAndTimestamp = ValueAndTimestamp.make("value", 97L);
     // timestamp is 97 what is ASCII of 'a'
     private final byte[] valueAndTimestampBytes = "\0\0\0\0\0\0\0avalue".getBytes();
@@ -104,6 +105,53 @@ public class MeteredTimestampedKeyValueStoreTest {
         metered.init(context, metered);
     }
 
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldGetSerdesFromConfigIfNoUserSerdes() {
+        metered = new MeteredTimestampedKeyValueStore<>(
+            inner,
+            "scope",
+            new MockTime(),
+            null,
+            null
+        );
+        final Serde mockSerde = mock(Serde.class);
+        replay(mockSerde);
+        expect(context.keySerde()).andReturn(mockSerde);
+        expect(context.valueSerde()).andReturn(mockSerde);
+
+        init();
+        verify(context, mockSerde);
+    }
+
+    @Test
+    public void shouldConfigureUserSerdes() {
+        final Serde<String> mockKeySerde = mock(Serde.class);
+        mockKeySerde.configure(anyObject(), eq(true));
+        expectLastCall();
+
+        final Serde<ValueAndTimestamp<String>> mockValueSerde = mock(Serde.class);
+        mockValueSerde.configure(anyObject(), eq(false));
+        expectLastCall();
+
+        replay(mockKeySerde, mockValueSerde);
+
+        metered = new MeteredTimestampedKeyValueStore<>(
+            inner,
+            "scope",
+            new MockTime(),
+            mockKeySerde,
+            mockValueSerde
+        );
+
+        reset(context);
+        expect(context.metrics()).andReturn(new MockStreamsMetrics(metrics)).anyTimes();
+        expect(context.taskId()).andReturn(taskId).anyTimes();
+
+        init();
+        verify(context, mockKeySerde, mockValueSerde);
+    }
+
     @Test
     public void testMetrics() {
         init();
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
index 962888a..96279b3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
@@ -22,12 +22,12 @@ import org.apache.kafka.common.metrics.JmxReporter;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.test.InternalMockProcessorContext;
@@ -59,7 +59,7 @@ public class MeteredWindowStoreTest {
     private InternalMockProcessorContext context;
     @SuppressWarnings("unchecked")
     private final WindowStore<Bytes, byte[]> innerStoreMock = createNiceMock(WindowStore.class);
-    private final MeteredWindowStore<String, String> store = new MeteredWindowStore<>(
+    private MeteredWindowStore<String, String> store = new MeteredWindowStore<>(
         innerStoreMock,
         10L, // any size
         "scope",
@@ -89,6 +89,73 @@ public class MeteredWindowStoreTest {
     }
 
     @Test
+    public void shouldGetSerdesFromConfigIfNoUserSerdes() {
+        store = new MeteredWindowStore<>(
+            innerStoreMock,
+            10L, // any size
+            "scope",
+            new MockTime(),
+            null,
+            null
+        );
+
+        final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, "test");
+        final Serde mockSerde = mock(Serde.class);
+
+        new InternalMockProcessorContext(
+            TestUtils.tempDirectory(),
+            mockSerde,
+            mockSerde,
+            streamsMetrics,
+            new StreamsConfig(StreamsTestUtils.getStreamsConfig()),
+            NoOpRecordCollector::new,
+            new ThreadCache(new LogContext("testCache "), 0, streamsMetrics)
+        );
+        replay(mockSerde, innerStoreMock);
+
+        store.init(context, store);
+
+        verify(mockSerde);
+    }
+
+    @Test
+    public void shouldConfigureUserSerdes() {
+        final Serde<String> mockKeySerde = mock(Serde.class);
+        mockKeySerde.configure(anyObject(), eq(true));
+        expectLastCall();
+
+        final Serde<String> mockValueSerde = mock(Serde.class);
+        mockValueSerde.configure(anyObject(), eq(false));
+        expectLastCall();
+
+        store = new MeteredWindowStore<>(
+            innerStoreMock,
+            10L, // any size
+            "scope",
+            new MockTime(),
+            mockKeySerde,
+            mockValueSerde
+        );
+
+        final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, "test");
+        final Serde mockSerde = mock(Serde.class);
+
+        new InternalMockProcessorContext(
+            TestUtils.tempDirectory(),
+            mockSerde,
+            mockSerde,
+            streamsMetrics,
+            new StreamsConfig(StreamsTestUtils.getStreamsConfig()),
+            NoOpRecordCollector::new,
+            new ThreadCache(new LogContext("testCache "), 0, streamsMetrics)
+        );
+        replay(mockKeySerde, mockValueSerde, innerStoreMock);
+
+        store.init(context, store);
+        verify(mockKeySerde, mockValueSerde);
+    }
+
+    @Test
     public void testMetrics() {
         replay(innerStoreMock);
         store.init(context, store);
@@ -128,7 +195,7 @@ public class MeteredWindowStoreTest {
 
     @Test
     public void shouldRecordFetchLatency() {
-        expect(innerStoreMock.fetch(Bytes.wrap("a".getBytes()), 1, 1)).andReturn(KeyValueIterators.<byte[]>emptyWindowStoreIterator());
+        expect(innerStoreMock.fetch(Bytes.wrap("a".getBytes()), 1, 1)).andReturn(KeyValueIterators.emptyWindowStoreIterator());
         replay(innerStoreMock);
 
         store.init(context, store);
@@ -141,7 +208,7 @@ public class MeteredWindowStoreTest {
 
     @Test
     public void shouldRecordFetchRangeLatency() {
-        expect(innerStoreMock.fetch(Bytes.wrap("a".getBytes()), Bytes.wrap("b".getBytes()), 1, 1)).andReturn(KeyValueIterators.<Windowed<Bytes>, byte[]>emptyIterator());
+        expect(innerStoreMock.fetch(Bytes.wrap("a".getBytes()), Bytes.wrap("b".getBytes()), 1, 1)).andReturn(KeyValueIterators.emptyIterator());
         replay(innerStoreMock);
 
         store.init(context, store);