You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/03/17 23:42:04 UTC

[2/4] kafka git commit: KAFKA-3336: Unify Serializer and Deserializer into Serialization

http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
index 6823e6d..00089ab 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
@@ -20,7 +20,7 @@ package org.apache.kafka.streams.state.internals;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
-import org.apache.kafka.streams.state.Serdes;
+import org.apache.kafka.streams.state.StateSerdes;
 
 /**
  * A {@link org.apache.kafka.streams.state.KeyValueStore} that stores all entries in a local RocksDB database.
@@ -36,10 +36,10 @@ public class RocksDBWindowStoreSupplier<K, V> implements StateStoreSupplier {
     private final long retentionPeriod;
     private final boolean retainDuplicates;
     private final int numSegments;
-    private final Serdes<K, V> serdes;
+    private final StateSerdes<K, V> serdes;
     private final Time time;
 
-    public RocksDBWindowStoreSupplier(String name, long retentionPeriod, int numSegments, boolean retainDuplicates, Serdes<K, V> serdes, Time time) {
+    public RocksDBWindowStoreSupplier(String name, long retentionPeriod, int numSegments, boolean retainDuplicates, StateSerdes<K, V> serdes, Time time) {
         this.name = name;
         this.retentionPeriod = retentionPeriod;
         this.retainDuplicates = retainDuplicates;

http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
index 4229f94..a439117 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
@@ -22,7 +22,7 @@ import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
 import org.apache.kafka.streams.processor.internals.RecordCollector;
-import org.apache.kafka.streams.state.Serdes;
+import org.apache.kafka.streams.state.StateSerdes;
 
 import java.util.HashSet;
 import java.util.Set;
@@ -36,7 +36,7 @@ public class StoreChangeLogger<K, V> {
     // TODO: these values should be configurable
     protected static final int DEFAULT_WRITE_BATCH_SIZE = 100;
 
-    protected final Serdes<K, V> serialization;
+    protected final StateSerdes<K, V> serialization;
 
     private final String topic;
     private final int partition;
@@ -47,16 +47,16 @@ public class StoreChangeLogger<K, V> {
     protected Set<K> dirty;
     protected Set<K> removed;
 
-    public StoreChangeLogger(String storeName, ProcessorContext context, Serdes<K, V> serialization) {
+    public StoreChangeLogger(String storeName, ProcessorContext context, StateSerdes<K, V> serialization) {
         this(storeName, context, serialization, DEFAULT_WRITE_BATCH_SIZE, DEFAULT_WRITE_BATCH_SIZE);
     }
 
-    public StoreChangeLogger(String storeName, ProcessorContext context, Serdes<K, V> serialization, int maxDirty, int maxRemoved) {
+    public StoreChangeLogger(String storeName, ProcessorContext context, StateSerdes<K, V> serialization, int maxDirty, int maxRemoved) {
         this(storeName, context, context.taskId().partition, serialization, maxDirty, maxRemoved);
         init();
     }
 
-    protected StoreChangeLogger(String storeName, ProcessorContext context, int partition, Serdes<K, V> serialization, int maxDirty, int maxRemoved) {
+    protected StoreChangeLogger(String storeName, ProcessorContext context, int partition, StateSerdes<K, V> serialization, int maxDirty, int maxRemoved) {
         this.topic = ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName);
         this.context = context;
         this.partition = partition;

http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index 83ebe48..0dacde7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -18,11 +18,6 @@
 package org.apache.kafka.streams;
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.common.serialization.IntegerDeserializer;
-import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.streams.processor.internals.WallclockTimestampExtractor;
 import org.apache.kafka.streams.processor.internals.StreamThread;
 import org.junit.Before;
 import org.junit.Test;
@@ -43,11 +38,6 @@ public class StreamsConfigTest {
     public void setUp() {
         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-config-test");
         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
-        props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-        props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
-        props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-        props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
-        props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
         streamsConfig = new StreamsConfig(props);
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java
index 88366fa..e04a273 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java
@@ -17,8 +17,7 @@
 
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.common.serialization.IntegerDeserializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.Predicate;
@@ -34,9 +33,6 @@ public class KStreamBranchTest {
 
     private String topicName = "topic";
 
-    private IntegerDeserializer keyDeserializer = new IntegerDeserializer();
-    private StringDeserializer valDeserializer = new StringDeserializer();
-
     @SuppressWarnings("unchecked")
     @Test
     public void testKStreamBranch() {
@@ -67,7 +63,7 @@ public class KStreamBranchTest {
         KStream<Integer, String>[] branches;
         MockProcessorSupplier<Integer, String>[] processors;
 
-        stream = builder.stream(keyDeserializer, valDeserializer, topicName);
+        stream = builder.stream(Serdes.Integer(), Serdes.String(), topicName);
         branches = stream.branch(isEven, isMultipleOfThree, isOdd);
 
         assertEquals(3, branches.length);

http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java
index 3bad041..ecf1115 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java
@@ -17,8 +17,7 @@
 
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.common.serialization.IntegerDeserializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.Predicate;
@@ -33,9 +32,6 @@ public class KStreamFilterTest {
 
     private String topicName = "topic";
 
-    private IntegerDeserializer keyDeserializer = new IntegerDeserializer();
-    private StringDeserializer valDeserializer = new StringDeserializer();
-
     private Predicate<Integer, String> isMultipleOfThree = new Predicate<Integer, String>() {
         @Override
         public boolean test(Integer key, String value) {
@@ -52,7 +48,7 @@ public class KStreamFilterTest {
         MockProcessorSupplier<Integer, String> processor;
 
         processor = new MockProcessorSupplier<>();
-        stream = builder.stream(keyDeserializer, valDeserializer, topicName);
+        stream = builder.stream(Serdes.Integer(), Serdes.String(), topicName);
         stream.filter(isMultipleOfThree).process(processor);
 
         KStreamTestDriver driver = new KStreamTestDriver(builder);
@@ -72,7 +68,7 @@ public class KStreamFilterTest {
         MockProcessorSupplier<Integer, String> processor;
 
         processor = new MockProcessorSupplier<>();
-        stream = builder.stream(keyDeserializer, valDeserializer, topicName);
+        stream = builder.stream(Serdes.Integer(), Serdes.String(), topicName);
         stream.filterOut(isMultipleOfThree).process(processor);
 
         KStreamTestDriver driver = new KStreamTestDriver(builder);

http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
index 693f58e..bc85757 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
@@ -17,8 +17,7 @@
 
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.common.serialization.IntegerDeserializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.KeyValue;
@@ -35,9 +34,6 @@ public class KStreamFlatMapTest {
 
     private String topicName = "topic";
 
-    private IntegerDeserializer keyDeserializer = new IntegerDeserializer();
-    private StringDeserializer valDeserializer = new StringDeserializer();
-
     @Test
     public void testFlatMap() {
         KStreamBuilder builder = new KStreamBuilder();
@@ -60,7 +56,7 @@ public class KStreamFlatMapTest {
         MockProcessorSupplier<String, String> processor;
 
         processor = new MockProcessorSupplier<>();
-        stream = builder.stream(keyDeserializer, valDeserializer, topicName);
+        stream = builder.stream(Serdes.Integer(), Serdes.String(), topicName);
         stream.flatMap(mapper).process(processor);
 
         KStreamTestDriver driver = new KStreamTestDriver(builder);

http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java
index eef7933..a904cb1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java
@@ -17,8 +17,7 @@
 
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.common.serialization.IntegerDeserializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.ValueMapper;
@@ -34,9 +33,6 @@ public class KStreamFlatMapValuesTest {
 
     private String topicName = "topic";
 
-    private IntegerDeserializer keyDeserializer = new IntegerDeserializer();
-    private StringDeserializer valDeserializer = new StringDeserializer();
-
     @Test
     public void testFlatMapValues() {
         KStreamBuilder builder = new KStreamBuilder();
@@ -58,7 +54,7 @@ public class KStreamFlatMapValuesTest {
         MockProcessorSupplier<Integer, String> processor;
 
         processor = new MockProcessorSupplier<>();
-        stream = builder.stream(keyDeserializer, valDeserializer, topicName);
+        stream = builder.stream(Serdes.Integer(), Serdes.String(), topicName);
         stream.flatMapValues(mapper).process(processor);
 
         KStreamTestDriver driver = new KStreamTestDriver(builder);

http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
index 3d3a9e3..38182bc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
@@ -17,12 +17,8 @@
 
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.IntegerDeserializer;
-import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.kstream.JoinWindows;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
@@ -39,18 +35,16 @@ import static org.junit.Assert.assertEquals;
 
 public class KStreamImplTest {
 
+    final private Serde<String> stringSerde = Serdes.String();
+    final private Serde<Integer> intSerde = Serdes.Integer();
+
     @Test
     public void testNumProcesses() {
-        final Serializer<String> stringSerializer = new StringSerializer();
-        final Deserializer<String> stringDeserializer = new StringDeserializer();
-        final Serializer<Integer> integerSerializer = new IntegerSerializer();
-        final Deserializer<Integer> integerDeserializer = new IntegerDeserializer();
-
         final KStreamBuilder builder = new KStreamBuilder();
 
-        KStream<String, String> source1 = builder.stream(stringDeserializer, stringDeserializer, "topic-1", "topic-2");
+        KStream<String, String> source1 = builder.stream(stringSerde, stringSerde, "topic-1", "topic-2");
 
-        KStream<String, String> source2 = builder.stream(stringDeserializer, stringDeserializer, "topic-3", "topic-4");
+        KStream<String, String> source2 = builder.stream(stringSerde, stringSerde, "topic-3", "topic-4");
 
         KStream<String, String> stream1 =
             source1.filter(new Predicate<String, String>() {
@@ -114,14 +108,14 @@ public class KStreamImplTest {
             public Integer apply(Integer value1, Integer value2) {
                 return value1 + value2;
             }
-        }, JoinWindows.of("join-0"), stringSerializer, integerSerializer, integerSerializer, stringDeserializer, integerDeserializer, integerDeserializer);
+        }, JoinWindows.of("join-0"), stringSerde, intSerde, intSerde);
 
         KStream<String, Integer> stream5 = streams2[1].join(streams3[1], new ValueJoiner<Integer, Integer, Integer>() {
             @Override
             public Integer apply(Integer value1, Integer value2) {
                 return value1 + value2;
             }
-        }, JoinWindows.of("join-1"), stringSerializer, integerSerializer, integerSerializer, stringDeserializer, integerDeserializer, integerDeserializer);
+        }, JoinWindows.of("join-1"), stringSerde, intSerde, intSerde);
 
         stream4.to("topic-5");
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
index e763fd2..d24ab15 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
@@ -17,10 +17,8 @@
 
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.common.serialization.IntegerDeserializer;
-import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.kstream.JoinWindows;
 import org.apache.kafka.streams.kstream.KStream;
@@ -44,10 +42,8 @@ public class KStreamKStreamJoinTest {
     private String topic1 = "topic1";
     private String topic2 = "topic2";
 
-    private IntegerSerializer keySerializer = new IntegerSerializer();
-    private StringSerializer valSerializer = new StringSerializer();
-    private IntegerDeserializer keyDeserializer = new IntegerDeserializer();
-    private StringDeserializer valDeserializer = new StringDeserializer();
+    final private Serde<Integer> intSerde = Serdes.Integer();
+    final private Serde<String> stringSerde = Serdes.String();
 
     private ValueJoiner<String, String, String> joiner = new ValueJoiner<String, String, String>() {
         @Override
@@ -71,10 +67,9 @@ public class KStreamKStreamJoinTest {
             MockProcessorSupplier<Integer, String> processor;
 
             processor = new MockProcessorSupplier<>();
-            stream1 = builder.stream(keyDeserializer, valDeserializer, topic1);
-            stream2 = builder.stream(keyDeserializer, valDeserializer, topic2);
-            joined = stream1.join(stream2, joiner, JoinWindows.of("test").within(100),
-                    keySerializer, valSerializer, valSerializer, keyDeserializer, valDeserializer, valDeserializer);
+            stream1 = builder.stream(intSerde, stringSerde, topic1);
+            stream2 = builder.stream(intSerde, stringSerde, topic2);
+            joined = stream1.join(stream2, joiner, JoinWindows.of("test").within(100), intSerde, stringSerde, stringSerde);
             joined.process(processor);
 
             Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
@@ -177,10 +172,9 @@ public class KStreamKStreamJoinTest {
             MockProcessorSupplier<Integer, String> processor;
 
             processor = new MockProcessorSupplier<>();
-            stream1 = builder.stream(keyDeserializer, valDeserializer, topic1);
-            stream2 = builder.stream(keyDeserializer, valDeserializer, topic2);
-            joined = stream1.outerJoin(stream2, joiner, JoinWindows.of("test").within(100),
-                    keySerializer, valSerializer, valSerializer, keyDeserializer, valDeserializer, valDeserializer);
+            stream1 = builder.stream(intSerde, stringSerde, topic1);
+            stream2 = builder.stream(intSerde, stringSerde, topic2);
+            joined = stream1.outerJoin(stream2, joiner, JoinWindows.of("test").within(100), intSerde, stringSerde, stringSerde);
             joined.process(processor);
 
             Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
@@ -285,10 +279,9 @@ public class KStreamKStreamJoinTest {
             MockProcessorSupplier<Integer, String> processor;
 
             processor = new MockProcessorSupplier<>();
-            stream1 = builder.stream(keyDeserializer, valDeserializer, topic1);
-            stream2 = builder.stream(keyDeserializer, valDeserializer, topic2);
-            joined = stream1.join(stream2, joiner, JoinWindows.of("test").within(100),
-                    keySerializer, valSerializer, valSerializer, keyDeserializer, valDeserializer, valDeserializer);
+            stream1 = builder.stream(intSerde, stringSerde, topic1);
+            stream2 = builder.stream(intSerde, stringSerde, topic2);
+            joined = stream1.join(stream2, joiner, JoinWindows.of("test").within(100), intSerde, stringSerde, stringSerde);
             joined.process(processor);
 
             Collection<Set<String>> copartitionGroups = builder.copartitionGroups();

http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
index 8c6e43b..166e8ba 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
@@ -17,10 +17,8 @@
 
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.common.serialization.IntegerDeserializer;
-import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.kstream.JoinWindows;
 import org.apache.kafka.streams.kstream.KStream;
@@ -44,10 +42,8 @@ public class KStreamKStreamLeftJoinTest {
     private String topic1 = "topic1";
     private String topic2 = "topic2";
 
-    private IntegerSerializer keySerializer = new IntegerSerializer();
-    private StringSerializer valSerializer = new StringSerializer();
-    private IntegerDeserializer keyDeserializer = new IntegerDeserializer();
-    private StringDeserializer valDeserializer = new StringDeserializer();
+    final private Serde<Integer> intSerde = Serdes.Integer();
+    final private Serde<String> stringSerde = Serdes.String();
 
     private ValueJoiner<String, String, String> joiner = new ValueJoiner<String, String, String>() {
         @Override
@@ -71,10 +67,9 @@ public class KStreamKStreamLeftJoinTest {
             MockProcessorSupplier<Integer, String> processor;
 
             processor = new MockProcessorSupplier<>();
-            stream1 = builder.stream(keyDeserializer, valDeserializer, topic1);
-            stream2 = builder.stream(keyDeserializer, valDeserializer, topic2);
-            joined = stream1.leftJoin(stream2, joiner, JoinWindows.of("test").within(100),
-                    keySerializer, valSerializer, keyDeserializer, valDeserializer);
+            stream1 = builder.stream(intSerde, stringSerde, topic1);
+            stream2 = builder.stream(intSerde, stringSerde, topic2);
+            joined = stream1.leftJoin(stream2, joiner, JoinWindows.of("test").within(100), intSerde, stringSerde);
             joined.process(processor);
 
             Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
@@ -157,10 +152,9 @@ public class KStreamKStreamLeftJoinTest {
             MockProcessorSupplier<Integer, String> processor;
 
             processor = new MockProcessorSupplier<>();
-            stream1 = builder.stream(keyDeserializer, valDeserializer, topic1);
-            stream2 = builder.stream(keyDeserializer, valDeserializer, topic2);
-            joined = stream1.leftJoin(stream2, joiner, JoinWindows.of("test").within(100),
-                    keySerializer, valSerializer, keyDeserializer, valDeserializer);
+            stream1 = builder.stream(intSerde, stringSerde, topic1);
+            stream2 = builder.stream(intSerde, stringSerde, topic2);
+            joined = stream1.leftJoin(stream2, joiner, JoinWindows.of("test").within(100), intSerde, stringSerde);
             joined.process(processor);
 
             Collection<Set<String>> copartitionGroups = builder.copartitionGroups();

http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
index f226cee..8e672a2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
@@ -18,10 +18,8 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.serialization.IntegerDeserializer;
-import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
@@ -47,10 +45,8 @@ public class KStreamKTableLeftJoinTest {
     private String topic1 = "topic1";
     private String topic2 = "topic2";
 
-    private IntegerSerializer keySerializer = new IntegerSerializer();
-    private StringSerializer valSerializer = new StringSerializer();
-    private IntegerDeserializer keyDeserializer = new IntegerDeserializer();
-    private StringDeserializer valDeserializer = new StringDeserializer();
+    final private Serde<Integer> intSerde = new Serdes.IntegerSerde();
+    final private Serde<String> stringSerde = new Serdes.StringSerde();
 
     private ValueJoiner<String, String, String> joiner = new ValueJoiner<String, String, String>() {
         @Override
@@ -81,8 +77,8 @@ public class KStreamKTableLeftJoinTest {
             MockProcessorSupplier<Integer, String> processor;
 
             processor = new MockProcessorSupplier<>();
-            stream = builder.stream(keyDeserializer, valDeserializer, topic1);
-            table = builder.table(keySerializer, valSerializer, keyDeserializer, valDeserializer, topic2);
+            stream = builder.stream(intSerde, stringSerde, topic1);
+            table = builder.table(intSerde, stringSerde, topic2);
             stream.leftJoin(table, joiner).process(processor);
 
             Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
@@ -162,8 +158,8 @@ public class KStreamKTableLeftJoinTest {
         MockProcessorSupplier<Integer, String> processor;
 
         processor = new MockProcessorSupplier<>();
-        stream = builder.stream(keyDeserializer, valDeserializer, topic1).map(keyValueMapper);
-        table = builder.table(keySerializer, valSerializer, keyDeserializer, valDeserializer, topic2);
+        stream = builder.stream(intSerde, stringSerde, topic1).map(keyValueMapper);
+        table = builder.table(intSerde, stringSerde, topic2);
 
         stream.leftJoin(table, joiner).process(processor);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
index 73c517b..68fa656 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
@@ -17,8 +17,8 @@
 
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.common.serialization.IntegerDeserializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.KeyValue;
@@ -33,8 +33,8 @@ public class KStreamMapTest {
 
     private String topicName = "topic";
 
-    private IntegerDeserializer keyDeserializer = new IntegerDeserializer();
-    private StringDeserializer valDeserializer = new StringDeserializer();
+    final private Serde<Integer> intSerde = Serdes.Integer();
+    final private Serde<String> stringSerde = Serdes.String();
 
     @Test
     public void testMap() {
@@ -50,11 +50,10 @@ public class KStreamMapTest {
 
         final int[] expectedKeys = new int[]{0, 1, 2, 3};
 
-        KStream<Integer, String> stream;
+        KStream<Integer, String> stream = builder.stream(intSerde, stringSerde, topicName);
         MockProcessorSupplier<String, Integer> processor;
 
         processor = new MockProcessorSupplier<>();
-        stream = builder.stream(keyDeserializer, valDeserializer, topicName);
         stream.map(mapper).process(processor);
 
         KStreamTestDriver driver = new KStreamTestDriver(builder);

http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java
index 68fd285..e671aab 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java
@@ -17,8 +17,8 @@
 
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.common.serialization.IntegerDeserializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.ValueMapper;
@@ -32,8 +32,8 @@ public class KStreamMapValuesTest {
 
     private String topicName = "topic";
 
-    private IntegerDeserializer keyDeserializer = new IntegerDeserializer();
-    private StringDeserializer valDeserializer = new StringDeserializer();
+    final private Serde<Integer> intSerde = Serdes.Integer();
+    final private Serde<String> stringSerde = Serdes.String();
 
     @Test
     public void testFlatMapValues() {
@@ -51,7 +51,7 @@ public class KStreamMapValuesTest {
 
         KStream<Integer, String> stream;
         MockProcessorSupplier<Integer, Integer> processor = new MockProcessorSupplier<>();
-        stream = builder.stream(keyDeserializer, valDeserializer, topicName);
+        stream = builder.stream(intSerde, stringSerde, topicName);
         stream.mapValues(mapper).process(processor);
 
         KStreamTestDriver driver = new KStreamTestDriver(builder);

http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
index 426259f..4244de5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
@@ -17,7 +17,8 @@
 
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.KeyValue;
@@ -34,8 +35,7 @@ public class KStreamTransformTest {
 
     private String topicName = "topic";
 
-    private IntegerDeserializer keyDeserializer = new IntegerDeserializer();
-    private IntegerDeserializer valDeserializer = new IntegerDeserializer();
+    final private Serde<Integer> intSerde = Serdes.Integer();
 
     @Test
     public void testTransform() {
@@ -71,9 +71,8 @@ public class KStreamTransformTest {
 
         final int[] expectedKeys = {1, 10, 100, 1000};
 
-        KStream<Integer, Integer> stream;
         MockProcessorSupplier<Integer, Integer> processor = new MockProcessorSupplier<>();
-        stream = builder.stream(keyDeserializer, valDeserializer, topicName);
+        KStream<Integer, Integer> stream = builder.stream(intSerde, intSerde, topicName);
         stream.transform(transformerSupplier).process(processor);
 
         KStreamTestDriver driver = new KStreamTestDriver(builder);

http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
index 7def9db..52abdf7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
@@ -17,7 +17,8 @@
 
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.ValueTransformer;
@@ -33,8 +34,7 @@ public class KStreamTransformValuesTest {
 
     private String topicName = "topic";
 
-    private IntegerDeserializer keyDeserializer = new IntegerDeserializer();
-    private IntegerDeserializer valDeserializer = new IntegerDeserializer();
+    final private Serde<Integer> intSerde = Serdes.Integer();
 
     @Test
     public void testTransform() {
@@ -72,7 +72,7 @@ public class KStreamTransformValuesTest {
 
         KStream<Integer, Integer> stream;
         MockProcessorSupplier<Integer, Integer> processor = new MockProcessorSupplier<>();
-        stream = builder.stream(keyDeserializer, valDeserializer, topicName);
+        stream = builder.stream(intSerde, intSerde, topicName);
         stream.transformValues(valueTransformerSupplier).process(processor);
 
         KStreamTestDriver driver = new KStreamTestDriver(builder);

http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
index 9e0745a..e19510f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
@@ -17,10 +17,8 @@
 
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.kstream.Aggregator;
 import org.apache.kafka.streams.kstream.HoppingWindows;
@@ -41,8 +39,7 @@ import static org.junit.Assert.assertEquals;
 
 public class KStreamWindowAggregateTest {
 
-    private final Serializer<String> strSerializer = new StringSerializer();
-    private final Deserializer<String> strDeserializer = new StringDeserializer();
+    final private Serde<String> strSerde = new Serdes.StringSerde();
 
     private class StringAdd implements Aggregator<String, String, String> {
 
@@ -68,13 +65,11 @@ public class KStreamWindowAggregateTest {
             final KStreamBuilder builder = new KStreamBuilder();
             String topic1 = "topic1";
 
-            KStream<String, String> stream1 = builder.stream(strDeserializer, strDeserializer, topic1);
+            KStream<String, String> stream1 = builder.stream(strSerde, strSerde, topic1);
             KTable<Windowed<String>, String> table2 = stream1.aggregateByKey(new StringInit(), new StringAdd(),
                     HoppingWindows.of("topic1-Canonized").with(10L).every(5L),
-                    strSerializer,
-                    strSerializer,
-                    strDeserializer,
-                    strDeserializer);
+                    strSerde,
+                    strSerde);
 
             MockProcessorSupplier<Windowed<String>, String> proc2 = new MockProcessorSupplier<>();
             table2.toStream().process(proc2);
@@ -147,24 +142,20 @@ public class KStreamWindowAggregateTest {
             String topic1 = "topic1";
             String topic2 = "topic2";
 
-            KStream<String, String> stream1 = builder.stream(strDeserializer, strDeserializer, topic1);
+            KStream<String, String> stream1 = builder.stream(strSerde, strSerde, topic1);
             KTable<Windowed<String>, String> table1 = stream1.aggregateByKey(new StringInit(), new StringAdd(),
                     HoppingWindows.of("topic1-Canonized").with(10L).every(5L),
-                    strSerializer,
-                    strSerializer,
-                    strDeserializer,
-                    strDeserializer);
+                    strSerde,
+                    strSerde);
 
             MockProcessorSupplier<Windowed<String>, String> proc1 = new MockProcessorSupplier<>();
             table1.toStream().process(proc1);
 
-            KStream<String, String> stream2 = builder.stream(strDeserializer, strDeserializer, topic2);
+            KStream<String, String> stream2 = builder.stream(strSerde, strSerde, topic2);
             KTable<Windowed<String>, String> table2 = stream2.aggregateByKey(new StringInit(), new StringAdd(),
                     HoppingWindows.of("topic2-Canonized").with(10L).every(5L),
-                    strSerializer,
-                    strSerializer,
-                    strDeserializer,
-                    strDeserializer);
+                    strSerde,
+                    strSerde);
 
             MockProcessorSupplier<Windowed<String>, String> proc2 = new MockProcessorSupplier<>();
             table2.toStream().process(proc2);

http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
index ec85ed7..fc01e5e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
@@ -17,10 +17,8 @@
 
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.kstream.Aggregator;
 import org.apache.kafka.streams.kstream.Initializer;
@@ -38,8 +36,7 @@ import static org.junit.Assert.assertEquals;
 
 public class KTableAggregateTest {
 
-    private final Serializer<String> strSerializer = new StringSerializer();
-    private final Deserializer<String> strDeserializer = new StringDeserializer();
+    final private Serde<String> stringSerde = new Serdes.StringSerde();
 
     private class StringAdd implements Aggregator<String, String, String> {
 
@@ -74,15 +71,12 @@ public class KTableAggregateTest {
             final KStreamBuilder builder = new KStreamBuilder();
             String topic1 = "topic1";
 
-            KTable<String, String> table1 = builder.table(strSerializer, strSerializer, strDeserializer, strDeserializer, topic1);
-            KTable<String, String> table2 = table1.<String, String, String>aggregate(new StringInit(), new StringAdd(), new StringRemove(),
+            KTable<String, String> table1 = builder.table(stringSerde, stringSerde, topic1);
+            KTable<String, String> table2 = table1.aggregate(new StringInit(), new StringAdd(), new StringRemove(),
                     new NoOpKeyValueMapper<String, String>(),
-                    strSerializer,
-                    strSerializer,
-                    strSerializer,
-                    strDeserializer,
-                    strDeserializer,
-                    strDeserializer,
+                    stringSerde,
+                    stringSerde,
+                    stringSerde,
                     "topic1-Canonized");
 
             MockProcessorSupplier<String, String> proc2 = new MockProcessorSupplier<>();

http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
index c43bea0..5491ea3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
@@ -17,12 +17,8 @@
 
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.IntegerDeserializer;
-import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KTable;
@@ -40,10 +36,8 @@ import static org.junit.Assert.assertNull;
 
 public class KTableFilterTest {
 
-    private final Serializer<String> strSerializer = new StringSerializer();
-    private final Deserializer<String> strDeserializer = new StringDeserializer();
-    private final Serializer<Integer> intSerializer = new IntegerSerializer();
-    private final Deserializer<Integer> intDeserializer = new IntegerDeserializer();
+    final private Serde<Integer> intSerde = new Serdes.IntegerSerde();
+    final private Serde<String> stringSerde = new Serdes.StringSerde();
 
     @Test
     public void testKTable() {
@@ -51,7 +45,7 @@ public class KTableFilterTest {
 
         String topic1 = "topic1";
 
-        KTable<String, Integer> table1 = builder.table(strSerializer, intSerializer, strDeserializer, intDeserializer, topic1);
+        KTable<String, Integer> table1 = builder.table(stringSerde, intSerde, topic1);
 
         KTable<String, Integer> table2 = table1.filter(new Predicate<String, Integer>() {
             @Override
@@ -93,7 +87,7 @@ public class KTableFilterTest {
             String topic1 = "topic1";
 
             KTableImpl<String, Integer, Integer> table1 =
-                    (KTableImpl<String, Integer, Integer>) builder.table(strSerializer, intSerializer, strDeserializer, intDeserializer, topic1);
+                    (KTableImpl<String, Integer, Integer>) builder.table(stringSerde, intSerde, topic1);
             KTableImpl<String, Integer, Integer> table2 = (KTableImpl<String, Integer, Integer>) table1.filter(
                     new Predicate<String, Integer>() {
                         @Override
@@ -112,7 +106,7 @@ public class KTableFilterTest {
             KTableValueGetterSupplier<String, Integer> getterSupplier2 = table2.valueGetterSupplier();
             KTableValueGetterSupplier<String, Integer> getterSupplier3 = table3.valueGetterSupplier();
 
-            KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null, null, null);
+            KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null);
 
             KTableValueGetter<String, Integer> getter2 = getterSupplier2.get();
             KTableValueGetter<String, Integer> getter3 = getterSupplier3.get();
@@ -178,7 +172,7 @@ public class KTableFilterTest {
             String topic1 = "topic1";
 
             KTableImpl<String, Integer, Integer> table1 =
-                    (KTableImpl<String, Integer, Integer>) builder.table(strSerializer, intSerializer, strDeserializer, intDeserializer, topic1);
+                    (KTableImpl<String, Integer, Integer>) builder.table(stringSerde, intSerde, topic1);
             KTableImpl<String, Integer, Integer> table2 = (KTableImpl<String, Integer, Integer>) table1.filter(
                     new Predicate<String, Integer>() {
                         @Override
@@ -193,7 +187,7 @@ public class KTableFilterTest {
             builder.addProcessor("proc1", proc1, table1.name);
             builder.addProcessor("proc2", proc2, table2.name);
 
-            KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null, null, null);
+            KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null);
 
             driver.process(topic1, "A", 1);
             driver.process(topic1, "B", 1);
@@ -233,7 +227,7 @@ public class KTableFilterTest {
             String topic1 = "topic1";
 
             KTableImpl<String, Integer, Integer> table1 =
-                    (KTableImpl<String, Integer, Integer>) builder.table(strSerializer, intSerializer, strDeserializer, intDeserializer, topic1);
+                    (KTableImpl<String, Integer, Integer>) builder.table(stringSerde, intSerde, topic1);
             KTableImpl<String, Integer, Integer> table2 = (KTableImpl<String, Integer, Integer>) table1.filter(
                     new Predicate<String, Integer>() {
                         @Override
@@ -250,7 +244,7 @@ public class KTableFilterTest {
             builder.addProcessor("proc1", proc1, table1.name);
             builder.addProcessor("proc2", proc2, table2.name);
 
-            KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null, null, null);
+            KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null);
 
             driver.process(topic1, "A", 1);
             driver.process(topic1, "B", 1);

http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
index 2317c97..20c3a28 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
@@ -17,10 +17,8 @@
 
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KTable;
@@ -40,16 +38,16 @@ import static org.junit.Assert.assertNull;
 
 public class KTableImplTest {
 
+    final private Serde<String> stringSerde = new Serdes.StringSerde();
+
     @Test
     public void testKTable() {
-        final Serializer<String> serializer = new StringSerializer();
-        final Deserializer<String> deserializer = new StringDeserializer();
         final KStreamBuilder builder = new KStreamBuilder();
 
         String topic1 = "topic1";
         String topic2 = "topic2";
 
-        KTable<String, String> table1 = builder.table(serializer, serializer, deserializer, deserializer, topic1);
+        KTable<String, String> table1 = builder.table(stringSerde, stringSerde, topic1);
 
         MockProcessorSupplier<String, String> proc1 = new MockProcessorSupplier<>();
         table1.toStream().process(proc1);
@@ -74,7 +72,7 @@ public class KTableImplTest {
         MockProcessorSupplier<String, Integer> proc3 = new MockProcessorSupplier<>();
         table3.toStream().process(proc3);
 
-        KTable<String, String> table4 = table1.through(topic2, serializer, serializer, deserializer, deserializer);
+        KTable<String, String> table4 = table1.through(topic2, stringSerde, stringSerde);
 
         MockProcessorSupplier<String, String> proc4 = new MockProcessorSupplier<>();
         table4.toStream().process(proc4);
@@ -96,15 +94,13 @@ public class KTableImplTest {
     public void testValueGetter() throws IOException {
         File stateDir = Files.createTempDirectory("test").toFile();
         try {
-            final Serializer<String> serializer = new StringSerializer();
-            final Deserializer<String> deserializer = new StringDeserializer();
             final KStreamBuilder builder = new KStreamBuilder();
 
             String topic1 = "topic1";
             String topic2 = "topic2";
 
             KTableImpl<String, String, String> table1 =
-                    (KTableImpl<String, String, String>) builder.table(serializer, serializer, deserializer, deserializer, topic1);
+                    (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1);
             KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues(
                     new ValueMapper<String, Integer>() {
                         @Override
@@ -120,14 +116,14 @@ public class KTableImplTest {
                         }
                     });
             KTableImpl<String, String, String> table4 = (KTableImpl<String, String, String>)
-                    table1.through(topic2, serializer, serializer, deserializer, deserializer);
+                    table1.through(topic2, stringSerde, stringSerde);
 
             KTableValueGetterSupplier<String, String> getterSupplier1 = table1.valueGetterSupplier();
             KTableValueGetterSupplier<String, Integer> getterSupplier2 = table2.valueGetterSupplier();
             KTableValueGetterSupplier<String, Integer> getterSupplier3 = table3.valueGetterSupplier();
             KTableValueGetterSupplier<String, String> getterSupplier4 = table4.valueGetterSupplier();
 
-            KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null, null, null);
+            KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null);
 
             // two state store should be created
             assertEquals(2, driver.allStateStores().size());
@@ -223,9 +219,6 @@ public class KTableImplTest {
 
     @Test
     public void testStateStore() throws IOException {
-        final Serializer<String> serializer = new StringSerializer();
-        final Deserializer<String> deserializer = new StringDeserializer();
-
         String topic1 = "topic1";
         String topic2 = "topic2";
 
@@ -234,9 +227,9 @@ public class KTableImplTest {
             KStreamBuilder builder = new KStreamBuilder();
 
             KTableImpl<String, String, String> table1 =
-                    (KTableImpl<String, String, String>) builder.table(serializer, serializer, deserializer, deserializer, topic1);
+                    (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1);
             KTableImpl<String, String, String> table2 =
-                    (KTableImpl<String, String, String>) builder.table(serializer, serializer, deserializer, deserializer, topic2);
+                    (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic2);
 
             KTableImpl<String, String, Integer> table1Mapped = (KTableImpl<String, String, Integer>) table1.mapValues(
                     new ValueMapper<String, Integer>() {
@@ -253,7 +246,7 @@ public class KTableImplTest {
                         }
                     });
 
-            KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null, null, null);
+            KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null);
             driver.setTime(0L);
 
             // no state store should be created
@@ -267,9 +260,9 @@ public class KTableImplTest {
             KStreamBuilder builder = new KStreamBuilder();
 
             KTableImpl<String, String, String> table1 =
-                    (KTableImpl<String, String, String>) builder.table(serializer, serializer, deserializer, deserializer, topic1);
+                    (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1);
             KTableImpl<String, String, String> table2 =
-                    (KTableImpl<String, String, String>) builder.table(serializer, serializer, deserializer, deserializer, topic2);
+                    (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic2);
 
             KTableImpl<String, String, Integer> table1Mapped = (KTableImpl<String, String, Integer>) table1.mapValues(
                     new ValueMapper<String, Integer>() {
@@ -293,7 +286,7 @@ public class KTableImplTest {
                         }
                     });
 
-            KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null, null, null);
+            KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null);
             driver.setTime(0L);
 
             // two state store should be created

http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java
index 12bfb9c..5f30574 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java
@@ -17,10 +17,8 @@
 
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.common.serialization.IntegerDeserializer;
-import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KTable;
@@ -47,10 +45,8 @@ public class KTableKTableJoinTest {
     private String topic1 = "topic1";
     private String topic2 = "topic2";
 
-    private IntegerSerializer keySerializer = new IntegerSerializer();
-    private StringSerializer valSerializer = new StringSerializer();
-    private IntegerDeserializer keyDeserializer = new IntegerDeserializer();
-    private StringDeserializer valDeserializer = new StringDeserializer();
+    final private Serde<Integer> intSerde = new Serdes.IntegerSerde();
+    final private Serde<String> stringSerde = new Serdes.StringSerde();
 
     private ValueJoiner<String, String, String> joiner = new ValueJoiner<String, String, String>() {
         @Override
@@ -80,8 +76,8 @@ public class KTableKTableJoinTest {
             MockProcessorSupplier<Integer, String> processor;
 
             processor = new MockProcessorSupplier<>();
-            table1 = builder.table(keySerializer, valSerializer, keyDeserializer, valDeserializer, topic1);
-            table2 = builder.table(keySerializer, valSerializer, keyDeserializer, valDeserializer, topic2);
+            table1 = builder.table(intSerde, stringSerde, topic1);
+            table2 = builder.table(intSerde, stringSerde, topic2);
             joined = table1.join(table2, joiner);
             joined.toStream().process(processor);
 
@@ -179,8 +175,8 @@ public class KTableKTableJoinTest {
             KTable<Integer, String> joined;
             MockProcessorSupplier<Integer, String> proc;
 
-            table1 = builder.table(keySerializer, valSerializer, keyDeserializer, valDeserializer, topic1);
-            table2 = builder.table(keySerializer, valSerializer, keyDeserializer, valDeserializer, topic2);
+            table1 = builder.table(intSerde, stringSerde, topic1);
+            table2 = builder.table(intSerde, stringSerde, topic2);
             joined = table1.join(table2, joiner);
 
             proc = new MockProcessorSupplier<>();
@@ -267,8 +263,8 @@ public class KTableKTableJoinTest {
             KTable<Integer, String> joined;
             MockProcessorSupplier<Integer, String> proc;
 
-            table1 = builder.table(keySerializer, valSerializer, keyDeserializer, valDeserializer, topic1);
-            table2 = builder.table(keySerializer, valSerializer, keyDeserializer, valDeserializer, topic2);
+            table1 = builder.table(intSerde, stringSerde, topic1);
+            table2 = builder.table(intSerde, stringSerde, topic2);
             joined = table1.join(table2, joiner);
 
             ((KTableImpl<?, ?, ?>) joined).enableSendingOldValues();

http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
index e3cf22b..f92c5ca 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
@@ -17,10 +17,8 @@
 
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.common.serialization.IntegerDeserializer;
-import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KTable;
@@ -48,10 +46,8 @@ public class KTableKTableLeftJoinTest {
     private String topic1 = "topic1";
     private String topic2 = "topic2";
 
-    private IntegerSerializer keySerializer = new IntegerSerializer();
-    private StringSerializer valSerializer = new StringSerializer();
-    private IntegerDeserializer keyDeserializer = new IntegerDeserializer();
-    private StringDeserializer valDeserializer = new StringDeserializer();
+    final private Serde<Integer> intSerde = new Serdes.IntegerSerde();
+    final private Serde<String> stringSerde = new Serdes.StringSerde();
 
     private ValueJoiner<String, String, String> joiner = new ValueJoiner<String, String, String>() {
         @Override
@@ -83,15 +79,11 @@ public class KTableKTableLeftJoinTest {
 
             final int[] expectedKeys = new int[]{0, 1, 2, 3};
 
-            KTable<Integer, String> table1;
-            KTable<Integer, String> table2;
-            KTable<Integer, String> joined;
+            KTable<Integer, String> table1 = builder.table(intSerde, stringSerde, topic1);
+            KTable<Integer, String> table2 = builder.table(intSerde, stringSerde, topic2);
+            KTable<Integer, String> joined = table1.leftJoin(table2, joiner);
             MockProcessorSupplier<Integer, String> processor;
-
             processor = new MockProcessorSupplier<>();
-            table1 = builder.table(keySerializer, valSerializer, keyDeserializer, valDeserializer, topic1);
-            table2 = builder.table(keySerializer, valSerializer, keyDeserializer, valDeserializer, topic2);
-            joined = table1.leftJoin(table2, joiner);
             joined.toStream().process(processor);
 
             Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
@@ -188,8 +180,8 @@ public class KTableKTableLeftJoinTest {
             KTable<Integer, String> joined;
             MockProcessorSupplier<Integer, String> proc;
 
-            table1 = builder.table(keySerializer, valSerializer, keyDeserializer, valDeserializer, topic1);
-            table2 = builder.table(keySerializer, valSerializer, keyDeserializer, valDeserializer, topic2);
+            table1 = builder.table(intSerde, stringSerde, topic1);
+            table2 = builder.table(intSerde, stringSerde, topic2);
             joined = table1.leftJoin(table2, joiner);
 
             proc = new MockProcessorSupplier<>();
@@ -276,8 +268,8 @@ public class KTableKTableLeftJoinTest {
             KTable<Integer, String> joined;
             MockProcessorSupplier<Integer, String> proc;
 
-            table1 = builder.table(keySerializer, valSerializer, keyDeserializer, valDeserializer, topic1);
-            table2 = builder.table(keySerializer, valSerializer, keyDeserializer, valDeserializer, topic2);
+            table1 = builder.table(intSerde, stringSerde, topic1);
+            table2 = builder.table(intSerde, stringSerde, topic2);
             joined = table1.leftJoin(table2, joiner);
 
             ((KTableImpl<?, ?, ?>) joined).enableSendingOldValues();

http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
index feabc08..6cc77e0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
@@ -17,10 +17,8 @@
 
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.common.serialization.IntegerDeserializer;
-import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KTable;
@@ -47,10 +45,8 @@ public class KTableKTableOuterJoinTest {
     private String topic1 = "topic1";
     private String topic2 = "topic2";
 
-    private IntegerSerializer keySerializer = new IntegerSerializer();
-    private StringSerializer valSerializer = new StringSerializer();
-    private IntegerDeserializer keyDeserializer = new IntegerDeserializer();
-    private StringDeserializer valDeserializer = new StringDeserializer();
+    final private Serde<Integer> intSerde = new Serdes.IntegerSerde();
+    final private Serde<String> stringSerde = new Serdes.StringSerde();
 
     private ValueJoiner<String, String, String> joiner = new ValueJoiner<String, String, String>() {
         @Override
@@ -80,8 +76,8 @@ public class KTableKTableOuterJoinTest {
             MockProcessorSupplier<Integer, String> processor;
 
             processor = new MockProcessorSupplier<>();
-            table1 = builder.table(keySerializer, valSerializer, keyDeserializer, valDeserializer, topic1);
-            table2 = builder.table(keySerializer, valSerializer, keyDeserializer, valDeserializer, topic2);
+            table1 = builder.table(intSerde, stringSerde, topic1);
+            table2 = builder.table(intSerde, stringSerde, topic2);
             joined = table1.outerJoin(table2, joiner);
             joined.toStream().process(processor);
 
@@ -188,8 +184,8 @@ public class KTableKTableOuterJoinTest {
             KTable<Integer, String> joined;
             MockProcessorSupplier<Integer, String> proc;
 
-            table1 = builder.table(keySerializer, valSerializer, keyDeserializer, valDeserializer, topic1);
-            table2 = builder.table(keySerializer, valSerializer, keyDeserializer, valDeserializer, topic2);
+            table1 = builder.table(intSerde, stringSerde, topic1);
+            table2 = builder.table(intSerde, stringSerde, topic2);
             joined = table1.outerJoin(table2, joiner);
 
             proc = new MockProcessorSupplier<>();
@@ -284,8 +280,8 @@ public class KTableKTableOuterJoinTest {
             KTable<Integer, String> joined;
             MockProcessorSupplier<Integer, String> proc;
 
-            table1 = builder.table(keySerializer, valSerializer, keyDeserializer, valDeserializer, topic1);
-            table2 = builder.table(keySerializer, valSerializer, keyDeserializer, valDeserializer, topic2);
+            table1 = builder.table(intSerde, stringSerde, topic1);
+            table2 = builder.table(intSerde, stringSerde, topic2);
             joined = table1.outerJoin(table2, joiner);
 
             ((KTableImpl<?, ?, ?>) joined).enableSendingOldValues();

http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
index 58f1c2a..aa3daeb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
@@ -17,10 +17,8 @@
 
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KTable;
@@ -41,8 +39,7 @@ import static org.junit.Assert.assertTrue;
 
 public class KTableMapValuesTest {
 
-    private final Serializer<String> strSerializer = new StringSerializer();
-    private final Deserializer<String> strDeserializer = new StringDeserializer();
+    final private Serde<String> stringSerde = new Serdes.StringSerde();
 
     @Test
     public void testKTable() {
@@ -50,7 +47,7 @@ public class KTableMapValuesTest {
 
         String topic1 = "topic1";
 
-        KTable<String, String> table1 = builder.table(strSerializer, strSerializer, strDeserializer, strDeserializer, topic1);
+        KTable<String, String> table1 = builder.table(stringSerde, stringSerde, topic1);
         KTable<String, Integer> table2 = table1.mapValues(new ValueMapper<String, Integer>() {
             @Override
             public Integer apply(String value) {
@@ -75,15 +72,13 @@ public class KTableMapValuesTest {
     public void testValueGetter() throws IOException {
         File stateDir = Files.createTempDirectory("test").toFile();
         try {
-            final Serializer<String> serializer = new StringSerializer();
-            final Deserializer<String> deserializer = new StringDeserializer();
             final KStreamBuilder builder = new KStreamBuilder();
 
             String topic1 = "topic1";
             String topic2 = "topic2";
 
             KTableImpl<String, String, String> table1 =
-                    (KTableImpl<String, String, String>) builder.table(serializer, serializer, deserializer, deserializer, topic1);
+                    (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1);
             KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues(
                     new ValueMapper<String, Integer>() {
                         @Override
@@ -99,14 +94,14 @@ public class KTableMapValuesTest {
                         }
                     });
             KTableImpl<String, String, String> table4 = (KTableImpl<String, String, String>)
-                    table1.through(topic2, serializer, serializer, deserializer, deserializer);
+                    table1.through(topic2, stringSerde, stringSerde);
 
             KTableValueGetterSupplier<String, String> getterSupplier1 = table1.valueGetterSupplier();
             KTableValueGetterSupplier<String, Integer> getterSupplier2 = table2.valueGetterSupplier();
             KTableValueGetterSupplier<String, Integer> getterSupplier3 = table3.valueGetterSupplier();
             KTableValueGetterSupplier<String, String> getterSupplier4 = table4.valueGetterSupplier();
 
-            KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null, null, null);
+            KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null);
 
             KTableValueGetter<String, String> getter1 = getterSupplier1.get();
             getter1.init(driver.context());
@@ -201,14 +196,12 @@ public class KTableMapValuesTest {
     public void testNotSendingOldValue() throws IOException {
         File stateDir = Files.createTempDirectory("test").toFile();
         try {
-            final Serializer<String> serializer = new StringSerializer();
-            final Deserializer<String> deserializer = new StringDeserializer();
             final KStreamBuilder builder = new KStreamBuilder();
 
             String topic1 = "topic1";
 
             KTableImpl<String, String, String> table1 =
-                    (KTableImpl<String, String, String>) builder.table(serializer, serializer, deserializer, deserializer, topic1);
+                    (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1);
             KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues(
                     new ValueMapper<String, Integer>() {
                         @Override
@@ -221,7 +214,7 @@ public class KTableMapValuesTest {
 
             builder.addProcessor("proc", proc, table2.name);
 
-            KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null, null, null);
+            KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null);
 
             assertFalse(table1.sendingOldValueEnabled());
             assertFalse(table2.sendingOldValueEnabled());
@@ -254,14 +247,12 @@ public class KTableMapValuesTest {
     public void testSendingOldValue() throws IOException {
         File stateDir = Files.createTempDirectory("test").toFile();
         try {
-            final Serializer<String> serializer = new StringSerializer();
-            final Deserializer<String> deserializer = new StringDeserializer();
             final KStreamBuilder builder = new KStreamBuilder();
 
             String topic1 = "topic1";
 
             KTableImpl<String, String, String> table1 =
-                    (KTableImpl<String, String, String>) builder.table(serializer, serializer, deserializer, deserializer, topic1);
+                    (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1);
             KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues(
                     new ValueMapper<String, Integer>() {
                         @Override
@@ -276,7 +267,7 @@ public class KTableMapValuesTest {
 
             builder.addProcessor("proc", proc, table2.name);
 
-            KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null, null, null);
+            KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null);
 
             assertTrue(table1.sendingOldValueEnabled());
             assertTrue(table2.sendingOldValueEnabled());

http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
index 187a6f2..51276f3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
@@ -17,10 +17,8 @@
 
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KTable;
@@ -38,8 +36,7 @@ import static org.junit.Assert.assertTrue;
 
 public class KTableSourceTest {
 
-    private final Serializer<String> strSerializer = new StringSerializer();
-    private final Deserializer<String> strDeserializer = new StringDeserializer();
+    final private Serde<String> stringSerde = new Serdes.StringSerde();
 
     @Test
     public void testKTable() {
@@ -47,7 +44,7 @@ public class KTableSourceTest {
 
         String topic1 = "topic1";
 
-        KTable<String, String> table1 = builder.table(strSerializer, strSerializer, strDeserializer, strDeserializer, topic1);
+        KTable<String, String> table1 = builder.table(stringSerde, stringSerde, topic1);
 
         MockProcessorSupplier<String, String> proc1 = new MockProcessorSupplier<>();
         table1.toStream().process(proc1);
@@ -72,12 +69,11 @@ public class KTableSourceTest {
 
             String topic1 = "topic1";
 
-            KTableImpl<String, String, String> table1 = (KTableImpl<String, String, String>)
-                    builder.table(strSerializer, strSerializer, strDeserializer, strDeserializer, topic1);
+            KTableImpl<String, String, String> table1 = (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1);
 
             KTableValueGetterSupplier<String, String> getterSupplier1 = table1.valueGetterSupplier();
 
-            KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null, null, null);
+            KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null);
 
             KTableValueGetter<String, String> getter1 = getterSupplier1.get();
             getter1.init(driver.context());
@@ -123,14 +119,13 @@ public class KTableSourceTest {
 
             String topic1 = "topic1";
 
-            KTableImpl<String, String, String> table1 = (KTableImpl<String, String, String>)
-                    builder.table(strSerializer, strSerializer, strDeserializer, strDeserializer, topic1);
+            KTableImpl<String, String, String> table1 = (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1);
 
             MockProcessorSupplier<String, Integer> proc1 = new MockProcessorSupplier<>();
 
             builder.addProcessor("proc1", proc1, table1.name);
 
-            KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null, null, null);
+            KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null);
 
             driver.process(topic1, "A", "01");
             driver.process(topic1, "B", "01");
@@ -165,8 +160,7 @@ public class KTableSourceTest {
 
             String topic1 = "topic1";
 
-            KTableImpl<String, String, String> table1 = (KTableImpl<String, String, String>)
-                    builder.table(strSerializer, strSerializer, strDeserializer, strDeserializer, topic1);
+            KTableImpl<String, String, String> table1 = (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1);
 
             table1.enableSendingOldValues();
 
@@ -176,7 +170,7 @@ public class KTableSourceTest {
 
             builder.addProcessor("proc1", proc1, table1.name);
 
-            KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null, null, null);
+            KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null);
 
             driver.process(topic1, "A", "01");
             driver.process(topic1, "B", "01");

http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java
index 1b8cbb8..7c6d5ec 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java
@@ -37,8 +37,8 @@ public class WindowedStreamPartitionerTest {
 
     private String topicName = "topic";
 
-    private IntegerSerializer keySerializer = new IntegerSerializer();
-    private StringSerializer valSerializer = new StringSerializer();
+    private IntegerSerializer intSerializer = new IntegerSerializer();
+    private StringSerializer stringSerializer = new StringSerializer();
 
     private List<PartitionInfo> infos = Arrays.asList(
             new PartitionInfo(topicName, 0, Node.noNode(), new Node[0], new Node[0]),
@@ -58,15 +58,15 @@ public class WindowedStreamPartitionerTest {
 
         DefaultPartitioner defaultPartitioner = new DefaultPartitioner();
 
-        WindowedSerializer<Integer> windowedSerializer = new WindowedSerializer<>(keySerializer);
+        WindowedSerializer<Integer> windowedSerializer = new WindowedSerializer<>(intSerializer);
         WindowedStreamPartitioner<Integer, String> streamPartitioner = new WindowedStreamPartitioner<>(windowedSerializer);
 
         for (int k = 0; k < 10; k++) {
             Integer key = rand.nextInt();
-            byte[] keyBytes = keySerializer.serialize(topicName, key);
+            byte[] keyBytes = intSerializer.serialize(topicName, key);
 
             String value = key.toString();
-            byte[] valueBytes = valSerializer.serialize(topicName, value);
+            byte[] valueBytes = stringSerializer.serialize(topicName, value);
 
             Integer expected = defaultPartitioner.partition("topic", key, keyBytes, value, valueBytes, cluster);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
index 12210cc..ef08176 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
@@ -24,6 +24,7 @@ import static org.junit.Assert.assertNull;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
@@ -70,11 +71,9 @@ public class ProcessorTopologyTest {
         props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "processor-topology-test");
         props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
         props.setProperty(StreamsConfig.STATE_DIR_CONFIG, localState.getAbsolutePath());
+        props.setProperty(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+        props.setProperty(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
         props.setProperty(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, CustomTimestampExtractor.class.getName());
-        props.setProperty(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
-        props.setProperty(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
-        props.setProperty(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
-        props.setProperty(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
         this.config = new StreamsConfig(props);
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index 21bdaff..ea24441 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -33,6 +33,7 @@ import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
 import org.apache.kafka.test.MockStateStoreSupplier;
+import org.apache.kafka.test.MockTimestampExtractor;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -89,15 +90,11 @@ public class StandbyTaskTest {
     private StreamsConfig createConfig(final File baseDir) throws Exception {
         return new StreamsConfig(new Properties() {
             {
-                setProperty(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
-                setProperty(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
-                setProperty(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
-                setProperty(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
-                setProperty(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, "org.apache.kafka.test.MockTimestampExtractor");
                 setProperty(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
                 setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171");
                 setProperty(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3");
                 setProperty(StreamsConfig.STATE_DIR_CONFIG, baseDir.getCanonicalPath());
+                setProperty(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class.getName());
             }
         });
     }