You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2019/05/01 07:34:03 UTC

[kafka] branch trunk updated: KAFKA-3729: Revert adding Serde auto-config (#6630)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 56c6480  KAFKA-3729: Revert adding Serde auto-config (#6630)
56c6480 is described below

commit 56c64803fa190814232c5f71e9a758346c474bb4
Author: Matthias J. Sax <ma...@confluent.io>
AuthorDate: Wed May 1 09:33:53 2019 +0200

    KAFKA-3729: Revert adding Serde auto-config (#6630)
    
    * Revert "MINOR: Add unit test for SerDe auto-configuration (#6610)"
    
    This reverts commit 172fbb2dd55144e2e44777174f970b56768e1777.
    
    * Revert "[KAFKA-3729] Auto-configure non-default SerDes passed alongside the topology builder (#6461)"
    
    This reverts commit e56ebbffca57741d398283e46073ed4170f8f927.
    
    The two merged PRs introduce a breaking change. Reverting to preserve backward compatibility. Jira ticket reopened.
    
    Reviewers: Ted Yu <yu...@gmail.com>, Guozhang Wang <gu...@confluent.io>
---
 docs/streams/developer-guide/datatypes.html        |   2 +-
 docs/streams/developer-guide/dsl-api.html          |   3 +-
 .../org/apache/kafka/streams/KafkaStreams.java     |  26 ----
 .../streams/processor/internals/SinkNode.java      |   8 --
 .../streams/processor/internals/SourceNode.java    |   8 --
 .../state/internals/MeteredKeyValueStore.java      |  19 +--
 .../state/internals/MeteredSessionStore.java       |  19 +--
 .../internals/MeteredTimestampedKeyValueStore.java |  21 +---
 .../internals/MeteredTimestampedWindowStore.java   |  21 +---
 .../state/internals/MeteredWindowStore.java        |  19 +--
 .../org/apache/kafka/streams/KafkaStreamsTest.java | 137 ---------------------
 .../state/internals/MeteredKeyValueStoreTest.java  |  51 --------
 .../state/internals/MeteredSessionStoreTest.java   |  51 --------
 .../internals/MeteredTimestampWindowStoreTest.java |  77 +-----------
 .../MeteredTimestampedKeyValueStoreTest.java       |  52 +-------
 .../state/internals/MeteredWindowStoreTest.java    |  75 +----------
 16 files changed, 23 insertions(+), 566 deletions(-)

diff --git a/docs/streams/developer-guide/datatypes.html b/docs/streams/developer-guide/datatypes.html
index d78202a..83159e8 100644
--- a/docs/streams/developer-guide/datatypes.html
+++ b/docs/streams/developer-guide/datatypes.html
@@ -68,7 +68,7 @@
     </div>
     <div class="section" id="overriding-default-serdes">
       <h2>Overriding default SerDes<a class="headerlink" href="#overriding-default-serdes" title="Permalink to this headline"></a></h2>
-      <p>You can also specify SerDes explicitly by passing them to the appropriate API methods, which overrides the default SerDe settings. For this case, Kafka Streams will auto-configure the passed-in SerDes objects, i.e., you don't need to call <code>configure()</code> manually.</p>
+      <p>You can also specify SerDes explicitly by passing them to the appropriate API methods, which overrides the default serde settings:</p>
       <div class="highlight-java"><div class="highlight"><pre><span></span><span class="kn">import</span> <span class="nn">org.apache.kafka.common.serialization.Serde</span><span class="o">;</span>
 <span class="kn">import</span> <span class="nn">org.apache.kafka.common.serialization.Serdes</span><span class="o">;</span>
 
diff --git a/docs/streams/developer-guide/dsl-api.html b/docs/streams/developer-guide/dsl-api.html
index 784dcca..f5c3df9 100644
--- a/docs/streams/developer-guide/dsl-api.html
+++ b/docs/streams/developer-guide/dsl-api.html
@@ -3547,8 +3547,7 @@ val clicksPerRegion: KTable[String, Long] =
 // Write the (continuously updating) results to the output topic.
 clicksPerRegion.toStream.to(outputTopic)
               </pre>
-              <p>A complete example of user-defined SerDes can be found in a test class within the library.
-   Kafka Streams will auto-configure the passed-in SerDes objects, i.e., you don't need to call <code>configure()</code> manually.</p>
+              <p>A complete example of user-defined SerDes can be found in a test class within the library.</p>
             </div>
         </div>
 </div>
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index d8bb0c1..315a6bb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -49,8 +49,6 @@ import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
 import org.apache.kafka.streams.processor.internals.GlobalStreamThread;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 import org.apache.kafka.streams.processor.internals.ProcessorTopology;
-import org.apache.kafka.streams.processor.internals.SinkNode;
-import org.apache.kafka.streams.processor.internals.SourceNode;
 import org.apache.kafka.streams.processor.internals.StateDirectory;
 import org.apache.kafka.streams.processor.internals.StreamThread;
 import org.apache.kafka.streams.processor.internals.StreamsMetadataState;
@@ -636,26 +634,6 @@ public class KafkaStreams implements AutoCloseable {
         this(internalTopologyBuilder, config, clientSupplier, Time.SYSTEM);
     }
 
-    @SuppressWarnings("unchecked")
-    private void configureSerDes(final Set<SinkNode> sinks, final Set<SourceNode> sources) {
-        for (final SinkNode sn : sinks) {
-            if (sn.getKeySerializer() != null) {
-                sn.getKeySerializer().configure(config.originals(), true);
-            }
-            if (sn.getValueSerializer() != null) {
-                sn.getValueSerializer().configure(config.originals(), false);
-            }
-        }
-        for (final SourceNode sn : sources) {
-            if (sn.getKeyDeserializer() != null) {
-                sn.getKeyDeserializer().configure(config.originals(), true);
-            }
-            if (sn.getValueDeserializer() != null) {
-                sn.getValueDeserializer().configure(config.originals(), false);
-            }
-        }
-    }
-
     private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
                          final StreamsConfig config,
                          final KafkaClientSupplier clientSupplier,
@@ -692,7 +670,6 @@ public class KafkaStreams implements AutoCloseable {
         // sanity check to fail-fast in case we cannot build a ProcessorTopology due to an exception
         final ProcessorTopology taskTopology = internalTopologyBuilder.build();
 
-        configureSerDes(taskTopology.sinks(), taskTopology.sources());
         streamsMetadataState = new StreamsMetadataState(
                 internalTopologyBuilder,
                 parseHostInfo(config.getString(StreamsConfig.APPLICATION_SERVER_CONFIG)));
@@ -706,7 +683,6 @@ public class KafkaStreams implements AutoCloseable {
             log.warn("Negative cache size passed in. Reverting to cache size of 0 bytes.");
         }
         final ProcessorTopology globalTaskTopology = internalTopologyBuilder.buildGlobalStateTopology();
-
         final long cacheSizePerThread = totalCacheSize / (threads.length + (globalTaskTopology == null ? 0 : 1));
         final boolean createStateDirectory = taskTopology.hasPersistentLocalStore() ||
                 (globalTaskTopology != null && globalTaskTopology.hasPersistentGlobalStore());
@@ -720,8 +696,6 @@ public class KafkaStreams implements AutoCloseable {
         final StateRestoreListener delegatingStateRestoreListener = new DelegatingStateRestoreListener();
         GlobalStreamThread.State globalThreadState = null;
         if (globalTaskTopology != null) {
-            configureSerDes(globalTaskTopology.sinks(), globalTaskTopology.sources());
-
             final String globalThreadId = clientId + "-GlobalStreamThread";
             globalStreamThread = new GlobalStreamThread(globalTaskTopology,
                                                         config,
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
index 03e16c3..73bffc8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
@@ -44,14 +44,6 @@ public class SinkNode<K, V> extends ProcessorNode<K, V> {
         this.partitioner = partitioner;
     }
 
-    public Serializer getKeySerializer() {
-        return keySerializer;
-    }
-
-    public Serializer getValueSerializer() {
-        return valSerializer;
-    }
-
     /**
      * @throws UnsupportedOperationException if this method adds a child to a sink node
      */
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
index f80749b..87505ca 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
@@ -52,14 +52,6 @@ public class SourceNode<K, V> extends ProcessorNode<K, V> {
         this(name, topics, null, keyDeserializer, valDeserializer);
     }
 
-    public Deserializer getKeyDeserializer() {
-        return keyDeserializer;
-    }
-
-    public Deserializer getValueDeserializer() {
-        return valDeserializer;
-    }
-
     K deserializeKey(final String topic, final Headers headers, final byte[] data) {
         return keyDeserializer.deserialize(topic, headers, data);
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
index 277efd2..51da3ed 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
@@ -115,23 +115,10 @@ public class MeteredKeyValueStore<K, V>
 
     @SuppressWarnings("unchecked")
     void initStoreSerde(final ProcessorContext context) {
-        final Serde<K> usedKeySerde;
-        final Serde<V> usedValueSerde;
-        final Map<String, Object> conf = context.appConfigs();
-        if (keySerde == null) {
-            usedKeySerde = (Serde<K>) context.keySerde();
-        } else {
-            usedKeySerde = keySerde;
-            usedKeySerde.configure(conf, true);
-        }
-        if (valueSerde == null) {
-            usedValueSerde = (Serde<V>) context.valueSerde();
-        } else {
-            usedValueSerde = valueSerde;
-            usedValueSerde.configure(conf, false);
-        }
         serdes = new StateSerdes<>(
-            ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()), usedKeySerde, usedValueSerde);
+            ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()),
+            keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
+            valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
     }
 
     @SuppressWarnings("unchecked")
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
index 1a55490..94b004e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
@@ -69,23 +69,10 @@ public class MeteredSessionStore<K, V>
     public void init(final ProcessorContext context,
                      final StateStore root) {
         //noinspection unchecked
-        final Serde<K> usedKeySerde;
-        final Serde<V> usedValueSerde;
-        final Map<String, Object> conf = context.appConfigs();
-        if (keySerde == null) {
-            usedKeySerde = (Serde<K>) context.keySerde();
-        } else {
-            usedKeySerde = keySerde;
-            usedKeySerde.configure(conf, true);
-        }
-        if (valueSerde == null) {
-            usedValueSerde = (Serde<V>) context.valueSerde();
-        } else {
-            usedValueSerde = valueSerde;
-            usedValueSerde.configure(conf, false);
-        }
         serdes = new StateSerdes<>(
-            ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()), usedKeySerde, usedValueSerde);
+            ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()),
+            keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
+            valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
         metrics = (StreamsMetricsImpl) context.metrics();
 
         taskName = context.taskId().toString();
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java
index 9a239e1..2fa7c96 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java
@@ -26,8 +26,6 @@ import org.apache.kafka.streams.state.StateSerdes;
 import org.apache.kafka.streams.state.TimestampedKeyValueStore;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 
-import java.util.Map;
-
 /**
  * A Metered {@link TimestampedKeyValueStore} wrapper that is used for recording operation metrics, and hence its
  * inner KeyValueStore implementation do not need to provide its own metrics collecting functionality.
@@ -50,22 +48,9 @@ public class MeteredTimestampedKeyValueStore<K, V>
 
     @SuppressWarnings("unchecked")
     void initStoreSerde(final ProcessorContext context) {
-        final Serde<K> usedKeySerde;
-        final Serde<ValueAndTimestamp<V>> usedValueSerde;
-        final Map<String, Object> conf = context.appConfigs();
-        if (keySerde == null) {
-            usedKeySerde = (Serde<K>) context.keySerde();
-        } else {
-            usedKeySerde = keySerde;
-            usedKeySerde.configure(conf, true);
-        }
-        if (valueSerde == null) {
-            usedValueSerde = (Serde<ValueAndTimestamp<V>>) context.valueSerde();
-        } else {
-            usedValueSerde = valueSerde;
-            usedValueSerde.configure(conf, false);
-        }
         serdes = new StateSerdes<>(
-            ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()), usedKeySerde, usedValueSerde);
+            ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()),
+            keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
+            valueSerde == null ? new ValueAndTimestampSerde<>((Serde<V>) context.keySerde()) : valueSerde);
     }
 }
\ No newline at end of file
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStore.java
index 7bc5486..1c10491 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStore.java
@@ -26,8 +26,6 @@ import org.apache.kafka.streams.state.TimestampedWindowStore;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.apache.kafka.streams.state.WindowStore;
 
-import java.util.Map;
-
 /**
  * A Metered {@link MeteredTimestampedWindowStore} wrapper that is used for recording operation metrics, and hence its
  * inner WindowStore implementation do not need to provide its own metrics collecting functionality.
@@ -52,22 +50,9 @@ class MeteredTimestampedWindowStore<K, V>
     @SuppressWarnings("unchecked")
     @Override
     void initStoreSerde(final ProcessorContext context) {
-        final Serde<K> usedKeySerde;
-        final Serde<ValueAndTimestamp<V>> usedValueSerde;
-        final Map<String, Object> conf = context.appConfigs();
-        if (keySerde == null) {
-            usedKeySerde = (Serde<K>) context.keySerde();
-        } else {
-            usedKeySerde = keySerde;
-            usedKeySerde.configure(conf, true);
-        }
-        if (valueSerde == null) {
-            usedValueSerde = (Serde<ValueAndTimestamp<V>>) context.valueSerde();
-        } else {
-            usedValueSerde = valueSerde;
-            usedValueSerde.configure(conf, false);
-        }
         serdes = new StateSerdes<>(
-            ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()), usedKeySerde, usedValueSerde);
+            ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()),
+            keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
+            valueSerde == null ? new ValueAndTimestampSerde<>((Serde<V>) context.keySerde()) : valueSerde);
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
index 74de63e..6d2eaab 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
@@ -99,23 +99,10 @@ public class MeteredWindowStore<K, V>
 
     @SuppressWarnings("unchecked")
     void initStoreSerde(final ProcessorContext context) {
-        final Serde<K> usedKeySerde;
-        final Serde<V> usedValueSerde;
-        final Map<String, Object> conf = context.appConfigs();
-        if (keySerde == null) {
-            usedKeySerde = (Serde<K>) context.keySerde();
-        } else {
-            usedKeySerde = keySerde;
-            usedKeySerde.configure(conf, true);
-        }
-        if (valueSerde == null) {
-            usedValueSerde = (Serde<V>) context.valueSerde();
-        } else {
-            usedValueSerde = valueSerde;
-            usedValueSerde.configure(conf, false);
-        }
         serdes = new StateSerdes<>(
-            ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()), usedKeySerde, usedValueSerde);
+            ProcessorStateManager.storeChangelogTopic(context.applicationId(), name()),
+            keySerde == null ? (Serde<K>) context.keySerde() : keySerde,
+            valueSerde == null ? (Serde<V>) context.valueSerde() : valueSerde);
     }
 
     @SuppressWarnings("unchecked")
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index 9ce7c84..3e55f29 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -25,10 +25,7 @@ import org.apache.kafka.common.Node;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.network.Selectable;
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Utils;
@@ -36,9 +33,7 @@ import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.Consumed;
-import org.apache.kafka.streams.kstream.Grouped;
 import org.apache.kafka.streams.kstream.Materialized;
-import org.apache.kafka.streams.kstream.Produced;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.apache.kafka.streams.processor.ThreadMetadata;
 import org.apache.kafka.streams.processor.internals.GlobalStreamThread;
@@ -79,13 +74,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 
 import static java.util.Arrays.asList;
-import static org.easymock.EasyMock.anyObject;
-import static org.easymock.EasyMock.eq;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.expectLastCall;
-import static org.easymock.EasyMock.mock;
-import static org.easymock.EasyMock.replay;
-import static org.easymock.EasyMock.verify;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -738,131 +726,6 @@ public class KafkaStreamsTest {
     }
 
     @SuppressWarnings("unchecked")
-    @Test
-    public void shouldInitializeUserSerdes() {
-        final Deserializer mockSourceKeyDeserialzer = mock(Deserializer.class);
-        mockSourceKeyDeserialzer.configure(anyObject(), eq(true));
-        expectLastCall();
-        final Deserializer mockSourceValueDeserialzer = mock(Deserializer.class);
-        mockSourceValueDeserialzer.configure(anyObject(), eq(false));
-        expectLastCall();
-
-        final Serde mockSourceKeySerde = mock(Serde.class);
-        final Serde mockSourceValueSerde = mock(Serde.class);
-        expect(mockSourceKeySerde.deserializer()).andReturn(mockSourceKeyDeserialzer).anyTimes();
-        expect(mockSourceValueSerde.deserializer()).andReturn(mockSourceValueDeserialzer).anyTimes();
-
-
-
-        final Serializer mockThroughKeySerializer = mock(Serializer.class);
-        mockThroughKeySerializer.configure(anyObject(), eq(true));
-        expectLastCall();
-        final Serializer mockThroughValueSerializer = mock(Serializer.class);
-        mockThroughValueSerializer.configure(anyObject(), eq(false));
-        expectLastCall();
-        final Deserializer mockThroughKeyDeserializer = mock(Deserializer.class);
-        mockThroughKeyDeserializer.configure(anyObject(), eq(true));
-        expectLastCall();
-        final Deserializer mockThroughValueDeserializer = mock(Deserializer.class);
-        mockThroughValueDeserializer.configure(anyObject(), eq(false));
-        expectLastCall();
-
-        final Serde mockThroughKeySerde = mock(Serde.class);
-        final Serde mockThroughValueSerde = mock(Serde.class);
-        expect(mockThroughKeySerde.serializer()).andReturn(mockThroughKeySerializer).anyTimes();
-        expect(mockThroughValueSerde.serializer()).andReturn(mockThroughValueSerializer).anyTimes();
-        expect(mockThroughKeySerde.deserializer()).andReturn(mockThroughKeyDeserializer).anyTimes();
-        expect(mockThroughValueSerde.deserializer()).andReturn(mockThroughValueDeserializer).anyTimes();
-
-
-
-        final Serializer mockGroupedKeySerializer = mock(Serializer.class);
-        mockGroupedKeySerializer.configure(anyObject(), eq(true));
-        expectLastCall();
-        final Serializer mockGroupedValueSerializer = mock(Serializer.class);
-        mockGroupedValueSerializer.configure(anyObject(), eq(false));
-        expectLastCall();
-        final Deserializer mockGroupedKeyDeserializer = mock(Deserializer.class);
-        mockGroupedKeyDeserializer.configure(anyObject(), eq(true));
-        expectLastCall();
-        final Deserializer mockGroupedValueDeserializer = mock(Deserializer.class);
-        mockGroupedValueDeserializer.configure(anyObject(), eq(false));
-        expectLastCall();
-
-        final Serde mockGroupedKeySerde = mock(Serde.class);
-        final Serde mockGroupedValueSerde = mock(Serde.class);
-        expect(mockGroupedKeySerde.serializer()).andReturn(mockGroupedKeySerializer).anyTimes();
-        expect(mockGroupedValueSerde.serializer()).andReturn(mockGroupedValueSerializer).anyTimes();
-        expect(mockGroupedKeySerde.deserializer()).andReturn(mockGroupedKeyDeserializer).anyTimes();
-        expect(mockGroupedValueSerde.deserializer()).andReturn(mockGroupedValueDeserializer).anyTimes();
-
-
-
-        final Serializer mockOutputKeySerializer = mock(Serializer.class);
-        mockOutputKeySerializer.configure(anyObject(), eq(true));
-        expectLastCall();
-        final Serializer mockOutputValueSerializer = mock(Serializer.class);
-        mockOutputValueSerializer.configure(anyObject(), eq(false));
-        expectLastCall();
-
-        final Serde mockOutputKeySerde = mock(Serde.class);
-        final Serde mockOutputValueSerde = mock(Serde.class);
-        expect(mockOutputKeySerde.serializer()).andReturn(mockOutputKeySerializer).anyTimes();
-        expect(mockOutputValueSerde.serializer()).andReturn(mockOutputValueSerializer).anyTimes();
-
-
-
-        final Deserializer mockGlobalKeyDeserializer = mock(Deserializer.class);
-        mockGlobalKeyDeserializer.configure(anyObject(), eq(true));
-        expectLastCall();
-        final Deserializer mockGlobalValueDeserializer = mock(Deserializer.class);
-        mockGlobalValueDeserializer.configure(anyObject(), eq(false));
-        expectLastCall();
-
-        final Serde mockGlobalKeySerde = mock(Serde.class);
-        final Serde mockGlobalValueSerde = mock(Serde.class);
-        expect(mockGlobalKeySerde.deserializer()).andReturn(mockGlobalKeyDeserializer).anyTimes();
-        expect(mockGlobalValueSerde.deserializer()).andReturn(mockGlobalValueDeserializer).anyTimes();
-
-
-
-        builder
-            .stream("anyTopic", Consumed.with(mockSourceKeySerde, mockSourceValueSerde))
-            .through("anyOtherTopic", Produced.with(mockThroughKeySerde, mockThroughValueSerde))
-            .selectKey(KeyValue::pair)
-            .groupByKey(Grouped.with(mockGroupedKeySerde, mockGroupedValueSerde))
-            .count()
-            .toStream()
-            .to("anyOutput", Produced.with(mockOutputKeySerde, mockOutputValueSerde));
-        builder.globalTable("anyGlobal", Consumed.with(mockGlobalKeySerde, mockGlobalValueSerde));
-
-        replay(
-            mockSourceKeyDeserialzer, mockSourceValueDeserialzer, mockSourceKeySerde, mockSourceValueSerde,
-            mockThroughKeySerializer, mockThroughKeyDeserializer, mockThroughKeySerde,
-            mockThroughValueSerializer, mockThroughValueDeserializer, mockThroughValueSerde,
-            mockGroupedKeySerializer, mockGroupedKeyDeserializer, mockGroupedKeySerde,
-            mockGroupedValueSerializer, mockGroupedValueDeserializer, mockGroupedValueSerde,
-            mockOutputKeySerializer, mockOutputValueSerializer, mockOutputKeySerde, mockOutputValueSerde,
-            mockGlobalKeyDeserializer, mockGlobalValueDeserializer, mockGlobalKeySerde, mockGlobalValueSerde);
-
-        KafkaStreams kafkaStreams = null;
-        try {
-            kafkaStreams = new KafkaStreams(builder.build(), props);
-        } finally {
-            if (kafkaStreams != null) {
-                kafkaStreams.close();
-            }
-        }
-
-        verify(
-            mockSourceKeyDeserialzer, mockSourceValueDeserialzer,
-            mockThroughKeySerializer, mockThroughValueSerializer, mockThroughKeyDeserializer, mockThroughValueDeserializer,
-            mockGroupedKeySerializer, mockGroupedValueSerializer, mockGroupedKeyDeserializer, mockGroupedValueDeserializer,
-            mockOutputKeySerializer, mockOutputValueSerializer,
-            mockGlobalKeyDeserializer, mockGlobalValueDeserializer);
-    }
-
-    @SuppressWarnings("unchecked")
     private Topology getStatefulTopology(final String inputTopic,
                                          final String outputTopic,
                                          final String globalTopicName,
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
index bab9d6b..5cbe95c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
@@ -21,7 +21,6 @@ import org.apache.kafka.common.metrics.JmxReporter;
 import org.apache.kafka.common.metrics.KafkaMetric;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
-import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.MockTime;
@@ -40,7 +39,6 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -53,7 +51,6 @@ import static org.easymock.EasyMock.expect;
 import static org.easymock.EasyMock.expectLastCall;
 import static org.easymock.EasyMock.mock;
 import static org.easymock.EasyMock.replay;
-import static org.easymock.EasyMock.reset;
 import static org.easymock.EasyMock.verify;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -95,7 +92,6 @@ public class MeteredKeyValueStoreTest {
         metrics.config().recordLevel(Sensor.RecordingLevel.DEBUG);
         expect(context.metrics()).andReturn(new MockStreamsMetrics(metrics));
         expect(context.taskId()).andReturn(taskId);
-        expect(context.appConfigs()).andReturn(new HashMap<>());
         expect(inner.name()).andReturn("metered").anyTimes();
     }
 
@@ -104,53 +100,6 @@ public class MeteredKeyValueStoreTest {
         metered.init(context, metered);
     }
 
-    @SuppressWarnings("unchecked")
-    @Test
-    public void shouldGetSerdesFromConfigWithoutUserSerdes() {
-        metered = new MeteredKeyValueStore<>(
-            inner,
-            "scope",
-            new MockTime(),
-            null,
-            null
-        );
-        final Serde mockSerde = mock(Serde.class);
-        replay(mockSerde);
-        expect(context.keySerde()).andReturn(mockSerde);
-        expect(context.valueSerde()).andReturn(mockSerde);
-
-        init();
-        verify(context, mockSerde);
-    }
-
-    @Test
-    public void shouldConfigureUserSerdes() {
-        final Serde<String> mockKeySerde = mock(Serde.class);
-        mockKeySerde.configure(anyObject(), eq(true));
-        expectLastCall();
-
-        final Serde<String> mockValueSerde = mock(Serde.class);
-        mockValueSerde.configure(anyObject(), eq(false));
-        expectLastCall();
-
-        replay(mockKeySerde, mockValueSerde);
-
-        metered = new MeteredKeyValueStore<>(
-            inner,
-            "scope",
-            new MockTime(),
-            mockKeySerde,
-            mockValueSerde
-        );
-
-        reset(context);
-        expect(context.metrics()).andReturn(new MockStreamsMetrics(metrics)).anyTimes();
-        expect(context.taskId()).andReturn(taskId).anyTimes();
-
-        init();
-        verify(context, mockKeySerde, mockValueSerde);
-    }
-
     @Test
     public void testMetrics() {
         init();
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
index 09c3371..30c382b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
@@ -22,7 +22,6 @@ import org.apache.kafka.common.metrics.JmxReporter;
 import org.apache.kafka.common.metrics.KafkaMetric;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
-import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.MockTime;
@@ -43,7 +42,6 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.Map;
 
 import static org.apache.kafka.common.utils.Utils.mkEntry;
@@ -55,7 +53,6 @@ import static org.easymock.EasyMock.expect;
 import static org.easymock.EasyMock.expectLastCall;
 import static org.easymock.EasyMock.mock;
 import static org.easymock.EasyMock.replay;
-import static org.easymock.EasyMock.reset;
 import static org.easymock.EasyMock.verify;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -94,7 +91,6 @@ public class MeteredSessionStoreTest {
         metrics.config().recordLevel(Sensor.RecordingLevel.DEBUG);
         expect(context.metrics()).andReturn(new MockStreamsMetrics(metrics));
         expect(context.taskId()).andReturn(taskId);
-        expect(context.appConfigs()).andReturn(new HashMap<>());
         expect(inner.name()).andReturn("metered").anyTimes();
     }
 
@@ -103,53 +99,6 @@ public class MeteredSessionStoreTest {
         metered.init(context, metered);
     }
 
-    @SuppressWarnings("unchecked")
-    @Test
-    public void shouldGetSerdesFromConfigWithoutUserSerdes() {
-        metered = new MeteredSessionStore<>(
-            inner,
-            "scope",
-            null,
-            null,
-            new MockTime()
-        );
-        final Serde mockSerde = mock(Serde.class);
-        replay(mockSerde);
-        expect(context.keySerde()).andReturn(mockSerde);
-        expect(context.valueSerde()).andReturn(mockSerde);
-
-        init();
-        verify(context, mockSerde);
-    }
-
-    @Test
-    public void shouldConfigureUserSerdes() {
-        final Serde<String> mockKeySerde = mock(Serde.class);
-        mockKeySerde.configure(anyObject(), eq(true));
-        expectLastCall();
-
-        final Serde<String> mockValueSerde = mock(Serde.class);
-        mockValueSerde.configure(anyObject(), eq(false));
-        expectLastCall();
-
-        replay(mockKeySerde, mockValueSerde);
-
-        metered = new MeteredSessionStore<>(
-            inner,
-            "scope",
-            mockKeySerde,
-            mockValueSerde,
-            new MockTime()
-        );
-
-        reset(context);
-        expect(context.metrics()).andReturn(new MockStreamsMetrics(metrics)).anyTimes();
-        expect(context.taskId()).andReturn(taskId).anyTimes();
-
-        init();
-        verify(context, mockKeySerde, mockValueSerde);
-    }
-
     @Test
     public void testMetrics() {
         init();
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampWindowStoreTest.java
index 7cb9148..a3522f3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampWindowStoreTest.java
@@ -19,14 +19,12 @@ package org.apache.kafka.streams.state.internals;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
-import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
-import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.test.InternalMockProcessorContext;
 import org.apache.kafka.test.NoOpRecordCollector;
@@ -36,19 +34,13 @@ import org.easymock.EasyMock;
 import org.junit.Before;
 import org.junit.Test;
 
-import static org.easymock.EasyMock.anyObject;
-import static org.easymock.EasyMock.eq;
-import static org.easymock.EasyMock.expectLastCall;
-import static org.easymock.EasyMock.mock;
-import static org.easymock.EasyMock.replay;
-import static org.easymock.EasyMock.verify;
 import static org.junit.Assert.assertNull;
 
 public class MeteredTimestampWindowStoreTest {
     private InternalMockProcessorContext context;
     @SuppressWarnings("unchecked")
     private final WindowStore<Bytes, byte[]> innerStoreMock = EasyMock.createNiceMock(WindowStore.class);
-    private MeteredTimestampedWindowStore<String, String> store = new MeteredTimestampedWindowStore<>(
+    private final MeteredTimestampedWindowStore<String, String> store = new MeteredTimestampedWindowStore<>(
         innerStoreMock,
         10L, // any size
         "scope",
@@ -78,73 +70,6 @@ public class MeteredTimestampWindowStoreTest {
     }
 
     @Test
-    public void shouldGetSerdesFromConfigWithoutUserSerdes() {
-        store = new MeteredTimestampedWindowStore<>(
-            innerStoreMock,
-            10L, // any size
-            "scope",
-            new MockTime(),
-            null,
-            null
-        );
-
-        final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, "test");
-        final Serde mockSerde = mock(Serde.class);
-
-        new InternalMockProcessorContext(
-            TestUtils.tempDirectory(),
-            mockSerde,
-            mockSerde,
-            streamsMetrics,
-            new StreamsConfig(StreamsTestUtils.getStreamsConfig()),
-            NoOpRecordCollector::new,
-            new ThreadCache(new LogContext("testCache "), 0, streamsMetrics)
-        );
-        replay(mockSerde, innerStoreMock);
-
-        store.init(context, store);
-
-        verify(mockSerde);
-    }
-
-    @Test
-    public void shouldConfigureUserSerdes() {
-        final Serde<String> mockKeySerde = mock(Serde.class);
-        mockKeySerde.configure(anyObject(), eq(true));
-        expectLastCall();
-
-        final Serde<ValueAndTimestamp<String>> mockValueSerde = mock(Serde.class);
-        mockValueSerde.configure(anyObject(), eq(false));
-        expectLastCall();
-
-        store = new MeteredTimestampedWindowStore<>(
-            innerStoreMock,
-            10L, // any size
-            "scope",
-            new MockTime(),
-            mockKeySerde,
-            mockValueSerde
-        );
-
-        final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, "test");
-        final Serde mockSerde = mock(Serde.class);
-
-        new InternalMockProcessorContext(
-            TestUtils.tempDirectory(),
-            mockSerde,
-            mockSerde,
-            streamsMetrics,
-            new StreamsConfig(StreamsTestUtils.getStreamsConfig()),
-            NoOpRecordCollector::new,
-            new ThreadCache(new LogContext("testCache "), 0, streamsMetrics)
-        );
-        replay(mockKeySerde, mockValueSerde, innerStoreMock);
-
-        store.init(context, store);
-        verify(mockKeySerde, mockValueSerde);
-    }
-
-    @Test
     public void shouldCloseUnderlyingStore() {
         innerStoreMock.close();
         EasyMock.expectLastCall();
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
index 858b399..0f60d24 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
@@ -21,7 +21,6 @@ import org.apache.kafka.common.metrics.JmxReporter;
 import org.apache.kafka.common.metrics.KafkaMetric;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
-import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.MockTime;
@@ -41,7 +40,6 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -54,7 +52,6 @@ import static org.easymock.EasyMock.expect;
 import static org.easymock.EasyMock.expectLastCall;
 import static org.easymock.EasyMock.mock;
 import static org.easymock.EasyMock.replay;
-import static org.easymock.EasyMock.reset;
 import static org.easymock.EasyMock.verify;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -78,6 +75,7 @@ public class MeteredTimestampedKeyValueStoreTest {
     private MeteredTimestampedKeyValueStore<String, String> metered;
     private final String key = "key";
     private final Bytes keyBytes = Bytes.wrap(key.getBytes());
+    private final String value = "value";
     private final ValueAndTimestamp<String> valueAndTimestamp = ValueAndTimestamp.make("value", 97L);
     // timestamp is 97 what is ASCII of 'a'
     private final byte[] valueAndTimestampBytes = "\0\0\0\0\0\0\0avalue".getBytes();
@@ -96,7 +94,6 @@ public class MeteredTimestampedKeyValueStoreTest {
         metrics.config().recordLevel(Sensor.RecordingLevel.DEBUG);
         expect(context.metrics()).andReturn(new MockStreamsMetrics(metrics));
         expect(context.taskId()).andReturn(taskId);
-        expect(context.appConfigs()).andReturn(new HashMap<>());
         expect(inner.name()).andReturn("metered").anyTimes();
     }
 
@@ -105,53 +102,6 @@ public class MeteredTimestampedKeyValueStoreTest {
         metered.init(context, metered);
     }
 
-    @SuppressWarnings("unchecked")
-    @Test
-    public void shouldGetSerdesFromConfigWithoutUserSerdes() {
-        metered = new MeteredTimestampedKeyValueStore<>(
-            inner,
-            "scope",
-            new MockTime(),
-            null,
-            null
-        );
-        final Serde mockSerde = mock(Serde.class);
-        replay(mockSerde);
-        expect(context.keySerde()).andReturn(mockSerde);
-        expect(context.valueSerde()).andReturn(mockSerde);
-
-        init();
-        verify(context, mockSerde);
-    }
-
-    @Test
-    public void shouldConfigureUserSerdes() {
-        final Serde<String> mockKeySerde = mock(Serde.class);
-        mockKeySerde.configure(anyObject(), eq(true));
-        expectLastCall();
-
-        final Serde<ValueAndTimestamp<String>> mockValueSerde = mock(Serde.class);
-        mockValueSerde.configure(anyObject(), eq(false));
-        expectLastCall();
-
-        replay(mockKeySerde, mockValueSerde);
-
-        metered = new MeteredTimestampedKeyValueStore<>(
-            inner,
-            "scope",
-            new MockTime(),
-            mockKeySerde,
-            mockValueSerde
-        );
-
-        reset(context);
-        expect(context.metrics()).andReturn(new MockStreamsMetrics(metrics)).anyTimes();
-        expect(context.taskId()).andReturn(taskId).anyTimes();
-
-        init();
-        verify(context, mockKeySerde, mockValueSerde);
-    }
-
     @Test
     public void testMetrics() {
         init();
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
index 749fd10..c0ed7f6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
@@ -22,12 +22,12 @@ import org.apache.kafka.common.metrics.JmxReporter;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
-import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.test.InternalMockProcessorContext;
@@ -59,7 +59,7 @@ public class MeteredWindowStoreTest {
     private InternalMockProcessorContext context;
     @SuppressWarnings("unchecked")
     private final WindowStore<Bytes, byte[]> innerStoreMock = createNiceMock(WindowStore.class);
-    private MeteredWindowStore<String, String> store = new MeteredWindowStore<>(
+    private final MeteredWindowStore<String, String> store = new MeteredWindowStore<>(
         innerStoreMock,
         10L, // any size
         "scope",
@@ -89,73 +89,6 @@ public class MeteredWindowStoreTest {
     }
 
     @Test
-    public void shouldGetSerdesFromConfigWithoutUserSerdes() {
-        store = new MeteredWindowStore<>(
-            innerStoreMock,
-            10L, // any size
-            "scope",
-            new MockTime(),
-            null,
-            null
-        );
-
-        final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, "test");
-        final Serde mockSerde = mock(Serde.class);
-
-        new InternalMockProcessorContext(
-            TestUtils.tempDirectory(),
-            mockSerde,
-            mockSerde,
-            streamsMetrics,
-            new StreamsConfig(StreamsTestUtils.getStreamsConfig()),
-            NoOpRecordCollector::new,
-            new ThreadCache(new LogContext("testCache "), 0, streamsMetrics)
-        );
-        replay(mockSerde, innerStoreMock);
-
-        store.init(context, store);
-
-        verify(mockSerde);
-    }
-
-    @Test
-    public void shouldConfigureUserSerdes() {
-        final Serde<String> mockKeySerde = mock(Serde.class);
-        mockKeySerde.configure(anyObject(), eq(true));
-        expectLastCall();
-
-        final Serde<String> mockValueSerde = mock(Serde.class);
-        mockValueSerde.configure(anyObject(), eq(false));
-        expectLastCall();
-
-        store = new MeteredWindowStore<>(
-            innerStoreMock,
-            10L, // any size
-            "scope",
-            new MockTime(),
-            mockKeySerde,
-            mockValueSerde
-        );
-
-        final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, "test");
-        final Serde mockSerde = mock(Serde.class);
-
-        new InternalMockProcessorContext(
-            TestUtils.tempDirectory(),
-            mockSerde,
-            mockSerde,
-            streamsMetrics,
-            new StreamsConfig(StreamsTestUtils.getStreamsConfig()),
-            NoOpRecordCollector::new,
-            new ThreadCache(new LogContext("testCache "), 0, streamsMetrics)
-        );
-        replay(mockKeySerde, mockValueSerde, innerStoreMock);
-
-        store.init(context, store);
-        verify(mockKeySerde, mockValueSerde);
-    }
-
-    @Test
     public void testMetrics() {
         replay(innerStoreMock);
         store.init(context, store);
@@ -195,7 +128,7 @@ public class MeteredWindowStoreTest {
 
     @Test
     public void shouldRecordFetchLatency() {
-        expect(innerStoreMock.fetch(Bytes.wrap("a".getBytes()), 1, 1)).andReturn(KeyValueIterators.emptyWindowStoreIterator());
+        expect(innerStoreMock.fetch(Bytes.wrap("a".getBytes()), 1, 1)).andReturn(KeyValueIterators.<byte[]>emptyWindowStoreIterator());
         replay(innerStoreMock);
 
         store.init(context, store);
@@ -208,7 +141,7 @@ public class MeteredWindowStoreTest {
 
     @Test
     public void shouldRecordFetchRangeLatency() {
-        expect(innerStoreMock.fetch(Bytes.wrap("a".getBytes()), Bytes.wrap("b".getBytes()), 1, 1)).andReturn(KeyValueIterators.emptyIterator());
+        expect(innerStoreMock.fetch(Bytes.wrap("a".getBytes()), Bytes.wrap("b".getBytes()), 1, 1)).andReturn(KeyValueIterators.<Windowed<Bytes>, byte[]>emptyIterator());
         replay(innerStoreMock);
 
         store.init(context, store);