You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2021/04/09 18:50:41 UTC

[kafka] branch trunk updated: KAFKA-12449: Remove deprecated WindowStore#put (#10293)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new db0323e  KAFKA-12449: Remove deprecated WindowStore#put (#10293)
db0323e is described below

commit db0323e9ba3f767614415d833a2081a33825a284
Author: Jorge Esteban Quilcate Otoya <qu...@gmail.com>
AuthorDate: Fri Apr 9 19:49:37 2021 +0100

    KAFKA-12449: Remove deprecated WindowStore#put (#10293)
    
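    Removes `WindowStore#put(K,V)`, which was deprecated in Kafka 2.4.0 via KIP-474; callers should use `WindowStore#put(K,V,long)` with an explicit window start timestamp instead.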
    
    Reviewers: Matthias J. Sax <ma...@confluent.io>
---
 docs/streams/upgrade-guide.html                    |  1 +
 .../internals/AbstractReadOnlyDecorator.java       |  7 --
 .../internals/AbstractReadWriteDecorator.java      |  7 --
 .../apache/kafka/streams/state/WindowStore.java    | 18 -----
 .../state/internals/CachingWindowStore.java        |  6 --
 .../internals/ChangeLoggingWindowBytesStore.java   | 10 ---
 .../state/internals/InMemoryWindowStore.java       |  6 --
 .../state/internals/MeteredWindowStore.java        |  7 --
 .../state/internals/RocksDBWindowStore.java        | 24 ------
 .../internals/TimestampedWindowStoreBuilder.java   |  7 --
 .../WindowToTimestampedWindowByteStoreAdapter.java |  7 --
 .../internals/ProcessorContextImplTest.java        | 10 +--
 .../internals/AbstractWindowBytesStoreTest.java    | 93 +++++++---------------
 .../CachingPersistentWindowStoreTest.java          | 88 ++++++++++----------
 ...angeLoggingTimestampedWindowBytesStoreTest.java |  6 +-
 .../ChangeLoggingWindowBytesStoreTest.java         |  8 +-
 .../state/internals/InMemoryWindowStoreTest.java   | 19 ++---
 .../MeteredTimestampedWindowStoreTest.java         |  6 +-
 .../state/internals/MeteredWindowStoreTest.java    |  3 +-
 .../state/internals/RocksDBWindowStoreTest.java    | 84 +++++++------------
 .../apache/kafka/streams/TopologyTestDriver.java   |  7 --
 .../kafka/streams/WindowStoreFacadeTest.java       | 11 ---
 22 files changed, 119 insertions(+), 316 deletions(-)

diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html
index 073badd..b082035 100644
--- a/docs/streams/upgrade-guide.html
+++ b/docs/streams/upgrade-guide.html
@@ -111,6 +111,7 @@
         <li> Overloaded <code>KStream#join, leftJoin, outerJoin</code> with <code>KStream</code> and <code>Joined</code> parameters: deprecated in Kafka 2.4.0 (<a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-479%3A+Add+StreamJoined+config+object+to+Join">KIP-479</a>).</li>
         <li> Overloaded <code>KafkaStreams#metadataForKey</code>: deprecated in Kafka 2.5.0 (<a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-535%3A+Allow+state+stores+to+serve+stale+reads+during+rebalance">KIP-535</a>).</li>
         <li> Overloaded <code>KafkaStreams#store</code>: deprecated in Kafka 2.5.0 (<a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-562%3A+Allow+fetching+a+key+from+a+single+partition+rather+than+iterating+over+all+the+stores+on+an+instance">KIP-562</a>).</li>
+        <li> <code>WindowStore#put(K key, V value)</code>: deprecated in Kafka 2.4.0 (<a href="https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=115526545">KIP-474</a>).</li>
     </ul>
     <p>
         The following dependencies were removed from Kafka Streams:
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadOnlyDecorator.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadOnlyDecorator.java
index 4424cdb..6c0c975 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadOnlyDecorator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadOnlyDecorator.java
@@ -160,13 +160,6 @@ abstract class AbstractReadOnlyDecorator<T extends StateStore, K, V> extends Wra
             super(inner);
         }
 
-        @Deprecated
-        @Override
-        public void put(final K key,
-                        final V value) {
-            throw new UnsupportedOperationException(ERROR_MESSAGE);
-        }
-
         @Override
         public void put(final K key,
                         final V value,
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java
index d077089..86dd33f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java
@@ -154,13 +154,6 @@ abstract class AbstractReadWriteDecorator<T extends StateStore, K, V> extends Wr
             super(inner);
         }
 
-        @Deprecated
-        @Override
-        public void put(final K key,
-                        final V value) {
-            wrapped().put(key, value);
-        }
-
         @Override
         public void put(final K key,
                         final V value,
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
index ef1f799..3ac11b8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
@@ -36,24 +36,6 @@ import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFail
 public interface WindowStore<K, V> extends StateStore, ReadOnlyWindowStore<K, V> {
 
     /**
-     * Use the current record timestamp as the {@code windowStartTimestamp} and
-     * delegate to {@link WindowStore#put(Object, Object, long)}.
-     * <p>
-     * It's highly recommended to use {@link WindowStore#put(Object, Object, long)} instead, as the record timestamp
-     * is unlikely to be the correct windowStartTimestamp in general.
-     *
-     * @param key   The key to associate the value to
-     * @param value The value to update, it can be null;
-     *              if the serialized bytes are also null it is interpreted as delete
-     * @throws NullPointerException if the given key is {@code null}
-     * @deprecated as timestamp is not provided for the key-value pair, this causes inconsistency
-     * to identify the window frame to which the key belongs.
-     * Use {@link #put(Object, Object, long)} instead.
-     */
-    @Deprecated
-    void put(K key, V value);
-
-    /**
      * Put a key-value pair into the window with given window start timestamp
      * <p>
      * If serialized value bytes are null it is interpreted as delete. Note that deletes will be
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
index c750f3a..cc7e5c6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
@@ -142,12 +142,6 @@ class CachingWindowStore
         return true;
     }
 
-    @Deprecated
-    @Override
-    public synchronized void put(final Bytes key,
-                                 final byte[] value) {
-        put(key, value, context.timestamp());
-    }
 
     @Override
     public synchronized void put(final Bytes key,
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
index 6a367b1..70c8949 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
@@ -131,16 +131,6 @@ class ChangeLoggingWindowBytesStore
         return wrapped().backwardFetchAll(timeFrom, timeTo);
     }
 
-    @Deprecated
-    @Override
-    public void put(final Bytes key, final byte[] value) {
-        // Note: It's incorrect to bypass the wrapped store here by delegating to another method,
-        // but we have no alternative. We must send a timestamped key to the changelog, which means
-        // we need to know what timestamp gets used for the record. Hopefully, we can deprecate this
-        // method in the future to resolve the situation.
-        put(key, value, context.timestamp());
-    }
-
     @Override
     public void put(final Bytes key,
                     final byte[] value,
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
index 0a1dc86..5dbfc0d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
@@ -108,12 +108,6 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
         open = true;
     }
 
-    @Deprecated
-    @Override
-    public void put(final Bytes key, final byte[] value) {
-        put(key, value, context.timestamp());
-    }
-
     @Override
     public void put(final Bytes key, final byte[] value, final long windowStartTimestamp) {
         removeExpiredSegments();
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
index 1b419ec..8c77cf2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
@@ -157,13 +157,6 @@ public class MeteredWindowStore<K, V>
         return false;
     }
 
-    @Deprecated
-    @Override
-    public void put(final K key,
-                    final V value) {
-        put(key, value, context != null ? context.timestamp() : 0L);
-    }
-
     @Override
     public void put(final K key,
                     final V value,
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
index b3ba652..3949505 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
@@ -18,10 +18,6 @@ package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.StateStoreContext;
-import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.state.WindowStoreIterator;
@@ -33,7 +29,6 @@ public class RocksDBWindowStore
     private final boolean retainDuplicates;
     private final long windowSize;
 
-    private InternalProcessorContext context;
     private int seqnum = 0;
 
     RocksDBWindowStore(final SegmentedBytesStore bytesStore,
@@ -44,25 +39,6 @@ public class RocksDBWindowStore
         this.windowSize = windowSize;
     }
 
-    @Deprecated
-    @Override
-    public void init(final ProcessorContext context, final StateStore root) {
-        this.context = context instanceof InternalProcessorContext ? (InternalProcessorContext) context : null;
-        super.init(context, root);
-    }
-
-    @Override
-    public void init(final StateStoreContext context, final StateStore root) {
-        this.context = context instanceof InternalProcessorContext ? (InternalProcessorContext) context : null;
-        super.init(context, root);
-    }
-
-    @Deprecated
-    @Override
-    public void put(final Bytes key, final byte[] value) {
-        put(key, value, context != null ? context.timestamp() : 0L);
-    }
-
     @Override
     public void put(final Bytes key, final byte[] value, final long windowStartTimestamp) {
         // Skip if value is null and duplicates are allowed since this delete is a no-op
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilder.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilder.java
index 075545f..417b45b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilder.java
@@ -122,13 +122,6 @@ public class TimestampedWindowStoreBuilder<K, V>
             wrapped.init(context, root);
         }
 
-        @Deprecated
-        @Override
-        public void put(final Bytes key,
-                        final byte[] value) {
-            wrapped.put(key, value);
-        }
-
         @Override
         public void put(final Bytes key,
                         final byte[] value,
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowToTimestampedWindowByteStoreAdapter.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowToTimestampedWindowByteStoreAdapter.java
index 586e87d..8d895fc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowToTimestampedWindowByteStoreAdapter.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowToTimestampedWindowByteStoreAdapter.java
@@ -40,13 +40,6 @@ class WindowToTimestampedWindowByteStoreAdapter implements WindowStore<Bytes, by
         this.store = store;
     }
 
-    @Deprecated
-    @Override
-    public void put(final Bytes key,
-                    final byte[] valueWithTimestamp) {
-        store.put(key, valueWithTimestamp == null ? null : rawValue(valueWithTimestamp));
-    }
-
     @Override
     public void put(final Bytes key,
                     final byte[] valueWithTimestamp,
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java
index 6ab26e1..5694c52 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java
@@ -227,7 +227,6 @@ public class ProcessorContextImplTest {
 
             checkThrowsUnsupportedOperation(store::flush, "flush()");
             checkThrowsUnsupportedOperation(() -> store.put("1", 1L, 1L), "put()");
-            checkThrowsUnsupportedOperation(() -> store.put("1", 1L), "put()");
 
             assertEquals(iters.get(0), store.fetchAll(0L, 0L));
             assertEquals(windowStoreIter, store.fetch(KEY, 0L, 1L));
@@ -246,7 +245,6 @@ public class ProcessorContextImplTest {
 
             checkThrowsUnsupportedOperation(store::flush, "flush()");
             checkThrowsUnsupportedOperation(() -> store.put("1", ValueAndTimestamp.make(1L, 1L), 1L), "put() [with timestamp]");
-            checkThrowsUnsupportedOperation(() -> store.put("1", ValueAndTimestamp.make(1L, 1L)), "put() [no timestamp]");
 
             assertEquals(timestampedIters.get(0), store.fetchAll(0L, 0L));
             assertEquals(windowStoreIter, store.fetch(KEY, 0L, 1L));
@@ -335,7 +333,7 @@ public class ProcessorContextImplTest {
             store.flush();
             assertTrue(flushExecuted);
 
-            store.put("1", 1L);
+            store.put("1", 1L, 1L);
             assertTrue(putExecuted);
 
             assertEquals(iters.get(0), store.fetchAll(0L, 0L));
@@ -355,7 +353,7 @@ public class ProcessorContextImplTest {
             store.flush();
             assertTrue(flushExecuted);
 
-            store.put("1", ValueAndTimestamp.make(1L, 1L));
+            store.put("1", ValueAndTimestamp.make(1L, 1L), 1L);
             assertTrue(putExecuted);
 
             store.put("1", ValueAndTimestamp.make(1L, 1L), 1L);
@@ -639,7 +637,7 @@ public class ProcessorContextImplTest {
         expect(windowStore.fetch(anyString(), anyLong())).andReturn(VALUE);
         expect(windowStore.all()).andReturn(iters.get(2));
 
-        windowStore.put(anyString(), anyLong());
+        windowStore.put(anyString(), anyLong(), anyLong());
         expectLastCall().andAnswer(() -> {
             putExecuted = true;
             return null;
@@ -662,7 +660,7 @@ public class ProcessorContextImplTest {
         expect(windowStore.fetch(anyString(), anyLong())).andReturn(VALUE_AND_TIMESTAMP);
         expect(windowStore.all()).andReturn(timestampedIters.get(2));
 
-        windowStore.put(anyString(), anyObject(ValueAndTimestamp.class));
+        windowStore.put(anyString(), anyObject(ValueAndTimestamp.class), anyLong());
         expectLastCall().andAnswer(() -> {
             putExecuted = true;
             return null;
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java
index aa4a412..8d73a2d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java
@@ -782,24 +782,22 @@ public abstract class AbstractWindowBytesStoreTest {
     }
 
     @Test
-    @SuppressWarnings("deprecation")
     public void testPutSameKeyTimestamp() {
         windowStore = buildWindowStore(RETENTION_PERIOD, WINDOW_SIZE, true, Serdes.Integer(), Serdes.String());
         windowStore.init((StateStoreContext) context, windowStore);
 
         final long startTime = SEGMENT_INTERVAL - 4L;
 
-        setCurrentTime(startTime);
-        windowStore.put(0, "zero");
+        windowStore.put(0, "zero", startTime);
 
         assertEquals(
             new HashSet<>(Collections.singletonList("zero")),
             valuesToSet(windowStore.fetch(0, ofEpochMilli(startTime - WINDOW_SIZE),
                 ofEpochMilli(startTime + WINDOW_SIZE))));
 
-        windowStore.put(0, "zero");
-        windowStore.put(0, "zero+");
-        windowStore.put(0, "zero++");
+        windowStore.put(0, "zero", startTime);
+        windowStore.put(0, "zero+", startTime);
+        windowStore.put(0, "zero++", startTime);
 
         assertEquals(
             new HashSet<>(asList("zero", "zero", "zero+", "zero++")),
@@ -847,7 +845,6 @@ public abstract class AbstractWindowBytesStoreTest {
 
     @Test
     public void shouldCloseOpenIteratorsWhenStoreIsClosedAndNotThrowInvalidStateStoreExceptionOnHasNext() {
-        setCurrentTime(0);
         windowStore.put(1, "one", 1L);
         windowStore.put(1, "two", 2L);
         windowStore.put(1, "three", 3L);
@@ -905,18 +902,16 @@ public abstract class AbstractWindowBytesStoreTest {
     }
 
     @Test
-    @SuppressWarnings("deprecation")
     public void testDeleteAndUpdate() {
 
         final long currentTime = 0;
-        setCurrentTime(currentTime);
-        windowStore.put(1, "one");
-        windowStore.put(1, "one v2");
+        windowStore.put(1, "one", currentTime);
+        windowStore.put(1, "one v2", currentTime);
 
         WindowStoreIterator<String> iterator = windowStore.fetch(1, 0, currentTime);
         assertEquals(new KeyValue<>(currentTime, "one v2"), iterator.next());
 
-        windowStore.put(1, null);
+        windowStore.put(1, null, currentTime);
         iterator = windowStore.fetch(1, 0, currentTime);
         assertFalse(iterator.hasNext());
     }
@@ -927,9 +922,8 @@ public abstract class AbstractWindowBytesStoreTest {
     }
 
     @Test
-    @SuppressWarnings("deprecation")
     public void shouldThrowNullPointerExceptionOnPutNullKey() {
-        assertThrows(NullPointerException.class, () -> windowStore.put(null, "anyValue"));
+        assertThrows(NullPointerException.class, () -> windowStore.put(null, "anyValue", 0L));
     }
 
     @Test
@@ -1120,11 +1114,9 @@ public abstract class AbstractWindowBytesStoreTest {
     }
 
     @Test
-    @SuppressWarnings("deprecation")
     public void testWindowIteratorPeek() {
         final long currentTime = 0;
-        setCurrentTime(currentTime);
-        windowStore.put(1, "one");
+        windowStore.put(1, "one", currentTime);
 
         final KeyValueIterator<Windowed<Integer>, String> iterator = windowStore.fetchAll(0L, currentTime);
 
@@ -1151,25 +1143,20 @@ public abstract class AbstractWindowBytesStoreTest {
     }
 
     @Test
-    @SuppressWarnings("deprecation")
     public void shouldNotThrowConcurrentModificationException() {
         long currentTime = 0;
-        setCurrentTime(currentTime);
-        windowStore.put(1, "one");
+        windowStore.put(1, "one", currentTime);
 
         currentTime += WINDOW_SIZE * 10;
-        setCurrentTime(currentTime);
-        windowStore.put(1, "two");
+        windowStore.put(1, "two", currentTime);
 
         final KeyValueIterator<Windowed<Integer>, String> iterator = windowStore.all();
 
         currentTime += WINDOW_SIZE * 10;
-        setCurrentTime(currentTime);
-        windowStore.put(1, "three");
+        windowStore.put(1, "three", currentTime);
 
         currentTime += WINDOW_SIZE * 10;
-        setCurrentTime(currentTime);
-        windowStore.put(2, "four");
+        windowStore.put(2, "four", currentTime);
 
         // Iterator should return all records in store and not throw exception b/c some were added after fetch
         assertEquals(windowedPair(1, "one", 0), iterator.next());
@@ -1180,25 +1167,21 @@ public abstract class AbstractWindowBytesStoreTest {
     }
 
     @Test
-    @SuppressWarnings("deprecation")
     public void testFetchDuplicates() {
         windowStore = buildWindowStore(RETENTION_PERIOD, WINDOW_SIZE, true, Serdes.Integer(), Serdes.String());
         windowStore.init((StateStoreContext) context, windowStore);
 
         long currentTime = 0;
-        setCurrentTime(currentTime);
-        windowStore.put(1, "one");
-        windowStore.put(1, "one-2");
+        windowStore.put(1, "one", currentTime);
+        windowStore.put(1, "one-2", currentTime);
 
         currentTime += WINDOW_SIZE * 10;
-        setCurrentTime(currentTime);
-        windowStore.put(1, "two");
-        windowStore.put(1, "two-2");
+        windowStore.put(1, "two", currentTime);
+        windowStore.put(1, "two-2", currentTime);
 
         currentTime += WINDOW_SIZE * 10;
-        setCurrentTime(currentTime);
-        windowStore.put(1, "three");
-        windowStore.put(1, "three-2");
+        windowStore.put(1, "three", currentTime);
+        windowStore.put(1, "three-2", currentTime);
 
         final WindowStoreIterator<String> iterator = windowStore.fetch(1, 0, WINDOW_SIZE * 10);
 
@@ -1210,38 +1193,26 @@ public abstract class AbstractWindowBytesStoreTest {
     }
 
 
-    @SuppressWarnings("deprecation")
     private void putFirstBatch(final WindowStore<Integer, String> store,
                                @SuppressWarnings("SameParameterValue") final long startTime,
                                final InternalMockProcessorContext context) {
         context.setRecordContext(createRecordContext(startTime));
-        store.put(0, "zero");
-        context.setRecordContext(createRecordContext(startTime + 1L));
-        store.put(1, "one");
-        context.setRecordContext(createRecordContext(startTime + 2L));
-        store.put(2, "two");
-        context.setRecordContext(createRecordContext(startTime + 4L));
-        store.put(4, "four");
-        context.setRecordContext(createRecordContext(startTime + 5L));
-        store.put(5, "five");
+        store.put(0, "zero", startTime);
+        store.put(1, "one", startTime + 1L);
+        store.put(2, "two", startTime + 2L);
+        store.put(4, "four", startTime + 4L);
+        store.put(5, "five", startTime + 5L);
     }
 
-    @SuppressWarnings("deprecation")
     private void putSecondBatch(final WindowStore<Integer, String> store,
                                 @SuppressWarnings("SameParameterValue") final long startTime,
                                 final InternalMockProcessorContext context) {
-        context.setRecordContext(createRecordContext(startTime + 3L));
-        store.put(2, "two+1");
-        context.setRecordContext(createRecordContext(startTime + 4L));
-        store.put(2, "two+2");
-        context.setRecordContext(createRecordContext(startTime + 5L));
-        store.put(2, "two+3");
-        context.setRecordContext(createRecordContext(startTime + 6L));
-        store.put(2, "two+4");
-        context.setRecordContext(createRecordContext(startTime + 7L));
-        store.put(2, "two+5");
-        context.setRecordContext(createRecordContext(startTime + 8L));
-        store.put(2, "two+6");
+        store.put(2, "two+1", startTime + 3L);
+        store.put(2, "two+2", startTime + 4L);
+        store.put(2, "two+3", startTime + 5L);
+        store.put(2, "two+4", startTime + 6L);
+        store.put(2, "two+5", startTime + 7L);
+        store.put(2, "two+6", startTime + 8L);
     }
 
     long extractStoreTimestamp(final byte[] binaryKey) {
@@ -1278,10 +1249,6 @@ public abstract class AbstractWindowBytesStoreTest {
         return KeyValue.pair(new Windowed<>(key, WindowKeySchema.timeWindowForSize(timestamp, windowSize)), value);
     }
 
-    protected void setCurrentTime(final long currentTime) {
-        context.setRecordContext(createRecordContext(currentTime));
-    }
-
     private ProcessorRecordContext createRecordContext(final long time) {
         return new ProcessorRecordContext(time, 0, 0, "topic", null);
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentWindowStoreTest.java
index 66e49c8..735101f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentWindowStoreTest.java
@@ -236,8 +236,8 @@ public class CachingPersistentWindowStoreTest {
     @Test
     @SuppressWarnings("deprecation")
     public void shouldPutFetchFromCache() {
-        cachingStore.put(bytesKey("a"), bytesValue("a"));
-        cachingStore.put(bytesKey("b"), bytesValue("b"));
+        cachingStore.put(bytesKey("a"), bytesValue("a"), DEFAULT_TIMESTAMP);
+        cachingStore.put(bytesKey("b"), bytesValue("b"), DEFAULT_TIMESTAMP);
 
         assertThat(cachingStore.fetch(bytesKey("a"), 10), equalTo(bytesValue("a")));
         assertThat(cachingStore.fetch(bytesKey("b"), 10), equalTo(bytesValue("b")));
@@ -275,8 +275,8 @@ public class CachingPersistentWindowStoreTest {
     @Test
     @SuppressWarnings("deprecation")
     public void shouldPutFetchRangeFromCache() {
-        cachingStore.put(bytesKey("a"), bytesValue("a"));
-        cachingStore.put(bytesKey("b"), bytesValue("b"));
+        cachingStore.put(bytesKey("a"), bytesValue("a"), DEFAULT_TIMESTAMP);
+        cachingStore.put(bytesKey("b"), bytesValue("b"), DEFAULT_TIMESTAMP);
 
         final KeyValueIterator<Windowed<Bytes>, byte[]> iterator =
             cachingStore.fetch(bytesKey("a"), bytesKey("b"), ofEpochMilli(10), ofEpochMilli(10));
@@ -293,16 +293,15 @@ public class CachingPersistentWindowStoreTest {
     }
 
     @Test
-    @SuppressWarnings("deprecation")
     public void shouldGetAllFromCache() {
-        cachingStore.put(bytesKey("a"), bytesValue("a"));
-        cachingStore.put(bytesKey("b"), bytesValue("b"));
-        cachingStore.put(bytesKey("c"), bytesValue("c"));
-        cachingStore.put(bytesKey("d"), bytesValue("d"));
-        cachingStore.put(bytesKey("e"), bytesValue("e"));
-        cachingStore.put(bytesKey("f"), bytesValue("f"));
-        cachingStore.put(bytesKey("g"), bytesValue("g"));
-        cachingStore.put(bytesKey("h"), bytesValue("h"));
+        cachingStore.put(bytesKey("a"), bytesValue("a"), DEFAULT_TIMESTAMP);
+        cachingStore.put(bytesKey("b"), bytesValue("b"), DEFAULT_TIMESTAMP);
+        cachingStore.put(bytesKey("c"), bytesValue("c"), DEFAULT_TIMESTAMP);
+        cachingStore.put(bytesKey("d"), bytesValue("d"), DEFAULT_TIMESTAMP);
+        cachingStore.put(bytesKey("e"), bytesValue("e"), DEFAULT_TIMESTAMP);
+        cachingStore.put(bytesKey("f"), bytesValue("f"), DEFAULT_TIMESTAMP);
+        cachingStore.put(bytesKey("g"), bytesValue("g"), DEFAULT_TIMESTAMP);
+        cachingStore.put(bytesKey("h"), bytesValue("h"), DEFAULT_TIMESTAMP);
 
         final KeyValueIterator<Windowed<Bytes>, byte[]> iterator = cachingStore.all();
         final String[] array = {"a", "b", "c", "d", "e", "f", "g", "h"};
@@ -318,14 +317,14 @@ public class CachingPersistentWindowStoreTest {
     @Test
     @SuppressWarnings("deprecation")
     public void shouldGetAllBackwardFromCache() {
-        cachingStore.put(bytesKey("a"), bytesValue("a"));
-        cachingStore.put(bytesKey("b"), bytesValue("b"));
-        cachingStore.put(bytesKey("c"), bytesValue("c"));
-        cachingStore.put(bytesKey("d"), bytesValue("d"));
-        cachingStore.put(bytesKey("e"), bytesValue("e"));
-        cachingStore.put(bytesKey("f"), bytesValue("f"));
-        cachingStore.put(bytesKey("g"), bytesValue("g"));
-        cachingStore.put(bytesKey("h"), bytesValue("h"));
+        cachingStore.put(bytesKey("a"), bytesValue("a"), DEFAULT_TIMESTAMP);
+        cachingStore.put(bytesKey("b"), bytesValue("b"), DEFAULT_TIMESTAMP);
+        cachingStore.put(bytesKey("c"), bytesValue("c"), DEFAULT_TIMESTAMP);
+        cachingStore.put(bytesKey("d"), bytesValue("d"), DEFAULT_TIMESTAMP);
+        cachingStore.put(bytesKey("e"), bytesValue("e"), DEFAULT_TIMESTAMP);
+        cachingStore.put(bytesKey("f"), bytesValue("f"), DEFAULT_TIMESTAMP);
+        cachingStore.put(bytesKey("g"), bytesValue("g"), DEFAULT_TIMESTAMP);
+        cachingStore.put(bytesKey("h"), bytesValue("h"), DEFAULT_TIMESTAMP);
 
         final KeyValueIterator<Windowed<Bytes>, byte[]> iterator = cachingStore.backwardAll();
         final String[] array = {"h", "g", "f", "e", "d", "c", "b", "a"};
@@ -343,8 +342,7 @@ public class CachingPersistentWindowStoreTest {
     public void shouldFetchAllWithinTimestampRange() {
         final String[] array = {"a", "b", "c", "d", "e", "f", "g", "h"};
         for (int i = 0; i < array.length; i++) {
-            context.setTime(i);
-            cachingStore.put(bytesKey(array[i]), bytesValue(array[i]));
+            cachingStore.put(bytesKey(array[i]), bytesValue(array[i]), i);
         }
 
         final KeyValueIterator<Windowed<Bytes>, byte[]> iterator =
@@ -386,8 +384,7 @@ public class CachingPersistentWindowStoreTest {
     public void shouldFetchAllBackwardWithinTimestampRange() {
         final String[] array = {"a", "b", "c", "d", "e", "f", "g", "h"};
         for (int i = 0; i < array.length; i++) {
-            context.setTime(i);
-            cachingStore.put(bytesKey(array[i]), bytesValue(array[i]));
+            cachingStore.put(bytesKey(array[i]), bytesValue(array[i]), i);
         }
 
         final KeyValueIterator<Windowed<Bytes>, byte[]> iterator =
@@ -444,7 +441,7 @@ public class CachingPersistentWindowStoreTest {
     public void shouldForwardDirtyItemsWhenFlushCalled() {
         final Windowed<String> windowedKey =
             new Windowed<>("1", new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE));
-        cachingStore.put(bytesKey("1"), bytesValue("a"));
+        cachingStore.put(bytesKey("1"), bytesValue("a"), DEFAULT_TIMESTAMP);
         cachingStore.flush();
         assertEquals("a", cacheListener.forwarded.get(windowedKey).newValue);
         assertNull(cacheListener.forwarded.get(windowedKey).oldValue);
@@ -462,24 +459,24 @@ public class CachingPersistentWindowStoreTest {
         cachingStore.setFlushListener(cacheListener, true);
         final Windowed<String> windowedKey =
             new Windowed<>("1", new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE));
-        cachingStore.put(bytesKey("1"), bytesValue("a"));
-        cachingStore.put(bytesKey("1"), bytesValue("b"));
+        cachingStore.put(bytesKey("1"), bytesValue("a"), DEFAULT_TIMESTAMP);
+        cachingStore.put(bytesKey("1"), bytesValue("b"), DEFAULT_TIMESTAMP);
         cachingStore.flush();
         assertEquals("b", cacheListener.forwarded.get(windowedKey).newValue);
         assertNull(cacheListener.forwarded.get(windowedKey).oldValue);
         cacheListener.forwarded.clear();
-        cachingStore.put(bytesKey("1"), bytesValue("c"));
+        cachingStore.put(bytesKey("1"), bytesValue("c"), DEFAULT_TIMESTAMP);
         cachingStore.flush();
         assertEquals("c", cacheListener.forwarded.get(windowedKey).newValue);
         assertEquals("b", cacheListener.forwarded.get(windowedKey).oldValue);
-        cachingStore.put(bytesKey("1"), null);
+        cachingStore.put(bytesKey("1"), null, DEFAULT_TIMESTAMP);
         cachingStore.flush();
         assertNull(cacheListener.forwarded.get(windowedKey).newValue);
         assertEquals("c", cacheListener.forwarded.get(windowedKey).oldValue);
         cacheListener.forwarded.clear();
-        cachingStore.put(bytesKey("1"), bytesValue("a"));
-        cachingStore.put(bytesKey("1"), bytesValue("b"));
-        cachingStore.put(bytesKey("1"), null);
+        cachingStore.put(bytesKey("1"), bytesValue("a"), DEFAULT_TIMESTAMP);
+        cachingStore.put(bytesKey("1"), bytesValue("b"), DEFAULT_TIMESTAMP);
+        cachingStore.put(bytesKey("1"), null, DEFAULT_TIMESTAMP);
         cachingStore.flush();
         assertNull(cacheListener.forwarded.get(windowedKey));
         cacheListener.forwarded.clear();
@@ -490,23 +487,23 @@ public class CachingPersistentWindowStoreTest {
     public void shouldForwardOldValuesWhenDisabled() {
         final Windowed<String> windowedKey =
             new Windowed<>("1", new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE));
-        cachingStore.put(bytesKey("1"), bytesValue("a"));
-        cachingStore.put(bytesKey("1"), bytesValue("b"));
+        cachingStore.put(bytesKey("1"), bytesValue("a"), DEFAULT_TIMESTAMP);
+        cachingStore.put(bytesKey("1"), bytesValue("b"), DEFAULT_TIMESTAMP);
         cachingStore.flush();
         assertEquals("b", cacheListener.forwarded.get(windowedKey).newValue);
         assertNull(cacheListener.forwarded.get(windowedKey).oldValue);
-        cachingStore.put(bytesKey("1"), bytesValue("c"));
+        cachingStore.put(bytesKey("1"), bytesValue("c"), DEFAULT_TIMESTAMP);
         cachingStore.flush();
         assertEquals("c", cacheListener.forwarded.get(windowedKey).newValue);
         assertNull(cacheListener.forwarded.get(windowedKey).oldValue);
-        cachingStore.put(bytesKey("1"), null);
+        cachingStore.put(bytesKey("1"), null, DEFAULT_TIMESTAMP);
         cachingStore.flush();
         assertNull(cacheListener.forwarded.get(windowedKey).newValue);
         assertNull(cacheListener.forwarded.get(windowedKey).oldValue);
         cacheListener.forwarded.clear();
-        cachingStore.put(bytesKey("1"), bytesValue("a"));
-        cachingStore.put(bytesKey("1"), bytesValue("b"));
-        cachingStore.put(bytesKey("1"), null);
+        cachingStore.put(bytesKey("1"), bytesValue("a"), DEFAULT_TIMESTAMP);
+        cachingStore.put(bytesKey("1"), bytesValue("b"), DEFAULT_TIMESTAMP);
+        cachingStore.put(bytesKey("1"), null, DEFAULT_TIMESTAMP);
         cachingStore.flush();
         assertNull(cacheListener.forwarded.get(windowedKey));
         cacheListener.forwarded.clear();
@@ -619,7 +616,7 @@ public class CachingPersistentWindowStoreTest {
     @Test
     @SuppressWarnings("deprecation")
     public void shouldClearNamespaceCacheOnClose() {
-        cachingStore.put(bytesKey("a"), bytesValue("a"));
+        cachingStore.put(bytesKey("a"), bytesValue("a"), 0L);
         assertEquals(1, cache.size());
         cachingStore.close();
         assertEquals(0, cache.size());
@@ -641,7 +638,7 @@ public class CachingPersistentWindowStoreTest {
     @SuppressWarnings("deprecation")
     public void shouldThrowIfTryingToWriteToClosedCachingStore() {
         cachingStore.close();
-        assertThrows(InvalidStateStoreException.class, () -> cachingStore.put(bytesKey("a"), bytesValue("a")));
+        assertThrows(InvalidStateStoreException.class, () -> cachingStore.put(bytesKey("a"), bytesValue("a"), 0L));
     }
 
     @Test
@@ -789,13 +786,13 @@ public class CachingPersistentWindowStoreTest {
     @Test
     @SuppressWarnings("deprecation")
     public void shouldThrowNullPointerExceptionOnPutNullKey() {
-        assertThrows(NullPointerException.class, () -> cachingStore.put(null, bytesValue("anyValue")));
+        assertThrows(NullPointerException.class, () -> cachingStore.put(null, bytesValue("anyValue"), 0L));
     }
 
     @Test
     @SuppressWarnings("deprecation")
     public void shouldNotThrowNullPointerExceptionOnPutNullValue() {
-        cachingStore.put(bytesKey("a"), null);
+        cachingStore.put(bytesKey("a"), null, 0L);
     }
 
     @Test
@@ -919,13 +916,12 @@ public class CachingPersistentWindowStoreTest {
             bytesValue(value));
     }
 
-    @SuppressWarnings("deprecation")
     private int addItemsToCache() {
         int cachedSize = 0;
         int i = 0;
         while (cachedSize < MAX_CACHE_SIZE_BYTES) {
             final String kv = String.valueOf(i++);
-            cachingStore.put(bytesKey(kv), bytesValue(kv));
+            cachingStore.put(bytesKey(kv), bytesValue(kv), DEFAULT_TIMESTAMP);
             cachedSize += memoryCacheEntrySize(kv.getBytes(), kv.getBytes(), TOPIC) +
                 8 + // timestamp
                 4; // sequenceNumber
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreTest.java
index 6608739..50c18fe 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreTest.java
@@ -99,7 +99,7 @@ public class ChangeLoggingTimestampedWindowBytesStoreTest {
         context.logChange(store.name(), key, value, 42);
 
         EasyMock.replay(context);
-        store.put(bytesKey, valueAndTimestamp);
+        store.put(bytesKey, valueAndTimestamp, context.timestamp());
 
         EasyMock.verify(inner, context);
     }
@@ -146,8 +146,8 @@ public class ChangeLoggingTimestampedWindowBytesStoreTest {
 
         EasyMock.replay(context);
 
-        store.put(bytesKey, valueAndTimestamp);
-        store.put(bytesKey, valueAndTimestamp);
+        store.put(bytesKey, valueAndTimestamp, context.timestamp());
+        store.put(bytesKey, valueAndTimestamp, context.timestamp());
 
         EasyMock.verify(inner, context);
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java
index b19b99a..36e3297 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java
@@ -84,7 +84,6 @@ public class ChangeLoggingWindowBytesStoreTest {
     }
 
     @Test
-    @SuppressWarnings("deprecation")
     public void shouldLogPuts() {
         inner.put(bytesKey, value, 0);
         EasyMock.expectLastCall();
@@ -98,7 +97,7 @@ public class ChangeLoggingWindowBytesStoreTest {
         context.logChange(store.name(), key, value, 0L);
 
         EasyMock.replay(context);
-        store.put(bytesKey, value);
+        store.put(bytesKey, value, context.timestamp());
 
         EasyMock.verify(inner, context);
     }
@@ -152,7 +151,6 @@ public class ChangeLoggingWindowBytesStoreTest {
     }
 
     @Test
-    @SuppressWarnings("deprecation")
     public void shouldRetainDuplicatesWhenSet() {
         store = new ChangeLoggingWindowBytesStore(inner, true, WindowKeySchema::toStoreKeyBinary);
 
@@ -171,8 +169,8 @@ public class ChangeLoggingWindowBytesStoreTest {
 
         EasyMock.replay(context);
 
-        store.put(bytesKey, value);
-        store.put(bytesKey, value);
+        store.put(bytesKey, value, context.timestamp());
+        store.put(bytesKey, value, context.timestamp());
 
         EasyMock.verify(inner, context);
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryWindowStoreTest.java
index e5f2142..2ef9bad 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryWindowStoreTest.java
@@ -126,37 +126,30 @@ public class InMemoryWindowStoreTest extends AbstractWindowBytesStoreTest {
     }
 
     @Test
-    @SuppressWarnings("deprecation")
     public void testExpiration() {
 
         long currentTime = 0;
-        setCurrentTime(currentTime);
-        windowStore.put(1, "one");
+        windowStore.put(1, "one", currentTime);
 
         currentTime += RETENTION_PERIOD / 4;
-        setCurrentTime(currentTime);
-        windowStore.put(1, "two");
+        windowStore.put(1, "two", currentTime);
 
         currentTime += RETENTION_PERIOD / 4;
-        setCurrentTime(currentTime);
-        windowStore.put(1, "three");
+        windowStore.put(1, "three", currentTime);
 
         currentTime += RETENTION_PERIOD / 4;
-        setCurrentTime(currentTime);
-        windowStore.put(1, "four");
+        windowStore.put(1, "four", currentTime);
 
         // increase current time to the full RETENTION_PERIOD to expire first record
         currentTime = currentTime + RETENTION_PERIOD / 4;
-        setCurrentTime(currentTime);
-        windowStore.put(1, "five");
+        windowStore.put(1, "five", currentTime);
 
         KeyValueIterator<Windowed<Integer>, String> iterator = windowStore
             .fetchAll(0L, currentTime);
 
         // effect of this put (expires next oldest record, adds new one) should not be reflected in the already fetched results
         currentTime = currentTime + RETENTION_PERIOD / 4;
-        setCurrentTime(currentTime);
-        windowStore.put(1, "six");
+        windowStore.put(1, "six", currentTime);
 
         // should only have middle 4 values, as (only) the first record was expired at the time of the fetch
         // and the last was inserted after the fetch
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreTest.java
index 595cb01..c05c1ba 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreTest.java
@@ -202,7 +202,6 @@ public class MeteredTimestampedWindowStoreTest {
     }
 
     @Test
-    @SuppressWarnings("deprecation")
     public void shouldNotThrowExceptionIfSerdesCorrectlySetFromProcessorContext() {
         EasyMock.expect(innerStoreMock.name()).andStubReturn("mocked-store");
         EasyMock.replay(innerStoreMock);
@@ -217,7 +216,7 @@ public class MeteredTimestampedWindowStoreTest {
         store.init((StateStoreContext) context, innerStoreMock);
 
         try {
-            store.put("key", ValueAndTimestamp.make(42L, 60000));
+            store.put("key", ValueAndTimestamp.make(42L, 60000), 60000L);
         } catch (final StreamsException exception) {
             if (exception.getCause() instanceof ClassCastException) {
                 fail("Serdes are not correctly set from processor context.");
@@ -227,7 +226,6 @@ public class MeteredTimestampedWindowStoreTest {
     }
 
     @Test
-    @SuppressWarnings("deprecation")
     public void shouldNotThrowExceptionIfSerdesCorrectlySetFromConstructorParameters() {
         EasyMock.expect(innerStoreMock.name()).andStubReturn("mocked-store");
         EasyMock.replay(innerStoreMock);
@@ -242,7 +240,7 @@ public class MeteredTimestampedWindowStoreTest {
         store.init((StateStoreContext) context, innerStoreMock);
 
         try {
-            store.put("key", ValueAndTimestamp.make(42L, 60000));
+            store.put("key", ValueAndTimestamp.make(42L, 60000), 60000L);
         } catch (final StreamsException exception) {
             if (exception.getCause() instanceof ClassCastException) {
                 fail("Serdes are not correctly set from constructor parameters.");
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
index a74aaea..cb3a268 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
@@ -290,7 +290,6 @@ public class MeteredWindowStoreTest {
     }
 
     @Test
-    @SuppressWarnings("deprecation")
     public void shouldRecordPutLatency() {
         final byte[] bytes = "a".getBytes();
         innerStoreMock.put(eq(Bytes.wrap(bytes)), anyObject(), eq(context.timestamp()));
@@ -298,7 +297,7 @@ public class MeteredWindowStoreTest {
         replay(innerStoreMock);
 
         store.init((StateStoreContext) context, store);
-        store.put("a", "a");
+        store.put("a", "a", context.timestamp());
         final Map<MetricName, ? extends Metric> metrics = context.metrics().metrics();
         if (StreamsConfig.METRICS_0100_TO_24.equals(builtInMetricsVersion)) {
             assertEquals(1.0, getMetricByNameFilterByTags(
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
index 9e9b0b4..ce85893 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
@@ -81,26 +81,21 @@ public class RocksDBWindowStoreTest extends AbstractWindowBytesStoreTest {
     }
 
     @Test
-    @SuppressWarnings("deprecation")
     public void shouldOnlyIterateOpenSegments() {
         long currentTime = 0;
-        setCurrentTime(currentTime);
-        windowStore.put(1, "one");
+        windowStore.put(1, "one", currentTime);
 
         currentTime = currentTime + SEGMENT_INTERVAL;
-        setCurrentTime(currentTime);
-        windowStore.put(1, "two");
+        windowStore.put(1, "two", currentTime);
         currentTime = currentTime + SEGMENT_INTERVAL;
 
-        setCurrentTime(currentTime);
-        windowStore.put(1, "three");
+        windowStore.put(1, "three", currentTime);
 
         final WindowStoreIterator<String> iterator = windowStore.fetch(1, 0L, currentTime);
 
         // roll to the next segment that will close the first
         currentTime = currentTime + SEGMENT_INTERVAL;
-        setCurrentTime(currentTime);
-        windowStore.put(1, "four");
+        windowStore.put(1, "four", currentTime);
 
         // should only have 2 values as the first segment is no longer open
         assertEquals(new KeyValue<>(SEGMENT_INTERVAL, "two"), iterator.next());
@@ -109,22 +104,18 @@ public class RocksDBWindowStoreTest extends AbstractWindowBytesStoreTest {
     }
 
     @Test
-    @SuppressWarnings("deprecation")
     public void testRolling() {
 
         // to validate segments
         final long startTime = SEGMENT_INTERVAL * 2;
         final long increment = SEGMENT_INTERVAL / 2;
-        setCurrentTime(startTime);
-        windowStore.put(0, "zero");
+        windowStore.put(0, "zero", startTime);
         assertEquals(Utils.mkSet(segments.segmentName(2)), segmentDirs(baseDir));
 
-        setCurrentTime(startTime + increment);
-        windowStore.put(1, "one");
+        windowStore.put(1, "one", startTime + increment);
         assertEquals(Utils.mkSet(segments.segmentName(2)), segmentDirs(baseDir));
 
-        setCurrentTime(startTime + increment * 2);
-        windowStore.put(2, "two");
+        windowStore.put(2, "two", startTime + increment * 2);
         assertEquals(
             Utils.mkSet(
                 segments.segmentName(2),
@@ -133,8 +124,7 @@ public class RocksDBWindowStoreTest extends AbstractWindowBytesStoreTest {
             segmentDirs(baseDir)
         );
 
-        setCurrentTime(startTime + increment * 4);
-        windowStore.put(4, "four");
+        windowStore.put(4, "four", startTime + increment * 4);
         assertEquals(
             Utils.mkSet(
                 segments.segmentName(2),
@@ -144,8 +134,7 @@ public class RocksDBWindowStoreTest extends AbstractWindowBytesStoreTest {
             segmentDirs(baseDir)
         );
 
-        setCurrentTime(startTime + increment * 5);
-        windowStore.put(5, "five");
+        windowStore.put(5, "five", startTime + increment * 5);
         assertEquals(
             Utils.mkSet(
                 segments.segmentName(2),
@@ -192,8 +181,7 @@ public class RocksDBWindowStoreTest extends AbstractWindowBytesStoreTest {
                 ofEpochMilli(startTime + increment * 5 - WINDOW_SIZE),
                 ofEpochMilli(startTime + increment * 5 + WINDOW_SIZE))));
 
-        setCurrentTime(startTime + increment * 6);
-        windowStore.put(6, "six");
+        windowStore.put(6, "six", startTime + increment * 6);
         assertEquals(
             Utils.mkSet(
                 segments.segmentName(3),
@@ -246,8 +234,7 @@ public class RocksDBWindowStoreTest extends AbstractWindowBytesStoreTest {
                 ofEpochMilli(startTime + increment * 6 - WINDOW_SIZE),
                 ofEpochMilli(startTime + increment * 6 + WINDOW_SIZE))));
 
-        setCurrentTime(startTime + increment * 7);
-        windowStore.put(7, "seven");
+        windowStore.put(7, "seven", startTime + increment * 7);
         assertEquals(
             Utils.mkSet(
                 segments.segmentName(3),
@@ -306,8 +293,7 @@ public class RocksDBWindowStoreTest extends AbstractWindowBytesStoreTest {
                 ofEpochMilli(startTime + increment * 7 - WINDOW_SIZE),
                 ofEpochMilli(startTime + increment * 7 + WINDOW_SIZE))));
 
-        setCurrentTime(startTime + increment * 8);
-        windowStore.put(8, "eight");
+        windowStore.put(8, "eight", startTime + increment * 8);
         assertEquals(
             Utils.mkSet(
                 segments.segmentName(4),
@@ -385,7 +371,6 @@ public class RocksDBWindowStoreTest extends AbstractWindowBytesStoreTest {
     }
 
     @Test
-    @SuppressWarnings("deprecation")
     public void testSegmentMaintenance() {
 
         windowStore = buildWindowStore(RETENTION_PERIOD, WINDOW_SIZE, true, Serdes.Integer(),
@@ -393,23 +378,20 @@ public class RocksDBWindowStoreTest extends AbstractWindowBytesStoreTest {
         windowStore.init((StateStoreContext) context, windowStore);
 
         context.setTime(0L);
-        setCurrentTime(0);
-        windowStore.put(0, "v");
+        windowStore.put(0, "v", 0);
         assertEquals(
             Utils.mkSet(segments.segmentName(0L)),
             segmentDirs(baseDir)
         );
 
-        setCurrentTime(SEGMENT_INTERVAL - 1);
-        windowStore.put(0, "v");
-        windowStore.put(0, "v");
+        windowStore.put(0, "v", SEGMENT_INTERVAL - 1);
+        windowStore.put(0, "v", SEGMENT_INTERVAL - 1);
         assertEquals(
             Utils.mkSet(segments.segmentName(0L)),
             segmentDirs(baseDir)
         );
 
-        setCurrentTime(SEGMENT_INTERVAL);
-        windowStore.put(0, "v");
+        windowStore.put(0, "v", SEGMENT_INTERVAL);
         assertEquals(
             Utils.mkSet(segments.segmentName(0L), segments.segmentName(1L)),
             segmentDirs(baseDir)
@@ -431,8 +413,7 @@ public class RocksDBWindowStoreTest extends AbstractWindowBytesStoreTest {
             segmentDirs(baseDir)
         );
 
-        setCurrentTime(SEGMENT_INTERVAL * 3);
-        windowStore.put(0, "v");
+        windowStore.put(0, "v", SEGMENT_INTERVAL * 3);
 
         iter = windowStore.fetch(0, ofEpochMilli(0L), ofEpochMilli(SEGMENT_INTERVAL * 4));
         fetchedCount = 0;
@@ -447,8 +428,7 @@ public class RocksDBWindowStoreTest extends AbstractWindowBytesStoreTest {
             segmentDirs(baseDir)
         );
 
-        setCurrentTime(SEGMENT_INTERVAL * 5);
-        windowStore.put(0, "v");
+        windowStore.put(0, "v", SEGMENT_INTERVAL * 5);
 
         iter = windowStore.fetch(0, ofEpochMilli(SEGMENT_INTERVAL * 4), ofEpochMilli(SEGMENT_INTERVAL * 10));
         fetchedCount = 0;
@@ -512,29 +492,19 @@ public class RocksDBWindowStoreTest extends AbstractWindowBytesStoreTest {
     }
 
     @Test
-    @SuppressWarnings("deprecation")
     public void testRestore() throws Exception {
         final long startTime = SEGMENT_INTERVAL * 2;
         final long increment = SEGMENT_INTERVAL / 2;
 
-        setCurrentTime(startTime);
-        windowStore.put(0, "zero");
-        setCurrentTime(startTime + increment);
-        windowStore.put(1, "one");
-        setCurrentTime(startTime + increment * 2);
-        windowStore.put(2, "two");
-        setCurrentTime(startTime + increment * 3);
-        windowStore.put(3, "three");
-        setCurrentTime(startTime + increment * 4);
-        windowStore.put(4, "four");
-        setCurrentTime(startTime + increment * 5);
-        windowStore.put(5, "five");
-        setCurrentTime(startTime + increment * 6);
-        windowStore.put(6, "six");
-        setCurrentTime(startTime + increment * 7);
-        windowStore.put(7, "seven");
-        setCurrentTime(startTime + increment * 8);
-        windowStore.put(8, "eight");
+        windowStore.put(0, "zero", startTime);
+        windowStore.put(1, "one", startTime + increment);
+        windowStore.put(2, "two", startTime + increment * 2);
+        windowStore.put(3, "three", startTime + increment * 3);
+        windowStore.put(4, "four", startTime + increment * 4);
+        windowStore.put(5, "five", startTime + increment * 5);
+        windowStore.put(6, "six", startTime + increment * 6);
+        windowStore.put(7, "seven", startTime + increment * 7);
+        windowStore.put(8, "eight", startTime + increment * 8);
         windowStore.flush();
 
         windowStore.close();
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index 659fb47..f657209 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -1360,13 +1360,6 @@ public class TopologyTestDriver implements Closeable {
             inner.init(context, root);
         }
 
-        @Deprecated
-        @Override
-        public void put(final K key,
-                        final V value) {
-            inner.put(key, ValueAndTimestamp.make(value, ConsumerRecord.NO_TIMESTAMP));
-        }
-
         @Override
         public void put(final K key,
                         final V value,
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/WindowStoreFacadeTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/WindowStoreFacadeTest.java
index d184f98..434782c 100644
--- a/streams/test-utils/src/test/java/org/apache/kafka/streams/WindowStoreFacadeTest.java
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/WindowStoreFacadeTest.java
@@ -71,17 +71,6 @@ public class WindowStoreFacadeTest {
     }
 
     @Test
-    @SuppressWarnings("deprecation")
-    public void shouldPutWithUnknownTimestamp() {
-        mockedWindowTimestampStore.put("key", ValueAndTimestamp.make("value", ConsumerRecord.NO_TIMESTAMP));
-        expectLastCall();
-        replay(mockedWindowTimestampStore);
-
-        windowStoreFacade.put("key", "value");
-        verify(mockedWindowTimestampStore);
-    }
-
-    @Test
     public void shouldPutWindowStartTimestampWithUnknownTimestamp() {
         mockedWindowTimestampStore.put("key", ValueAndTimestamp.make("value", ConsumerRecord.NO_TIMESTAMP), 21L);
         expectLastCall();