You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/11/15 01:29:30 UTC

[kafka] branch trunk updated: MINOR: Remove deprecated callers (#5911)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 14d3ead  MINOR: Remove deprecated callers (#5911)
14d3ead is described below

commit 14d3ead19d250f2f3117af473ff6244c663ef8ca
Author: Guozhang Wang <wa...@gmail.com>
AuthorDate: Wed Nov 14 17:29:19 2018 -0800

    MINOR: Remove deprecated callers (#5911)
    
    Replace callers of the deprecated APIs 1) Windows#until, 2) Windows#of, and 3) Serialized with the new APIs where possible.
    
    Reviewers: Matthias J. Sax <mj...@apache.org>, Bill Bejeck <bi...@confluent.io>
---
 .../examples/pageview/PageViewTypedDemo.java       |  4 +--
 .../examples/pageview/PageViewUntypedDemo.java     |  4 +--
 .../apache/kafka/streams/kstream/JoinWindows.java  |  4 +--
 .../org/apache/kafka/streams/kstream/KStream.java  |  2 +-
 .../org/apache/kafka/streams/kstream/KTable.java   |  2 +-
 .../kafka/streams/kstream/KeyValueMapper.java      |  4 +--
 .../kafka/streams/kstream/SessionWindows.java      |  2 +-
 .../KStreamAggregationDedupIntegrationTest.java    | 10 ++----
 .../KStreamAggregationIntegrationTest.java         | 16 ++++------
 .../kafka/streams/kstream/JoinWindowsTest.java     | 34 ++++++++------------
 .../kstream/RepartitionTopicNamingTest.java        | 29 +++++++++--------
 .../kafka/streams/kstream/SessionWindowsTest.java  | 17 ++++------
 .../kafka/streams/kstream/TimeWindowsTest.java     | 25 ++++++---------
 .../kstream/internals/KGroupedStreamImplTest.java  |  4 +--
 .../kstream/internals/KGroupedTableImplTest.java   | 13 +++-----
 .../streams/kstream/internals/KStreamImplTest.java | 37 +++++++++++-----------
 .../internals/KStreamWindowAggregateTest.java      |  2 +-
 .../kstream/internals/KStreamWindowReduceTest.java |  6 ++--
 .../kstream/internals/KTableAggregateTest.java     |  6 ++--
 .../streams/kstream/internals/KTableImplTest.java  |  6 ++--
 .../internals/KTableKTableLeftJoinTest.java        | 25 +++------------
 .../internals/KTableTransformValuesTest.java       |  7 ++--
 .../internals/SessionWindowedKStreamImplTest.java  |  4 +--
 .../kstream/internals/SuppressScenarioTest.java    |  7 ++--
 .../internals/TimeWindowedKStreamImplTest.java     |  4 +--
 .../apache/kafka/streams/perf/YahooBenchmark.java  |  4 +--
 .../streams/tests/BrokerCompatibilityTest.java     |  4 +--
 .../kafka/streams/tests/SmokeTestClient.java       |  7 ++--
 28 files changed, 125 insertions(+), 164 deletions(-)

diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
index 18b4912..f09ac80 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
@@ -31,6 +31,7 @@ import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Grouped;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.TimeWindows;
@@ -170,7 +171,6 @@ public class PageViewTypedDemo {
         public String region;
     }
 
-    @SuppressWarnings("deprecation")
     public static void main(final String[] args) {
         final Properties props = new Properties();
         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pageview-typed");
@@ -206,7 +206,7 @@ public class PageViewTypedDemo {
                 return viewByRegion;
             })
             .map((user, viewRegion) -> new KeyValue<>(viewRegion.region, viewRegion))
-            .groupByKey(org.apache.kafka.streams.kstream.Serialized.with(Serdes.String(), new JSONSerde<>()))
+            .groupByKey(Grouped.with(Serdes.String(), new JSONSerde<>()))
             .windowedBy(TimeWindows.of(Duration.ofDays(7)).advanceBy(Duration.ofSeconds(1)))
             .count()
             .toStream()
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
index d492238..2a9972b 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
@@ -32,6 +32,7 @@ import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.Grouped;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Produced;
@@ -54,7 +55,6 @@ import java.util.Properties;
  */
 public class PageViewUntypedDemo {
 
-    @SuppressWarnings("deprecation")
     public static void main(final String[] args) throws Exception {
         final Properties props = new Properties();
         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pageview-untyped");
@@ -87,7 +87,7 @@ public class PageViewUntypedDemo {
 
             })
             .map((user, viewRegion) -> new KeyValue<>(viewRegion.get("region").textValue(), viewRegion))
-            .groupByKey(org.apache.kafka.streams.kstream.Serialized.with(Serdes.String(), jsonSerde))
+            .groupByKey(Grouped.with(Serdes.String(), jsonSerde))
             .windowedBy(TimeWindows.of(Duration.ofDays(7)).advanceBy(Duration.ofSeconds(1)))
             .count()
             .toStream()
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
index 62eade4..2087009 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
@@ -246,7 +246,7 @@ public final class JoinWindows extends Windows<Window> {
      * @param durationMs the window retention time in milliseconds
      * @return itself
      * @throws IllegalArgumentException if {@code durationMs} is smaller than the window size
-     * @deprecated since 2.1. Use {@link JoinWindows#grace(Duration)} instead.
+     * @deprecated since 2.1. Use {@link Materialized#withRetention(Duration)} instead.
      */
     @SuppressWarnings("deprecation")
     @Override
@@ -264,7 +264,7 @@ public final class JoinWindows extends Windows<Window> {
      * For {@link TimeWindows} the maintain duration is at least as small as the window size.
      *
      * @return the window maintain duration
-     * @deprecated since 2.1. Use {@link JoinWindows#gracePeriodMs()} instead.
+     * @deprecated since 2.1. This function should not be used anymore as retention period can be specified via {@link Materialized#withRetention(Duration)}.
      */
     @SuppressWarnings("deprecation")
     @Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index 77987a9..6055199 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -874,7 +874,7 @@ public interface KStream<K, V> {
      * and rereading all records from it, such that the resulting {@link KGroupedStream} is partitioned on the new key.
      * <p>
      * This operation is equivalent to calling {@link #selectKey(KeyValueMapper)} followed by {@link #groupByKey()}.
-     * If the key type is changed, it is recommended to use {@link #groupBy(KeyValueMapper, Serialized)} instead.
+     * If the key type is changed, it is recommended to use {@link #groupBy(KeyValueMapper, Grouped)} instead.
      *
      * @param selector a {@link KeyValueMapper} that computes a new key for grouping
      * @param <KR>     the key type of the result {@link KGroupedStream}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
index b49f3e4..5ed0270 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
@@ -573,7 +573,7 @@ public interface KTable<K, V> {
      * records to and rereading all updated records from it, such that the resulting {@link KGroupedTable} is partitioned
      * on the new key.
      * <p>
-     * If the key or value type is changed, it is recommended to use {@link #groupBy(KeyValueMapper, Serialized)}
+     * If the key or value type is changed, it is recommended to use {@link #groupBy(KeyValueMapper, Grouped)}
      * instead.
      *
      * @param selector a {@link KeyValueMapper} that computes a new grouping key and value to be aggregated
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueMapper.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueMapper.java
index 2a56a05..1112fbb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueMapper.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueMapper.java
@@ -39,9 +39,9 @@ import org.apache.kafka.streams.KeyValue;
  * @see KStream#flatMap(KeyValueMapper)
  * @see KStream#selectKey(KeyValueMapper)
  * @see KStream#groupBy(KeyValueMapper)
- * @see KStream#groupBy(KeyValueMapper, Serialized)
+ * @see KStream#groupBy(KeyValueMapper, Grouped)
  * @see KTable#groupBy(KeyValueMapper)
- * @see KTable#groupBy(KeyValueMapper, Serialized)
+ * @see KTable#groupBy(KeyValueMapper, Grouped)
  * @see KTable#toStream(KeyValueMapper)
  */
 public interface KeyValueMapper<K, V, VR> {
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
index 02c7cbf..526c9d1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/SessionWindows.java
@@ -89,7 +89,7 @@ public final class SessionWindows {
      * @return a new window specification with default maintain duration of 1 day
      *
      * @throws IllegalArgumentException if {@code inactivityGapMs} is zero or negative
-     * @deprecated User {@link #with(Duration)} instead.
+     * @deprecated Use {@link #with(Duration)} instead.
      */
     @Deprecated
     public static SessionWindows with(final long inactivityGapMs) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
index 1db23a5..7493b06 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
@@ -31,6 +31,7 @@ import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Grouped;
 import org.apache.kafka.streams.kstream.KGroupedStream;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
@@ -78,7 +79,6 @@ public class KStreamAggregationDedupIntegrationTest {
     private Reducer<String> reducer;
     private KStream<Integer, String> stream;
 
-    @SuppressWarnings("deprecation")
     @Before
     public void before() throws InterruptedException {
         testNo++;
@@ -96,10 +96,7 @@ public class KStreamAggregationDedupIntegrationTest {
 
         final KeyValueMapper<Integer, String, String> mapper = MockMapper.selectValueMapper();
         stream = builder.stream(streamOneInput, Consumed.with(Serdes.Integer(), Serdes.String()));
-        groupedStream = stream
-            .groupBy(
-                mapper,
-                org.apache.kafka.streams.kstream.Serialized.with(Serdes.String(), Serdes.String()));
+        groupedStream = stream.groupBy(mapper, Grouped.with(Serdes.String(), Serdes.String()));
 
         reducer = (value1, value2) -> value1 + ":" + value2;
     }
@@ -173,14 +170,13 @@ public class KStreamAggregationDedupIntegrationTest {
         );
     }
 
-    @SuppressWarnings("deprecation")
     @Test
     public void shouldGroupByKey() throws Exception {
         final long timestamp = mockTime.milliseconds();
         produceMessages(timestamp);
         produceMessages(timestamp);
 
-        stream.groupByKey(org.apache.kafka.streams.kstream.Serialized.with(Serdes.Integer(), Serdes.String()))
+        stream.groupByKey(Grouped.with(Serdes.Integer(), Serdes.String()))
             .windowedBy(TimeWindows.of(ofMillis(500L)))
             .count(Materialized.as("count-windows"))
             .toStream((windowedKey, value) -> windowedKey.key() + "@" + windowedKey.window().start())
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
index 718483b..04cc0e1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
@@ -36,6 +36,7 @@ import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.Aggregator;
 import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Grouped;
 import org.apache.kafka.streams.kstream.Initializer;
 import org.apache.kafka.streams.kstream.KGroupedStream;
 import org.apache.kafka.streams.kstream.KStream;
@@ -92,7 +93,7 @@ import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
-@SuppressWarnings({"unchecked", "deprecation"})
+@SuppressWarnings("unchecked")
 @Category({IntegrationTest.class})
 public class KStreamAggregationIntegrationTest {
     private static final int NUM_BROKERS = 1;
@@ -135,10 +136,7 @@ public class KStreamAggregationIntegrationTest {
 
         final KeyValueMapper<Integer, String, String> mapper = MockMapper.selectValueMapper();
         stream = builder.stream(streamOneInput, Consumed.with(Serdes.Integer(), Serdes.String()));
-        groupedStream = stream
-            .groupBy(
-                mapper,
-                org.apache.kafka.streams.kstream.Serialized.with(Serdes.String(), Serdes.String()));
+        groupedStream = stream.groupBy(mapper, Grouped.with(Serdes.String(), Serdes.String()));
 
         reducer = (value1, value2) -> value1 + ":" + value2;
         initializer = () -> 0;
@@ -428,7 +426,7 @@ public class KStreamAggregationIntegrationTest {
         produceMessages(timestamp);
         produceMessages(timestamp);
 
-        stream.groupByKey(org.apache.kafka.streams.kstream.Serialized.with(Serdes.Integer(), Serdes.String()))
+        stream.groupByKey(Grouped.with(Serdes.Integer(), Serdes.String()))
                 .windowedBy(TimeWindows.of(ofMillis(500L)))
                 .count()
                 .toStream((windowedKey, value) -> windowedKey.key() + "@" + windowedKey.window().start()).to(outputTopic, Produced.with(Serdes.String(), Serdes.Long()));
@@ -521,7 +519,7 @@ public class KStreamAggregationIntegrationTest {
         final CountDownLatch latch = new CountDownLatch(11);
 
         builder.stream(userSessionsStream, Consumed.with(Serdes.String(), Serdes.String()))
-                .groupByKey(org.apache.kafka.streams.kstream.Serialized.with(Serdes.String(), Serdes.String()))
+                .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
                 .windowedBy(SessionWindows.with(ofMillis(sessionGap)))
                 .count()
                 .toStream()
@@ -619,7 +617,7 @@ public class KStreamAggregationIntegrationTest {
         final CountDownLatch latch = new CountDownLatch(11);
         final String userSessionsStore = "UserSessionsStore";
         builder.stream(userSessionsStream, Consumed.with(Serdes.String(), Serdes.String()))
-                .groupByKey(org.apache.kafka.streams.kstream.Serialized.with(Serdes.String(), Serdes.String()))
+                .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
                 .windowedBy(SessionWindows.with(ofMillis(sessionGap)))
                 .reduce((value1, value2) -> value1 + ":" + value2, Materialized.as(userSessionsStore))
                 .toStream()
@@ -706,7 +704,7 @@ public class KStreamAggregationIntegrationTest {
         final CountDownLatch latch = new CountDownLatch(5);
 
         builder.stream(userSessionsStream, Consumed.with(Serdes.String(), Serdes.String()))
-               .groupByKey(org.apache.kafka.streams.kstream.Serialized.with(Serdes.String(), Serdes.String()))
+               .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
                .windowedBy(UnlimitedWindows.of().startOn(ofEpochMilli(startTime)))
                .count()
                .toStream()
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java
index 7127c9f..dda2da4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/JoinWindowsTest.java
@@ -72,12 +72,11 @@ public class JoinWindowsTest {
         }
     }
 
-    @Deprecated
     @Test
-    public void untilShouldSetMaintainDuration() {
+    public void untilShouldSetGraceDuration() {
         final JoinWindows windowSpec = JoinWindows.of(ofMillis(ANY_SIZE));
         final long windowSize = windowSpec.size();
-        assertEquals(windowSize, windowSpec.until(windowSize).maintainMs());
+        assertEquals(windowSize, windowSpec.grace(ofMillis(windowSize)).gracePeriodMs());
     }
 
     @Deprecated
@@ -115,16 +114,16 @@ public class JoinWindowsTest {
 
         verifyEquality(JoinWindows.of(ofMillis(3)).grace(ofMillis(2)), JoinWindows.of(ofMillis(3)).grace(ofMillis(2)));
 
-        verifyEquality(JoinWindows.of(ofMillis(3)).until(60), JoinWindows.of(ofMillis(3)).until(60));
+        verifyEquality(JoinWindows.of(ofMillis(3)).grace(ofMillis(60)), JoinWindows.of(ofMillis(3)).grace(ofMillis(60)));
 
         verifyEquality(
-            JoinWindows.of(ofMillis(3)).before(ofMillis(1)).after(ofMillis(2)).grace(ofMillis(3)).until(60),
-            JoinWindows.of(ofMillis(3)).before(ofMillis(1)).after(ofMillis(2)).grace(ofMillis(3)).until(60)
+            JoinWindows.of(ofMillis(3)).before(ofMillis(1)).after(ofMillis(2)).grace(ofMillis(3)).grace(ofMillis(60)),
+            JoinWindows.of(ofMillis(3)).before(ofMillis(1)).after(ofMillis(2)).grace(ofMillis(3)).grace(ofMillis(60))
         );
         // JoinWindows is a little weird in that before and after set the same fields as of.
         verifyEquality(
-            JoinWindows.of(ofMillis(9)).before(ofMillis(1)).after(ofMillis(2)).grace(ofMillis(3)).until(60),
-            JoinWindows.of(ofMillis(3)).before(ofMillis(1)).after(ofMillis(2)).grace(ofMillis(3)).until(60)
+            JoinWindows.of(ofMillis(9)).before(ofMillis(1)).after(ofMillis(2)).grace(ofMillis(3)).grace(ofMillis(60)),
+            JoinWindows.of(ofMillis(3)).before(ofMillis(1)).after(ofMillis(2)).grace(ofMillis(3)).grace(ofMillis(60))
         );
     }
 
@@ -138,27 +137,22 @@ public class JoinWindowsTest {
 
         verifyInEquality(JoinWindows.of(ofMillis(3)).grace(ofMillis(9)), JoinWindows.of(ofMillis(3)).grace(ofMillis(2)));
 
-        verifyInEquality(JoinWindows.of(ofMillis(3)).until(90), JoinWindows.of(ofMillis(3)).until(60));
-
+        verifyInEquality(JoinWindows.of(ofMillis(3)).grace(ofMillis(90)), JoinWindows.of(ofMillis(3)).grace(ofMillis(60)));
 
-        verifyInEquality(
-            JoinWindows.of(ofMillis(3)).before(ofMillis(9)).after(ofMillis(2)).grace(ofMillis(3)).until(60),
-            JoinWindows.of(ofMillis(3)).before(ofMillis(1)).after(ofMillis(2)).grace(ofMillis(3)).until(60)
-        );
 
         verifyInEquality(
-            JoinWindows.of(ofMillis(3)).before(ofMillis(1)).after(ofMillis(9)).grace(ofMillis(3)).until(60),
-            JoinWindows.of(ofMillis(3)).before(ofMillis(1)).after(ofMillis(2)).grace(ofMillis(3)).until(60)
+            JoinWindows.of(ofMillis(3)).before(ofMillis(9)).after(ofMillis(2)).grace(ofMillis(3)),
+            JoinWindows.of(ofMillis(3)).before(ofMillis(1)).after(ofMillis(2)).grace(ofMillis(3))
         );
 
         verifyInEquality(
-            JoinWindows.of(ofMillis(3)).before(ofMillis(1)).after(ofMillis(2)).grace(ofMillis(9)).until(60),
-            JoinWindows.of(ofMillis(3)).before(ofMillis(1)).after(ofMillis(2)).grace(ofMillis(3)).until(60)
+            JoinWindows.of(ofMillis(3)).before(ofMillis(1)).after(ofMillis(9)).grace(ofMillis(3)),
+            JoinWindows.of(ofMillis(3)).before(ofMillis(1)).after(ofMillis(2)).grace(ofMillis(3))
         );
 
         verifyInEquality(
-            JoinWindows.of(ofMillis(3)).before(ofMillis(1)).after(ofMillis(2)).grace(ofMillis(3)).until(90),
-            JoinWindows.of(ofMillis(3)).before(ofMillis(1)).after(ofMillis(2)).grace(ofMillis(3)).until(60)
+            JoinWindows.of(ofMillis(3)).before(ofMillis(1)).after(ofMillis(2)).grace(ofMillis(9)),
+            JoinWindows.of(ofMillis(3)).before(ofMillis(1)).after(ofMillis(2)).grace(ofMillis(3))
         );
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/RepartitionTopicNamingTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/RepartitionTopicNamingTest.java
index 3f43691..929bc3f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/RepartitionTopicNamingTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/RepartitionTopicNamingTest.java
@@ -26,6 +26,7 @@ import org.apache.kafka.streams.errors.TopologyException;
 import org.apache.kafka.streams.processor.AbstractProcessor;
 import org.junit.Test;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Locale;
@@ -102,8 +103,8 @@ public class RepartitionTopicNamingTest {
         try {
             final StreamsBuilder builder = new StreamsBuilder();
             final KGroupedStream<String, String> kGroupedStream = builder.<String, String>stream("topic").selectKey((k, v) -> k).groupByKey(Grouped.as("grouping"));
-            kGroupedStream.windowedBy(TimeWindows.of(10)).count();
-            kGroupedStream.windowedBy(TimeWindows.of(30)).count();
+            kGroupedStream.windowedBy(TimeWindows.of(Duration.ofMillis(10L))).count();
+            kGroupedStream.windowedBy(TimeWindows.of(Duration.ofMillis(30L))).count();
             builder.build();
             fail("Should not build re-using repartition topic name");
         } catch (final TopologyException te) {
@@ -121,8 +122,8 @@ public class RepartitionTopicNamingTest {
             final KStream<String, String> stream2 = builder.<String, String>stream("topic2").selectKey((k, v) -> k);
             final KStream<String, String> stream3 = builder.<String, String>stream("topic3").selectKey((k, v) -> k);
 
-            final KStream<String, String> joined = stream1.join(stream2, (v1, v2) -> v1 + v2, JoinWindows.of(30), Joined.named("join-repartition"));
-            joined.join(stream3, (v1, v2) -> v1 + v2, JoinWindows.of(30), Joined.named("join-repartition"));
+            final KStream<String, String> joined = stream1.join(stream2, (v1, v2) -> v1 + v2, JoinWindows.of(Duration.ofMillis(30L)), Joined.named("join-repartition"));
+            joined.join(stream3, (v1, v2) -> v1 + v2, JoinWindows.of(Duration.ofMillis(30L)), Joined.named("join-repartition"));
             builder.build();
             fail("Should not build re-using repartition topic name");
         } catch (final TopologyException te) {
@@ -137,8 +138,8 @@ public class RepartitionTopicNamingTest {
         final Properties properties = new Properties();
         properties.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
         final KGroupedStream<String, String> kGroupedStream = builder.<String, String>stream("topic").selectKey((k, v) -> k).groupByKey(Grouped.as("grouping"));
-        kGroupedStream.windowedBy(TimeWindows.of(10)).count();
-        kGroupedStream.windowedBy(TimeWindows.of(30)).count();
+        kGroupedStream.windowedBy(TimeWindows.of(Duration.ofMillis(10L))).count();
+        kGroupedStream.windowedBy(TimeWindows.of(Duration.ofMillis(30L))).count();
         builder.build(properties);
     }
 
@@ -272,9 +273,9 @@ public class RepartitionTopicNamingTest {
 
         if (isGroupByKey) {
             if (otherOperations) {
-                selectKeyStream.filter((k, v) -> true).mapValues(v -> v).groupByKey(Grouped.as(groupedTimeWindowRepartitionTopicName)).windowedBy(TimeWindows.of(10)).count();
+                selectKeyStream.filter((k, v) -> true).mapValues(v -> v).groupByKey(Grouped.as(groupedTimeWindowRepartitionTopicName)).windowedBy(TimeWindows.of(Duration.ofMillis(10L))).count();
             } else {
-                selectKeyStream.groupByKey(Grouped.as(groupedTimeWindowRepartitionTopicName)).windowedBy(TimeWindows.of(10)).count();
+                selectKeyStream.groupByKey(Grouped.as(groupedTimeWindowRepartitionTopicName)).windowedBy(TimeWindows.of(Duration.ofMillis(10L))).count();
             }
         } else {
             if (otherOperations) {
@@ -297,15 +298,15 @@ public class RepartitionTopicNamingTest {
         final String groupedSessionWindowRepartitionTopicName = "session-window-grouping";
         if (isGroupByKey) {
             if (otherOperations) {
-                selectKeyStream.filter((k, v) -> true).mapValues(v -> v).groupByKey(Grouped.as(groupedSessionWindowRepartitionTopicName)).windowedBy(SessionWindows.with(10)).count();
+                selectKeyStream.filter((k, v) -> true).mapValues(v -> v).groupByKey(Grouped.as(groupedSessionWindowRepartitionTopicName)).windowedBy(SessionWindows.with(Duration.ofMillis(10L))).count();
             } else {
-                selectKeyStream.groupByKey(Grouped.as(groupedSessionWindowRepartitionTopicName)).windowedBy(SessionWindows.with(10)).count();
+                selectKeyStream.groupByKey(Grouped.as(groupedSessionWindowRepartitionTopicName)).windowedBy(SessionWindows.with(Duration.ofMillis(10L))).count();
             }
         } else {
             if (otherOperations) {
-                selectKeyStream.filter((k, v) -> true).mapValues(v -> v).groupBy(kvMapper, Grouped.as(groupedSessionWindowRepartitionTopicName)).windowedBy(SessionWindows.with(10)).count();
+                selectKeyStream.filter((k, v) -> true).mapValues(v -> v).groupBy(kvMapper, Grouped.as(groupedSessionWindowRepartitionTopicName)).windowedBy(SessionWindows.with(Duration.ofMillis(10L))).count();
             } else {
-                selectKeyStream.groupBy(kvMapper, Grouped.as(groupedSessionWindowRepartitionTopicName)).windowedBy(SessionWindows.with(10)).count();
+                selectKeyStream.groupBy(kvMapper, Grouped.as(groupedSessionWindowRepartitionTopicName)).windowedBy(SessionWindows.with(Duration.ofMillis(10L))).count();
             }
         }
 
@@ -356,7 +357,7 @@ public class RepartitionTopicNamingTest {
 
         final String joinRepartitionTopicName = "my-join";
         updatedStreamOne.join(updatedStreamTwo, (v1, v2) -> v1 + v2,
-                JoinWindows.of(1000), Joined.with(Serdes.String(), Serdes.String(), Serdes.String(), joinRepartitionTopicName));
+                JoinWindows.of(Duration.ofMillis(1000L)), Joined.with(Serdes.String(), Serdes.String(), Serdes.String(), joinRepartitionTopicName));
 
         return builder.build().describe().toString();
     }
@@ -403,7 +404,7 @@ public class RepartitionTopicNamingTest {
 
         mappedStream.filter((k, v) -> k.equals("A"))
                 .join(countStream, (v1, v2) -> v1 + ":" + v2.toString(),
-                        JoinWindows.of(5000),
+                        JoinWindows.of(Duration.ofMillis(5000L)),
                         Joined.with(Serdes.String(), Serdes.String(), Serdes.Long(), fourthRepartitionTopicName))
                 .to(JOINED_TOPIC);
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowsTest.java
index 369bbad..eac978c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/SessionWindowsTest.java
@@ -33,11 +33,10 @@ public class SessionWindowsTest {
         assertEquals(anyGap, SessionWindows.with(ofMillis(anyGap)).inactivityGap());
     }
 
-    @Deprecated
     @Test
-    public void shouldSetWindowRetentionTime() {
+    public void shouldSetWindowGraceTime() {
         final long anyRetentionTime = 42L;
-        assertEquals(anyRetentionTime, SessionWindows.with(ofMillis(1)).until(anyRetentionTime).maintainMs());
+        assertEquals(anyRetentionTime, SessionWindows.with(ofMillis(1)).grace(ofMillis(anyRetentionTime)).gracePeriodMs());
     }
 
 
@@ -88,9 +87,9 @@ public class SessionWindowsTest {
 
         verifyEquality(SessionWindows.with(ofMillis(1)).grace(ofMillis(6)), SessionWindows.with(ofMillis(1)).grace(ofMillis(6)));
 
-        verifyEquality(SessionWindows.with(ofMillis(1)).until(7), SessionWindows.with(ofMillis(1)).until(7));
+        verifyEquality(SessionWindows.with(ofMillis(1)).grace(ofMillis(7)), SessionWindows.with(ofMillis(1)).grace(ofMillis(7)));
 
-        verifyEquality(SessionWindows.with(ofMillis(1)).grace(ofMillis(6)).until(7), SessionWindows.with(ofMillis(1)).grace(ofMillis(6)).until(7));
+        verifyEquality(SessionWindows.with(ofMillis(1)).grace(ofMillis(6)).grace(ofMillis(7)), SessionWindows.with(ofMillis(1)).grace(ofMillis(6)).grace(ofMillis(7)));
     }
 
     @Test
@@ -99,12 +98,10 @@ public class SessionWindowsTest {
 
         verifyInEquality(SessionWindows.with(ofMillis(1)).grace(ofMillis(9)), SessionWindows.with(ofMillis(1)).grace(ofMillis(6)));
 
-        verifyInEquality(SessionWindows.with(ofMillis(1)).until(9), SessionWindows.with(ofMillis(1)).until(7));
-
-        verifyInEquality(SessionWindows.with(ofMillis(2)).grace(ofMillis(6)).until(7), SessionWindows.with(ofMillis(1)).grace(ofMillis(6)).until(7));
+        verifyInEquality(SessionWindows.with(ofMillis(1)).grace(ofMillis(9)), SessionWindows.with(ofMillis(1)).grace(ofMillis(7)));
 
-        verifyInEquality(SessionWindows.with(ofMillis(1)).grace(ofMillis(0)).until(7), SessionWindows.with(ofMillis(1)).grace(ofMillis(6)).until(7));
+        verifyInEquality(SessionWindows.with(ofMillis(2)).grace(ofMillis(6)).grace(ofMillis(7)), SessionWindows.with(ofMillis(1)).grace(ofMillis(6)));
 
-        verifyInEquality(SessionWindows.with(ofMillis(1)).grace(ofMillis(6)).until(70), SessionWindows.with(ofMillis(1)).grace(ofMillis(6)).until(7));
+        verifyInEquality(SessionWindows.with(ofMillis(1)).grace(ofMillis(0)).grace(ofMillis(7)), SessionWindows.with(ofMillis(1)).grace(ofMillis(6)));
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java
index 2bdb3a0..dcb691b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/TimeWindowsTest.java
@@ -160,11 +160,11 @@ public class TimeWindowsTest {
 
         verifyEquality(TimeWindows.of(ofMillis(3)).grace(ofMillis(1)), TimeWindows.of(ofMillis(3)).grace(ofMillis(1)));
 
-        verifyEquality(TimeWindows.of(ofMillis(3)).until(4), TimeWindows.of(ofMillis(3)).until(4));
+        verifyEquality(TimeWindows.of(ofMillis(3)).grace(ofMillis(4)), TimeWindows.of(ofMillis(3)).grace(ofMillis(4)));
 
         verifyEquality(
-            TimeWindows.of(ofMillis(3)).advanceBy(ofMillis(1)).grace(ofMillis(1)).until(4),
-            TimeWindows.of(ofMillis(3)).advanceBy(ofMillis(1)).grace(ofMillis(1)).until(4)
+            TimeWindows.of(ofMillis(3)).advanceBy(ofMillis(1)).grace(ofMillis(1)).grace(ofMillis(4)),
+            TimeWindows.of(ofMillis(3)).advanceBy(ofMillis(1)).grace(ofMillis(1)).grace(ofMillis(4))
         );
     }
 
@@ -176,27 +176,22 @@ public class TimeWindowsTest {
 
         verifyInEquality(TimeWindows.of(ofMillis(3)).grace(ofMillis(2)), TimeWindows.of(ofMillis(3)).grace(ofMillis(1)));
 
-        verifyInEquality(TimeWindows.of(ofMillis(3)).until(9), TimeWindows.of(ofMillis(3)).until(4));
+        verifyInEquality(TimeWindows.of(ofMillis(3)).grace(ofMillis(9)), TimeWindows.of(ofMillis(3)).grace(ofMillis(4)));
 
 
         verifyInEquality(
-            TimeWindows.of(ofMillis(4)).advanceBy(ofMillis(2)).grace(ofMillis(2)).until(4),
-            TimeWindows.of(ofMillis(3)).advanceBy(ofMillis(2)).grace(ofMillis(2)).until(4)
+            TimeWindows.of(ofMillis(4)).advanceBy(ofMillis(2)).grace(ofMillis(2)),
+            TimeWindows.of(ofMillis(3)).advanceBy(ofMillis(2)).grace(ofMillis(2))
         );
 
         verifyInEquality(
-            TimeWindows.of(ofMillis(3)).advanceBy(ofMillis(1)).grace(ofMillis(2)).until(4),
-            TimeWindows.of(ofMillis(3)).advanceBy(ofMillis(2)).grace(ofMillis(2)).until(4)
+            TimeWindows.of(ofMillis(3)).advanceBy(ofMillis(1)).grace(ofMillis(2)),
+            TimeWindows.of(ofMillis(3)).advanceBy(ofMillis(2)).grace(ofMillis(2))
         );
 
         assertNotEquals(
-            TimeWindows.of(ofMillis(3)).advanceBy(ofMillis(2)).grace(ofMillis(1)).until(4),
-            TimeWindows.of(ofMillis(3)).advanceBy(ofMillis(2)).grace(ofMillis(2)).until(4)
-        );
-
-        assertNotEquals(
-            TimeWindows.of(ofMillis(3)).advanceBy(ofMillis(2)).grace(ofMillis(2)).until(9),
-            TimeWindows.of(ofMillis(3)).advanceBy(ofMillis(2)).grace(ofMillis(2)).until(4)
+            TimeWindows.of(ofMillis(3)).advanceBy(ofMillis(2)).grace(ofMillis(1)),
+            TimeWindows.of(ofMillis(3)).advanceBy(ofMillis(2)).grace(ofMillis(2))
         );
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
index 26b7e24..f2fc4f8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
@@ -28,6 +28,7 @@ import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.kstream.Aggregator;
 import org.apache.kafka.streams.kstream.ForeachAction;
+import org.apache.kafka.streams.kstream.Grouped;
 import org.apache.kafka.streams.kstream.Initializer;
 import org.apache.kafka.streams.kstream.KGroupedStream;
 import org.apache.kafka.streams.kstream.KStream;
@@ -78,11 +79,10 @@ public class KGroupedStreamImplTest {
     private final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
     private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
 
-    @SuppressWarnings("deprecation")
     @Before
     public void before() {
         final KStream<String, String> stream = builder.stream(TOPIC, Consumed.with(Serdes.String(), Serdes.String()));
-        groupedStream = stream.groupByKey(org.apache.kafka.streams.kstream.Serialized.with(Serdes.String(), Serdes.String()));
+        groupedStream = stream.groupByKey(Grouped.with(Serdes.String(), Serdes.String()));
 
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
index c06cce4..99f1b81 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
@@ -26,6 +26,7 @@ import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.kstream.ForeachAction;
+import org.apache.kafka.streams.kstream.Grouped;
 import org.apache.kafka.streams.kstream.KGroupedTable;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
@@ -204,13 +205,11 @@ public class KGroupedTableImplTest {
         }
     }
 
-    @SuppressWarnings({"unchecked", "deprecation"})
+    @SuppressWarnings("unchecked")
     @Test
     public void shouldCountAndMaterializeResults() {
         final KTable<String, String> table = builder.table(topic, Consumed.with(Serdes.String(), Serdes.String()));
-        table.groupBy(MockMapper.selectValueKeyValueMapper(),
-                      org.apache.kafka.streams.kstream.Serialized.with(Serdes.String(),
-                                      Serdes.String()))
+        table.groupBy(MockMapper.selectValueKeyValueMapper(), Grouped.with(Serdes.String(), Serdes.String()))
                 .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("count")
                                .withKeySerde(Serdes.String())
                                .withValueSerde(Serdes.Long()));
@@ -223,13 +222,11 @@ public class KGroupedTableImplTest {
         }
     }
 
-    @SuppressWarnings({"unchecked", "deprecation"})
+    @SuppressWarnings("unchecked")
     @Test
     public void shouldAggregateAndMaterializeResults() {
         final KTable<String, String> table = builder.table(topic, Consumed.with(Serdes.String(), Serdes.String()));
-        table.groupBy(MockMapper.<String, String>selectValueKeyValueMapper(),
-                      org.apache.kafka.streams.kstream.Serialized.with(Serdes.String(),
-                                      Serdes.String()))
+        table.groupBy(MockMapper.<String, String>selectValueKeyValueMapper(), Grouped.with(Serdes.String(), Serdes.String()))
                 .aggregate(MockInitializer.STRING_INIT,
                            MockAggregator.TOSTRING_ADDER,
                            MockAggregator.TOSTRING_REMOVER,
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
index 1f3492c..772836f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
@@ -25,6 +25,7 @@ import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.TopologyWrapper;
 import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.GlobalKTable;
+import org.apache.kafka.streams.kstream.Grouped;
 import org.apache.kafka.streams.kstream.JoinWindows;
 import org.apache.kafka.streams.kstream.Joined;
 import org.apache.kafka.streams.kstream.KStream;
@@ -54,6 +55,7 @@ import org.apache.kafka.test.StreamsTestUtils;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
@@ -188,7 +190,6 @@ public class KStreamImplTest {
             TopologyWrapper.getInternalTopologyBuilder(builder.build()).setApplicationId("X").build(null).processors().size());
     }
 
-    @SuppressWarnings("deprecation")
     @Test
     public void shouldPreserveSerdesForOperators() {
         final StreamsBuilder builder = new StreamsBuilder();
@@ -264,28 +265,28 @@ public class KStreamImplTest {
 
         assertEquals(((AbstractStream) stream1.groupByKey()).keySerde(), consumedInternal.keySerde());
         assertEquals(((AbstractStream) stream1.groupByKey()).valueSerde(), consumedInternal.valueSerde());
-        assertEquals(((AbstractStream) stream1.groupByKey(org.apache.kafka.streams.kstream.Serialized.with(mySerde, mySerde))).keySerde(), mySerde);
-        assertEquals(((AbstractStream) stream1.groupByKey(org.apache.kafka.streams.kstream.Serialized.with(mySerde, mySerde))).valueSerde(), mySerde);
+        assertEquals(((AbstractStream) stream1.groupByKey(Grouped.with(mySerde, mySerde))).keySerde(), mySerde);
+        assertEquals(((AbstractStream) stream1.groupByKey(Grouped.with(mySerde, mySerde))).valueSerde(), mySerde);
 
         assertEquals(((AbstractStream) stream1.groupBy(selector)).keySerde(), null);
         assertEquals(((AbstractStream) stream1.groupBy(selector)).valueSerde(), consumedInternal.valueSerde());
-        assertEquals(((AbstractStream) stream1.groupBy(selector, org.apache.kafka.streams.kstream.Serialized.with(mySerde, mySerde))).keySerde(), mySerde);
-        assertEquals(((AbstractStream) stream1.groupBy(selector, org.apache.kafka.streams.kstream.Serialized.with(mySerde, mySerde))).valueSerde(), mySerde);
+        assertEquals(((AbstractStream) stream1.groupBy(selector, Grouped.with(mySerde, mySerde))).keySerde(), mySerde);
+        assertEquals(((AbstractStream) stream1.groupBy(selector, Grouped.with(mySerde, mySerde))).valueSerde(), mySerde);
 
-        assertEquals(((AbstractStream) stream1.join(stream1, joiner, JoinWindows.of(100L))).keySerde(), null);
-        assertEquals(((AbstractStream) stream1.join(stream1, joiner, JoinWindows.of(100L))).valueSerde(), null);
-        assertEquals(((AbstractStream) stream1.join(stream1, joiner, JoinWindows.of(100L), Joined.with(mySerde, mySerde, mySerde))).keySerde(), mySerde);
-        assertNull(((AbstractStream) stream1.join(stream1, joiner, JoinWindows.of(100L), Joined.with(mySerde, mySerde, mySerde))).valueSerde());
+        assertEquals(((AbstractStream) stream1.join(stream1, joiner, JoinWindows.of(Duration.ofMillis(100L)))).keySerde(), null);
+        assertEquals(((AbstractStream) stream1.join(stream1, joiner, JoinWindows.of(Duration.ofMillis(100L)))).valueSerde(), null);
+        assertEquals(((AbstractStream) stream1.join(stream1, joiner, JoinWindows.of(Duration.ofMillis(100L)), Joined.with(mySerde, mySerde, mySerde))).keySerde(), mySerde);
+        assertNull(((AbstractStream) stream1.join(stream1, joiner, JoinWindows.of(Duration.ofMillis(100L)), Joined.with(mySerde, mySerde, mySerde))).valueSerde());
 
-        assertEquals(((AbstractStream) stream1.leftJoin(stream1, joiner, JoinWindows.of(100L))).keySerde(), null);
-        assertEquals(((AbstractStream) stream1.leftJoin(stream1, joiner, JoinWindows.of(100L))).valueSerde(), null);
-        assertEquals(((AbstractStream) stream1.leftJoin(stream1, joiner, JoinWindows.of(100L), Joined.with(mySerde, mySerde, mySerde))).keySerde(), mySerde);
-        assertNull(((AbstractStream) stream1.leftJoin(stream1, joiner, JoinWindows.of(100L), Joined.with(mySerde, mySerde, mySerde))).valueSerde());
+        assertEquals(((AbstractStream) stream1.leftJoin(stream1, joiner, JoinWindows.of(Duration.ofMillis(100L)))).keySerde(), null);
+        assertEquals(((AbstractStream) stream1.leftJoin(stream1, joiner, JoinWindows.of(Duration.ofMillis(100L)))).valueSerde(), null);
+        assertEquals(((AbstractStream) stream1.leftJoin(stream1, joiner, JoinWindows.of(Duration.ofMillis(100L)), Joined.with(mySerde, mySerde, mySerde))).keySerde(), mySerde);
+        assertNull(((AbstractStream) stream1.leftJoin(stream1, joiner, JoinWindows.of(Duration.ofMillis(100L)), Joined.with(mySerde, mySerde, mySerde))).valueSerde());
 
-        assertEquals(((AbstractStream) stream1.outerJoin(stream1, joiner, JoinWindows.of(100L))).keySerde(), null);
-        assertEquals(((AbstractStream) stream1.outerJoin(stream1, joiner, JoinWindows.of(100L))).valueSerde(), null);
-        assertEquals(((AbstractStream) stream1.outerJoin(stream1, joiner, JoinWindows.of(100L), Joined.with(mySerde, mySerde, mySerde))).keySerde(), mySerde);
-        assertNull(((AbstractStream) stream1.outerJoin(stream1, joiner, JoinWindows.of(100L), Joined.with(mySerde, mySerde, mySerde))).valueSerde());
+        assertEquals(((AbstractStream) stream1.outerJoin(stream1, joiner, JoinWindows.of(Duration.ofMillis(100L)))).keySerde(), null);
+        assertEquals(((AbstractStream) stream1.outerJoin(stream1, joiner, JoinWindows.of(Duration.ofMillis(100L)))).valueSerde(), null);
+        assertEquals(((AbstractStream) stream1.outerJoin(stream1, joiner, JoinWindows.of(Duration.ofMillis(100L)), Joined.with(mySerde, mySerde, mySerde))).keySerde(), mySerde);
+        assertNull(((AbstractStream) stream1.outerJoin(stream1, joiner, JoinWindows.of(Duration.ofMillis(100L)), Joined.with(mySerde, mySerde, mySerde))).valueSerde());
 
         assertEquals(((AbstractStream) stream1.join(table1, joiner)).keySerde(), consumedInternal.keySerde());
         assertEquals(((AbstractStream) stream1.join(table1, joiner)).valueSerde(), null);
@@ -384,7 +385,7 @@ public class KStreamImplTest {
             });
         stream.join(kStream,
                     valueJoiner,
-                    JoinWindows.of(ofMillis(windowSize)).until(3 * windowSize),
+                    JoinWindows.of(ofMillis(windowSize)).grace(ofMillis(3 * windowSize)),
                     Joined.with(Serdes.String(),
                                 Serdes.String(),
                                 Serdes.String()))
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
index b82c8fe..068062b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
@@ -260,7 +260,7 @@ public class KStreamWindowAggregateTest {
         final String topic = "topic";
 
         final KStream<String, String> stream1 = builder.stream(topic, Consumed.with(Serdes.String(), Serdes.String()));
-        stream1.groupByKey(org.apache.kafka.streams.kstream.Serialized.with(Serdes.String(), Serdes.String()))
+        stream1.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
                .windowedBy(TimeWindows.of(ofMillis(10)).advanceBy(ofMillis(5)).until(100))
                .aggregate(
                    () -> "",
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduceTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduceTest.java
index 790c563..634cb2f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduceTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduceTest.java
@@ -26,6 +26,7 @@ import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Grouped;
 import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
 import org.apache.kafka.streams.test.ConsumerRecordFactory;
@@ -51,14 +52,13 @@ public class KStreamWindowReduceTest {
     private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
     private final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
 
-    @SuppressWarnings("deprecation")
     @Test
     public void shouldLogAndMeterOnNullKey() {
 
         final StreamsBuilder builder = new StreamsBuilder();
         builder
             .stream("TOPIC", Consumed.with(Serdes.String(), Serdes.String()))
-            .groupByKey(org.apache.kafka.streams.kstream.Serialized.with(Serdes.String(), Serdes.String()))
+            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
             .windowedBy(TimeWindows.of(ofMillis(500L)))
             .reduce((value1, value2) -> value1 + "+" + value2);
 
@@ -80,7 +80,7 @@ public class KStreamWindowReduceTest {
         final StreamsBuilder builder = new StreamsBuilder();
         builder
             .stream("TOPIC", Consumed.with(Serdes.String(), Serdes.String()))
-            .groupByKey(org.apache.kafka.streams.kstream.Serialized.with(Serdes.String(), Serdes.String()))
+            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
             .windowedBy(TimeWindows.of(ofMillis(5L)).until(100))
             .reduce((value1, value2) -> value1 + "+" + value2)
             .toStream()
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
index 01b2609..0452c06 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
@@ -25,6 +25,7 @@ import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.kstream.Aggregator;
 import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.ForeachAction;
+import org.apache.kafka.streams.kstream.Grouped;
 import org.apache.kafka.streams.kstream.Initializer;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
@@ -55,8 +56,7 @@ public class KTableAggregateTest {
 
     private final Serde<String> stringSerde = Serdes.String();
     private final Consumed<String, String> consumed = Consumed.with(stringSerde, stringSerde);
-    private final org.apache.kafka.streams.kstream.Serialized<String, String> stringSerialzied =
-            org.apache.kafka.streams.kstream.Serialized.with(stringSerde, stringSerde);
+    private final Grouped<String, String> stringSerialzied = Grouped.with(stringSerde, stringSerde);
     private final MockProcessorSupplier<String, Object> supplier = new MockProcessorSupplier<>();
 
     private File stateDir = null;
@@ -362,7 +362,7 @@ public class KTableAggregateTest {
             public KeyValue<String, Long> apply(final Long key, final String value) {
                 return new KeyValue<>(value, key);
             }
-        }, org.apache.kafka.streams.kstream.Serialized.with(Serdes.String(), Serdes.Long()))
+        }, Grouped.with(Serdes.String(), Serdes.Long()))
                 .reduce(new Reducer<Long>() {
                     @Override
                     public Long apply(final Long value1, final Long value2) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
index 5747fed..d0ed50b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
@@ -28,6 +28,7 @@ import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.TopologyTestDriverWrapper;
 import org.apache.kafka.streams.TopologyWrapper;
 import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Grouped;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.Materialized;
@@ -135,7 +136,6 @@ public class KTableImplTest {
     }
 
     @Test
-    @SuppressWarnings("deprecation")
     public void shouldPreserveSerdesForOperators() {
         final StreamsBuilder builder = new StreamsBuilder();
         final KTable<String, String> table1 = builder.table("topic-2", stringConsumed);
@@ -184,8 +184,8 @@ public class KTableImplTest {
 
         assertEquals(((AbstractStream) table1.groupBy(KeyValue::new)).keySerde(), null);
         assertEquals(((AbstractStream) table1.groupBy(KeyValue::new)).valueSerde(), null);
-        assertEquals(((AbstractStream) table1.groupBy(KeyValue::new, org.apache.kafka.streams.kstream.Serialized.with(mySerde, mySerde))).keySerde(), mySerde);
-        assertEquals(((AbstractStream) table1.groupBy(KeyValue::new, org.apache.kafka.streams.kstream.Serialized.with(mySerde, mySerde))).valueSerde(), mySerde);
+        assertEquals(((AbstractStream) table1.groupBy(KeyValue::new, Grouped.with(mySerde, mySerde))).keySerde(), mySerde);
+        assertEquals(((AbstractStream) table1.groupBy(KeyValue::new, Grouped.with(mySerde, mySerde))).valueSerde(), mySerde);
 
         assertEquals(((AbstractStream) table1.join(table1, joiner)).keySerde(), consumedInternal.keySerde());
         assertEquals(((AbstractStream) table1.join(table1, joiner)).valueSerde(), null);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
index 1cd360a..f5d74b2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
@@ -18,19 +18,17 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.TopologyWrapper;
+import org.apache.kafka.streams.kstream.Grouped;
 import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.kafka.streams.processor.MockProcessorContext;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
-import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.test.MockProcessor;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockReducer;
@@ -344,7 +342,6 @@ public class KTableKTableLeftJoinTest {
      * It is based on a fairly complicated join used by the developer that reported the bug.
      * Before the fix this would trigger an IllegalStateException.
      */
-    @SuppressWarnings("deprecation")
     @Test
     public void shouldNotThrowIllegalStateExceptionWhenMultiCacheEvictions() {
         final String agg = "agg";
@@ -360,16 +357,8 @@ public class KTableKTableLeftJoinTest {
         final Consumed<Long, String> consumed = Consumed.with(Serdes.Long(), Serdes.String());
         final KTable<Long, String> aggTable = builder
             .table(agg, consumed)
-            .groupBy(
-                new KeyValueMapper<Long, String, KeyValue<Long, String>>() {
-                    @Override
-                    public KeyValue<Long, String> apply(final Long key, final String value) {
-                        return new KeyValue<>(key, value);
-                    }
-                },
-                org.apache.kafka.streams.kstream.Serialized.with(Serdes.Long(), Serdes.String())
-            )
-            .reduce(MockReducer.STRING_ADDER, MockReducer.STRING_ADDER, Materialized.<Long, String, KeyValueStore<Bytes, byte[]>>as("agg-store"));
+            .groupBy(KeyValue::new, Grouped.with(Serdes.Long(), Serdes.String()))
+            .reduce(MockReducer.STRING_ADDER, MockReducer.STRING_ADDER, Materialized.as("agg-store"));
 
         final KTable<Long, String> one = builder.table(tableOne, consumed);
         final KTable<Long, String> two = builder.table(tableTwo, consumed);
@@ -378,12 +367,8 @@ public class KTableKTableLeftJoinTest {
         final KTable<Long, String> five = builder.table(tableFive, consumed);
         final KTable<Long, String> six = builder.table(tableSix, consumed);
 
-        final ValueMapper<String, String> mapper = new ValueMapper<String, String>() {
-            @Override
-            public String apply(final String value) {
-                return value.toUpperCase(Locale.ROOT);
-            }
-        };
+        final ValueMapper<String, String> mapper = value -> value.toUpperCase(Locale.ROOT);
+
         final KTable<Long, String> seven = one.mapValues(mapper);
 
         final KTable<Long, String> eight = six.leftJoin(seven, MockValueJoiner.TOSTRING_JOINER);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
index 82ada52..350b0d2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
@@ -24,6 +24,7 @@ import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Grouped;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.ValueMapper;
@@ -359,7 +360,6 @@ public class KTableTransformValuesTest {
         assertThat(keyValueStore.get("C"), is("C->null!"));
     }
 
-    @SuppressWarnings("deprecation")
     @Test
     public void shouldCalculateCorrectOldValuesIfMaterializedEvenIfStateful() {
         builder
@@ -369,7 +369,7 @@ public class KTableTransformValuesTest {
                 Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as(QUERYABLE_NAME)
                     .withKeySerde(Serdes.String())
                     .withValueSerde(Serdes.Integer()))
-            .groupBy(toForceSendingOfOldValues(), org.apache.kafka.streams.kstream.Serialized.with(Serdes.String(), Serdes.Integer()))
+            .groupBy(toForceSendingOfOldValues(), Grouped.with(Serdes.String(), Serdes.Integer()))
             .reduce(MockReducer.INTEGER_ADDER, MockReducer.INTEGER_SUBTRACTOR)
             .mapValues(mapBackToStrings())
             .toStream()
@@ -387,13 +387,12 @@ public class KTableTransformValuesTest {
         assertThat(keyValueStore.get("A"), is(3));
     }
 
-    @SuppressWarnings("deprecation")
     @Test
     public void shouldCalculateCorrectOldValuesIfNotStatefulEvenIfNotMaterialized() {
         builder
             .table(INPUT_TOPIC, CONSUMED)
             .transformValues(new StatelessTransformerSupplier())
-            .groupBy(toForceSendingOfOldValues(), org.apache.kafka.streams.kstream.Serialized.with(Serdes.String(), Serdes.Integer()))
+            .groupBy(toForceSendingOfOldValues(), Grouped.with(Serdes.String(), Serdes.Integer()))
             .reduce(MockReducer.INTEGER_ADDER, MockReducer.INTEGER_SUBTRACTOR)
             .mapValues(mapBackToStrings())
             .toStream()
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java
index 749d5d6..57d7b14 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.java
@@ -25,6 +25,7 @@ import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.ForeachAction;
+import org.apache.kafka.streams.kstream.Grouped;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.Merger;
@@ -65,11 +66,10 @@ public class SessionWindowedKStreamImplTest {
     };
     private SessionWindowedKStream<String, String> stream;
 
-    @SuppressWarnings("deprecation")
     @Before
     public void before() {
         final KStream<String, String> stream = builder.stream(TOPIC, Consumed.with(Serdes.String(), Serdes.String()));
-        this.stream = stream.groupByKey(org.apache.kafka.streams.kstream.Serialized.with(Serdes.String(), Serdes.String()))
+        this.stream = stream.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
                 .windowedBy(SessionWindows.with(ofMillis(500)));
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
index 57b860e..b0b87be 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
@@ -359,7 +359,7 @@ public class SuppressScenarioTest {
         final KTable<Windowed<String>, Long> valueCounts = builder
             .stream("input", Consumed.with(STRING_SERDE, STRING_SERDE))
             .groupBy((String k, String v) -> k, Grouped.with(STRING_SERDE, STRING_SERDE))
-            .windowedBy(TimeWindows.of(2L).grace(ofMillis(1L)))
+            .windowedBy(TimeWindows.of(ofMillis(2L)).grace(ofMillis(1L)))
             .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("counts").withCachingDisabled());
         valueCounts
             .suppress(untilWindowCloses(unbounded()))
@@ -410,7 +410,7 @@ public class SuppressScenarioTest {
         final KTable<Windowed<String>, Long> valueCounts = builder
             .stream("input", Consumed.with(STRING_SERDE, STRING_SERDE))
             .groupBy((String k, String v) -> k, Grouped.with(STRING_SERDE, STRING_SERDE))
-            .windowedBy(TimeWindows.of(2L).grace(ofMillis(2L)))
+            .windowedBy(TimeWindows.of(ofMillis(2L)).grace(ofMillis(2L)))
             .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("counts").withCachingDisabled().withKeySerde(STRING_SERDE));
         valueCounts
             .suppress(untilWindowCloses(unbounded()))
@@ -459,14 +459,13 @@ public class SuppressScenarioTest {
         }
     }
 
-    @SuppressWarnings("deprecation")
     @Test
     public void shouldSupportFinalResultsForSessionWindows() {
         final StreamsBuilder builder = new StreamsBuilder();
         final KTable<Windowed<String>, Long> valueCounts = builder
             .stream("input", Consumed.with(STRING_SERDE, STRING_SERDE))
             .groupBy((String k, String v) -> k, Grouped.with(STRING_SERDE, STRING_SERDE))
-            .windowedBy(SessionWindows.with(5L).grace(ofMillis(5L)))
+            .windowedBy(SessionWindows.with(ofMillis(5L)).grace(ofMillis(5L)))
             .count(Materialized.<String, Long, SessionStore<Bytes, byte[]>>as("counts").withCachingDisabled());
         valueCounts
             .suppress(untilWindowCloses(unbounded()))
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java
index c67b9e6..ce0f25e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImplTest.java
@@ -25,6 +25,7 @@ import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.ForeachAction;
+import org.apache.kafka.streams.kstream.Grouped;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.TimeWindowedKStream;
@@ -58,11 +59,10 @@ public class TimeWindowedKStreamImplTest {
     private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
     private TimeWindowedKStream<String, String> windowedStream;
 
-    @SuppressWarnings("deprecation")
     @Before
     public void before() {
         final KStream<String, String> stream = builder.stream(TOPIC, Consumed.with(Serdes.String(), Serdes.String()));
-        windowedStream = stream.groupByKey(org.apache.kafka.streams.kstream.Serialized.with(Serdes.String(), Serdes.String()))
+        windowedStream = stream.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
                 .windowedBy(TimeWindows.of(ofMillis(500L)));
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java b/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java
index de1a621..ec7f2fc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java
+++ b/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java
@@ -33,6 +33,7 @@ import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.ForeachAction;
+import org.apache.kafka.streams.kstream.Grouped;
 import org.apache.kafka.streams.kstream.Joined;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
@@ -251,7 +252,6 @@ public class YahooBenchmark {
         }
     }
 
-    @SuppressWarnings("deprecation")
     private KafkaStreams createYahooBenchmarkStreams(final Properties streamConfig, final String campaignsTopic, final String eventsTopic,
                                                      final CountDownLatch latch, final int numRecords) {
         final Map<String, Object> serdeProps = new HashMap<>();
@@ -333,7 +333,7 @@ public class YahooBenchmark {
 
         // calculate windowed counts
         keyedByCampaign
-            .groupByKey(org.apache.kafka.streams.kstream.Serialized.with(Serdes.String(), Serdes.String()))
+            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
             .windowedBy(TimeWindows.of(Duration.ofMillis(10 * 1000)))
             .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("time-windows"));
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
index 623c3e3..47f74d8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/BrokerCompatibilityTest.java
@@ -33,6 +33,7 @@ import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.kstream.Grouped;
 import org.apache.kafka.streams.kstream.ValueMapper;
 
 import java.io.IOException;
@@ -46,7 +47,6 @@ public class BrokerCompatibilityTest {
     private static final String SOURCE_TOPIC = "brokerCompatibilitySourceTopic";
     private static final String SINK_TOPIC = "brokerCompatibilitySinkTopic";
 
-    @SuppressWarnings("deprecation")
     public static void main(final String[] args) throws IOException {
         if (args.length < 2) {
             System.err.println("BrokerCompatibilityTest are expecting two parameters: propFile, eosEnabled; but only see " + args.length + " parameter");
@@ -83,7 +83,7 @@ public class BrokerCompatibilityTest {
 
 
         final StreamsBuilder builder = new StreamsBuilder();
-        builder.<String, String>stream(SOURCE_TOPIC).groupByKey(org.apache.kafka.streams.kstream.Serialized.with(stringSerde, stringSerde))
+        builder.<String, String>stream(SOURCE_TOPIC).groupByKey(Grouped.with(stringSerde, stringSerde))
             .count()
             .toStream()
             .mapValues(new ValueMapper<Long, String>() {
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
index ef908f4..a396ad1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
@@ -26,6 +26,7 @@ import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Grouped;
 import org.apache.kafka.streams.kstream.Initializer;
 import org.apache.kafka.streams.kstream.KGroupedStream;
 import org.apache.kafka.streams.kstream.KStream;
@@ -110,7 +111,6 @@ public class SmokeTestClient extends SmokeTestUtil {
         return fullProps;
     }
 
-    @SuppressWarnings("deprecation")
     private static KafkaStreams createKafkaStreams(final Properties props) {
         final StreamsBuilder builder = new StreamsBuilder();
         final Consumed<String, Integer> stringIntConsumed = Consumed.with(stringSerde, intSerde);
@@ -125,8 +125,7 @@ public class SmokeTestClient extends SmokeTestUtil {
         data.process(SmokeTestUtil.printProcessorSupplier("data"));
 
         // min
-        final KGroupedStream<String, Integer> groupedData =
-            data.groupByKey(org.apache.kafka.streams.kstream.Serialized.with(stringSerde, intSerde));
+        final KGroupedStream<String, Integer> groupedData = data.groupByKey(Grouped.with(stringSerde, intSerde));
 
         groupedData
             .windowedBy(TimeWindows.of(Duration.ofDays(1)))
@@ -239,7 +238,7 @@ public class SmokeTestClient extends SmokeTestUtil {
 
         // test repartition
         final Agg agg = new Agg();
-        cntTable.groupBy(agg.selector(), org.apache.kafka.streams.kstream.Serialized.with(stringSerde, longSerde))
+        cntTable.groupBy(agg.selector(), Grouped.with(stringSerde, longSerde))
             .aggregate(agg.init(), agg.adder(), agg.remover(),
                 Materialized.<String, Long>as(Stores.inMemoryKeyValueStore("cntByCnt"))
                     .withKeySerde(Serdes.String())