You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/03/17 23:42:04 UTC
[2/4] kafka git commit: KAFKA-3336: Unify Serializer and Deserializer into Serialization
http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
index 6823e6d..00089ab 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreSupplier.java
@@ -20,7 +20,7 @@ package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreSupplier;
-import org.apache.kafka.streams.state.Serdes;
+import org.apache.kafka.streams.state.StateSerdes;
/**
* A {@link org.apache.kafka.streams.state.KeyValueStore} that stores all entries in a local RocksDB database.
@@ -36,10 +36,10 @@ public class RocksDBWindowStoreSupplier<K, V> implements StateStoreSupplier {
private final long retentionPeriod;
private final boolean retainDuplicates;
private final int numSegments;
- private final Serdes<K, V> serdes;
+ private final StateSerdes<K, V> serdes;
private final Time time;
- public RocksDBWindowStoreSupplier(String name, long retentionPeriod, int numSegments, boolean retainDuplicates, Serdes<K, V> serdes, Time time) {
+ public RocksDBWindowStoreSupplier(String name, long retentionPeriod, int numSegments, boolean retainDuplicates, StateSerdes<K, V> serdes, Time time) {
this.name = name;
this.retentionPeriod = retentionPeriod;
this.retainDuplicates = retainDuplicates;
http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
index 4229f94..a439117 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
@@ -22,7 +22,7 @@ import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.RecordCollector;
-import org.apache.kafka.streams.state.Serdes;
+import org.apache.kafka.streams.state.StateSerdes;
import java.util.HashSet;
import java.util.Set;
@@ -36,7 +36,7 @@ public class StoreChangeLogger<K, V> {
// TODO: these values should be configurable
protected static final int DEFAULT_WRITE_BATCH_SIZE = 100;
- protected final Serdes<K, V> serialization;
+ protected final StateSerdes<K, V> serialization;
private final String topic;
private final int partition;
@@ -47,16 +47,16 @@ public class StoreChangeLogger<K, V> {
protected Set<K> dirty;
protected Set<K> removed;
- public StoreChangeLogger(String storeName, ProcessorContext context, Serdes<K, V> serialization) {
+ public StoreChangeLogger(String storeName, ProcessorContext context, StateSerdes<K, V> serialization) {
this(storeName, context, serialization, DEFAULT_WRITE_BATCH_SIZE, DEFAULT_WRITE_BATCH_SIZE);
}
- public StoreChangeLogger(String storeName, ProcessorContext context, Serdes<K, V> serialization, int maxDirty, int maxRemoved) {
+ public StoreChangeLogger(String storeName, ProcessorContext context, StateSerdes<K, V> serialization, int maxDirty, int maxRemoved) {
this(storeName, context, context.taskId().partition, serialization, maxDirty, maxRemoved);
init();
}
- protected StoreChangeLogger(String storeName, ProcessorContext context, int partition, Serdes<K, V> serialization, int maxDirty, int maxRemoved) {
+ protected StoreChangeLogger(String storeName, ProcessorContext context, int partition, StateSerdes<K, V> serialization, int maxDirty, int maxRemoved) {
this.topic = ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName);
this.context = context;
this.partition = partition;
http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index 83ebe48..0dacde7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -18,11 +18,6 @@
package org.apache.kafka.streams;
import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.common.serialization.IntegerDeserializer;
-import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.streams.processor.internals.WallclockTimestampExtractor;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.junit.Before;
import org.junit.Test;
@@ -43,11 +38,6 @@ public class StreamsConfigTest {
public void setUp() {
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-config-test");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
- props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
- props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
- props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
- props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
- props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
streamsConfig = new StreamsConfig(props);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java
index 88366fa..e04a273 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java
@@ -17,8 +17,7 @@
package org.apache.kafka.streams.kstream.internals;
-import org.apache.kafka.common.serialization.IntegerDeserializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.Predicate;
@@ -34,9 +33,6 @@ public class KStreamBranchTest {
private String topicName = "topic";
- private IntegerDeserializer keyDeserializer = new IntegerDeserializer();
- private StringDeserializer valDeserializer = new StringDeserializer();
-
@SuppressWarnings("unchecked")
@Test
public void testKStreamBranch() {
@@ -67,7 +63,7 @@ public class KStreamBranchTest {
KStream<Integer, String>[] branches;
MockProcessorSupplier<Integer, String>[] processors;
- stream = builder.stream(keyDeserializer, valDeserializer, topicName);
+ stream = builder.stream(Serdes.Integer(), Serdes.String(), topicName);
branches = stream.branch(isEven, isMultipleOfThree, isOdd);
assertEquals(3, branches.length);
http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java
index 3bad041..ecf1115 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java
@@ -17,8 +17,7 @@
package org.apache.kafka.streams.kstream.internals;
-import org.apache.kafka.common.serialization.IntegerDeserializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.Predicate;
@@ -33,9 +32,6 @@ public class KStreamFilterTest {
private String topicName = "topic";
- private IntegerDeserializer keyDeserializer = new IntegerDeserializer();
- private StringDeserializer valDeserializer = new StringDeserializer();
-
private Predicate<Integer, String> isMultipleOfThree = new Predicate<Integer, String>() {
@Override
public boolean test(Integer key, String value) {
@@ -52,7 +48,7 @@ public class KStreamFilterTest {
MockProcessorSupplier<Integer, String> processor;
processor = new MockProcessorSupplier<>();
- stream = builder.stream(keyDeserializer, valDeserializer, topicName);
+ stream = builder.stream(Serdes.Integer(), Serdes.String(), topicName);
stream.filter(isMultipleOfThree).process(processor);
KStreamTestDriver driver = new KStreamTestDriver(builder);
@@ -72,7 +68,7 @@ public class KStreamFilterTest {
MockProcessorSupplier<Integer, String> processor;
processor = new MockProcessorSupplier<>();
- stream = builder.stream(keyDeserializer, valDeserializer, topicName);
+ stream = builder.stream(Serdes.Integer(), Serdes.String(), topicName);
stream.filterOut(isMultipleOfThree).process(processor);
KStreamTestDriver driver = new KStreamTestDriver(builder);
http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
index 693f58e..bc85757 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
@@ -17,8 +17,7 @@
package org.apache.kafka.streams.kstream.internals;
-import org.apache.kafka.common.serialization.IntegerDeserializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.KeyValue;
@@ -35,9 +34,6 @@ public class KStreamFlatMapTest {
private String topicName = "topic";
- private IntegerDeserializer keyDeserializer = new IntegerDeserializer();
- private StringDeserializer valDeserializer = new StringDeserializer();
-
@Test
public void testFlatMap() {
KStreamBuilder builder = new KStreamBuilder();
@@ -60,7 +56,7 @@ public class KStreamFlatMapTest {
MockProcessorSupplier<String, String> processor;
processor = new MockProcessorSupplier<>();
- stream = builder.stream(keyDeserializer, valDeserializer, topicName);
+ stream = builder.stream(Serdes.Integer(), Serdes.String(), topicName);
stream.flatMap(mapper).process(processor);
KStreamTestDriver driver = new KStreamTestDriver(builder);
http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java
index eef7933..a904cb1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java
@@ -17,8 +17,7 @@
package org.apache.kafka.streams.kstream.internals;
-import org.apache.kafka.common.serialization.IntegerDeserializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.ValueMapper;
@@ -34,9 +33,6 @@ public class KStreamFlatMapValuesTest {
private String topicName = "topic";
- private IntegerDeserializer keyDeserializer = new IntegerDeserializer();
- private StringDeserializer valDeserializer = new StringDeserializer();
-
@Test
public void testFlatMapValues() {
KStreamBuilder builder = new KStreamBuilder();
@@ -58,7 +54,7 @@ public class KStreamFlatMapValuesTest {
MockProcessorSupplier<Integer, String> processor;
processor = new MockProcessorSupplier<>();
- stream = builder.stream(keyDeserializer, valDeserializer, topicName);
+ stream = builder.stream(Serdes.Integer(), Serdes.String(), topicName);
stream.flatMapValues(mapper).process(processor);
KStreamTestDriver driver = new KStreamTestDriver(builder);
http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
index 3d3a9e3..38182bc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
@@ -17,12 +17,8 @@
package org.apache.kafka.streams.kstream.internals;
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.IntegerDeserializer;
-import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
@@ -39,18 +35,16 @@ import static org.junit.Assert.assertEquals;
public class KStreamImplTest {
+ final private Serde<String> stringSerde = Serdes.String();
+ final private Serde<Integer> intSerde = Serdes.Integer();
+
@Test
public void testNumProcesses() {
- final Serializer<String> stringSerializer = new StringSerializer();
- final Deserializer<String> stringDeserializer = new StringDeserializer();
- final Serializer<Integer> integerSerializer = new IntegerSerializer();
- final Deserializer<Integer> integerDeserializer = new IntegerDeserializer();
-
final KStreamBuilder builder = new KStreamBuilder();
- KStream<String, String> source1 = builder.stream(stringDeserializer, stringDeserializer, "topic-1", "topic-2");
+ KStream<String, String> source1 = builder.stream(stringSerde, stringSerde, "topic-1", "topic-2");
- KStream<String, String> source2 = builder.stream(stringDeserializer, stringDeserializer, "topic-3", "topic-4");
+ KStream<String, String> source2 = builder.stream(stringSerde, stringSerde, "topic-3", "topic-4");
KStream<String, String> stream1 =
source1.filter(new Predicate<String, String>() {
@@ -114,14 +108,14 @@ public class KStreamImplTest {
public Integer apply(Integer value1, Integer value2) {
return value1 + value2;
}
- }, JoinWindows.of("join-0"), stringSerializer, integerSerializer, integerSerializer, stringDeserializer, integerDeserializer, integerDeserializer);
+ }, JoinWindows.of("join-0"), stringSerde, intSerde, intSerde);
KStream<String, Integer> stream5 = streams2[1].join(streams3[1], new ValueJoiner<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer value1, Integer value2) {
return value1 + value2;
}
- }, JoinWindows.of("join-1"), stringSerializer, integerSerializer, integerSerializer, stringDeserializer, integerDeserializer, integerDeserializer);
+ }, JoinWindows.of("join-1"), stringSerde, intSerde, intSerde);
stream4.to("topic-5");
http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
index e763fd2..d24ab15 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
@@ -17,10 +17,8 @@
package org.apache.kafka.streams.kstream.internals;
-import org.apache.kafka.common.serialization.IntegerDeserializer;
-import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
@@ -44,10 +42,8 @@ public class KStreamKStreamJoinTest {
private String topic1 = "topic1";
private String topic2 = "topic2";
- private IntegerSerializer keySerializer = new IntegerSerializer();
- private StringSerializer valSerializer = new StringSerializer();
- private IntegerDeserializer keyDeserializer = new IntegerDeserializer();
- private StringDeserializer valDeserializer = new StringDeserializer();
+ final private Serde<Integer> intSerde = Serdes.Integer();
+ final private Serde<String> stringSerde = Serdes.String();
private ValueJoiner<String, String, String> joiner = new ValueJoiner<String, String, String>() {
@Override
@@ -71,10 +67,9 @@ public class KStreamKStreamJoinTest {
MockProcessorSupplier<Integer, String> processor;
processor = new MockProcessorSupplier<>();
- stream1 = builder.stream(keyDeserializer, valDeserializer, topic1);
- stream2 = builder.stream(keyDeserializer, valDeserializer, topic2);
- joined = stream1.join(stream2, joiner, JoinWindows.of("test").within(100),
- keySerializer, valSerializer, valSerializer, keyDeserializer, valDeserializer, valDeserializer);
+ stream1 = builder.stream(intSerde, stringSerde, topic1);
+ stream2 = builder.stream(intSerde, stringSerde, topic2);
+ joined = stream1.join(stream2, joiner, JoinWindows.of("test").within(100), intSerde, stringSerde, stringSerde);
joined.process(processor);
Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
@@ -177,10 +172,9 @@ public class KStreamKStreamJoinTest {
MockProcessorSupplier<Integer, String> processor;
processor = new MockProcessorSupplier<>();
- stream1 = builder.stream(keyDeserializer, valDeserializer, topic1);
- stream2 = builder.stream(keyDeserializer, valDeserializer, topic2);
- joined = stream1.outerJoin(stream2, joiner, JoinWindows.of("test").within(100),
- keySerializer, valSerializer, valSerializer, keyDeserializer, valDeserializer, valDeserializer);
+ stream1 = builder.stream(intSerde, stringSerde, topic1);
+ stream2 = builder.stream(intSerde, stringSerde, topic2);
+ joined = stream1.outerJoin(stream2, joiner, JoinWindows.of("test").within(100), intSerde, stringSerde, stringSerde);
joined.process(processor);
Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
@@ -285,10 +279,9 @@ public class KStreamKStreamJoinTest {
MockProcessorSupplier<Integer, String> processor;
processor = new MockProcessorSupplier<>();
- stream1 = builder.stream(keyDeserializer, valDeserializer, topic1);
- stream2 = builder.stream(keyDeserializer, valDeserializer, topic2);
- joined = stream1.join(stream2, joiner, JoinWindows.of("test").within(100),
- keySerializer, valSerializer, valSerializer, keyDeserializer, valDeserializer, valDeserializer);
+ stream1 = builder.stream(intSerde, stringSerde, topic1);
+ stream2 = builder.stream(intSerde, stringSerde, topic2);
+ joined = stream1.join(stream2, joiner, JoinWindows.of("test").within(100), intSerde, stringSerde, stringSerde);
joined.process(processor);
Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
index 8c6e43b..166e8ba 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
@@ -17,10 +17,8 @@
package org.apache.kafka.streams.kstream.internals;
-import org.apache.kafka.common.serialization.IntegerDeserializer;
-import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
@@ -44,10 +42,8 @@ public class KStreamKStreamLeftJoinTest {
private String topic1 = "topic1";
private String topic2 = "topic2";
- private IntegerSerializer keySerializer = new IntegerSerializer();
- private StringSerializer valSerializer = new StringSerializer();
- private IntegerDeserializer keyDeserializer = new IntegerDeserializer();
- private StringDeserializer valDeserializer = new StringDeserializer();
+ final private Serde<Integer> intSerde = Serdes.Integer();
+ final private Serde<String> stringSerde = Serdes.String();
private ValueJoiner<String, String, String> joiner = new ValueJoiner<String, String, String>() {
@Override
@@ -71,10 +67,9 @@ public class KStreamKStreamLeftJoinTest {
MockProcessorSupplier<Integer, String> processor;
processor = new MockProcessorSupplier<>();
- stream1 = builder.stream(keyDeserializer, valDeserializer, topic1);
- stream2 = builder.stream(keyDeserializer, valDeserializer, topic2);
- joined = stream1.leftJoin(stream2, joiner, JoinWindows.of("test").within(100),
- keySerializer, valSerializer, keyDeserializer, valDeserializer);
+ stream1 = builder.stream(intSerde, stringSerde, topic1);
+ stream2 = builder.stream(intSerde, stringSerde, topic2);
+ joined = stream1.leftJoin(stream2, joiner, JoinWindows.of("test").within(100), intSerde, stringSerde);
joined.process(processor);
Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
@@ -157,10 +152,9 @@ public class KStreamKStreamLeftJoinTest {
MockProcessorSupplier<Integer, String> processor;
processor = new MockProcessorSupplier<>();
- stream1 = builder.stream(keyDeserializer, valDeserializer, topic1);
- stream2 = builder.stream(keyDeserializer, valDeserializer, topic2);
- joined = stream1.leftJoin(stream2, joiner, JoinWindows.of("test").within(100),
- keySerializer, valSerializer, keyDeserializer, valDeserializer);
+ stream1 = builder.stream(intSerde, stringSerde, topic1);
+ stream2 = builder.stream(intSerde, stringSerde, topic2);
+ joined = stream1.leftJoin(stream2, joiner, JoinWindows.of("test").within(100), intSerde, stringSerde);
joined.process(processor);
Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
index f226cee..8e672a2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
@@ -18,10 +18,8 @@
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.serialization.IntegerDeserializer;
-import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
@@ -47,10 +45,8 @@ public class KStreamKTableLeftJoinTest {
private String topic1 = "topic1";
private String topic2 = "topic2";
- private IntegerSerializer keySerializer = new IntegerSerializer();
- private StringSerializer valSerializer = new StringSerializer();
- private IntegerDeserializer keyDeserializer = new IntegerDeserializer();
- private StringDeserializer valDeserializer = new StringDeserializer();
+ final private Serde<Integer> intSerde = new Serdes.IntegerSerde();
+ final private Serde<String> stringSerde = new Serdes.StringSerde();
private ValueJoiner<String, String, String> joiner = new ValueJoiner<String, String, String>() {
@Override
@@ -81,8 +77,8 @@ public class KStreamKTableLeftJoinTest {
MockProcessorSupplier<Integer, String> processor;
processor = new MockProcessorSupplier<>();
- stream = builder.stream(keyDeserializer, valDeserializer, topic1);
- table = builder.table(keySerializer, valSerializer, keyDeserializer, valDeserializer, topic2);
+ stream = builder.stream(intSerde, stringSerde, topic1);
+ table = builder.table(intSerde, stringSerde, topic2);
stream.leftJoin(table, joiner).process(processor);
Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
@@ -162,8 +158,8 @@ public class KStreamKTableLeftJoinTest {
MockProcessorSupplier<Integer, String> processor;
processor = new MockProcessorSupplier<>();
- stream = builder.stream(keyDeserializer, valDeserializer, topic1).map(keyValueMapper);
- table = builder.table(keySerializer, valSerializer, keyDeserializer, valDeserializer, topic2);
+ stream = builder.stream(intSerde, stringSerde, topic1).map(keyValueMapper);
+ table = builder.table(intSerde, stringSerde, topic2);
stream.leftJoin(table, joiner).process(processor);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
index 73c517b..68fa656 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
@@ -17,8 +17,8 @@
package org.apache.kafka.streams.kstream.internals;
-import org.apache.kafka.common.serialization.IntegerDeserializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.KeyValue;
@@ -33,8 +33,8 @@ public class KStreamMapTest {
private String topicName = "topic";
- private IntegerDeserializer keyDeserializer = new IntegerDeserializer();
- private StringDeserializer valDeserializer = new StringDeserializer();
+ final private Serde<Integer> intSerde = Serdes.Integer();
+ final private Serde<String> stringSerde = Serdes.String();
@Test
public void testMap() {
@@ -50,11 +50,10 @@ public class KStreamMapTest {
final int[] expectedKeys = new int[]{0, 1, 2, 3};
- KStream<Integer, String> stream;
+ KStream<Integer, String> stream = builder.stream(intSerde, stringSerde, topicName);
MockProcessorSupplier<String, Integer> processor;
processor = new MockProcessorSupplier<>();
- stream = builder.stream(keyDeserializer, valDeserializer, topicName);
stream.map(mapper).process(processor);
KStreamTestDriver driver = new KStreamTestDriver(builder);
http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java
index 68fd285..e671aab 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java
@@ -17,8 +17,8 @@
package org.apache.kafka.streams.kstream.internals;
-import org.apache.kafka.common.serialization.IntegerDeserializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.ValueMapper;
@@ -32,8 +32,8 @@ public class KStreamMapValuesTest {
private String topicName = "topic";
- private IntegerDeserializer keyDeserializer = new IntegerDeserializer();
- private StringDeserializer valDeserializer = new StringDeserializer();
+ final private Serde<Integer> intSerde = Serdes.Integer();
+ final private Serde<String> stringSerde = Serdes.String();
@Test
public void testFlatMapValues() {
@@ -51,7 +51,7 @@ public class KStreamMapValuesTest {
KStream<Integer, String> stream;
MockProcessorSupplier<Integer, Integer> processor = new MockProcessorSupplier<>();
- stream = builder.stream(keyDeserializer, valDeserializer, topicName);
+ stream = builder.stream(intSerde, stringSerde, topicName);
stream.mapValues(mapper).process(processor);
KStreamTestDriver driver = new KStreamTestDriver(builder);
http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
index 426259f..4244de5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
@@ -17,7 +17,8 @@
package org.apache.kafka.streams.kstream.internals;
-import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.KeyValue;
@@ -34,8 +35,7 @@ public class KStreamTransformTest {
private String topicName = "topic";
- private IntegerDeserializer keyDeserializer = new IntegerDeserializer();
- private IntegerDeserializer valDeserializer = new IntegerDeserializer();
+ final private Serde<Integer> intSerde = Serdes.Integer();
@Test
public void testTransform() {
@@ -71,9 +71,8 @@ public class KStreamTransformTest {
final int[] expectedKeys = {1, 10, 100, 1000};
- KStream<Integer, Integer> stream;
MockProcessorSupplier<Integer, Integer> processor = new MockProcessorSupplier<>();
- stream = builder.stream(keyDeserializer, valDeserializer, topicName);
+ KStream<Integer, Integer> stream = builder.stream(intSerde, intSerde, topicName);
stream.transform(transformerSupplier).process(processor);
KStreamTestDriver driver = new KStreamTestDriver(builder);
http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
index 7def9db..52abdf7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
@@ -17,7 +17,8 @@
package org.apache.kafka.streams.kstream.internals;
-import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.ValueTransformer;
@@ -33,8 +34,7 @@ public class KStreamTransformValuesTest {
private String topicName = "topic";
- private IntegerDeserializer keyDeserializer = new IntegerDeserializer();
- private IntegerDeserializer valDeserializer = new IntegerDeserializer();
+ final private Serde<Integer> intSerde = Serdes.Integer();
@Test
public void testTransform() {
@@ -72,7 +72,7 @@ public class KStreamTransformValuesTest {
KStream<Integer, Integer> stream;
MockProcessorSupplier<Integer, Integer> processor = new MockProcessorSupplier<>();
- stream = builder.stream(keyDeserializer, valDeserializer, topicName);
+ stream = builder.stream(intSerde, intSerde, topicName);
stream.transformValues(valueTransformerSupplier).process(processor);
KStreamTestDriver driver = new KStreamTestDriver(builder);
http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
index 9e0745a..e19510f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
@@ -17,10 +17,8 @@
package org.apache.kafka.streams.kstream.internals;
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.HoppingWindows;
@@ -41,8 +39,7 @@ import static org.junit.Assert.assertEquals;
public class KStreamWindowAggregateTest {
- private final Serializer<String> strSerializer = new StringSerializer();
- private final Deserializer<String> strDeserializer = new StringDeserializer();
+ final private Serde<String> strSerde = new Serdes.StringSerde();
private class StringAdd implements Aggregator<String, String, String> {
@@ -68,13 +65,11 @@ public class KStreamWindowAggregateTest {
final KStreamBuilder builder = new KStreamBuilder();
String topic1 = "topic1";
- KStream<String, String> stream1 = builder.stream(strDeserializer, strDeserializer, topic1);
+ KStream<String, String> stream1 = builder.stream(strSerde, strSerde, topic1);
KTable<Windowed<String>, String> table2 = stream1.aggregateByKey(new StringInit(), new StringAdd(),
HoppingWindows.of("topic1-Canonized").with(10L).every(5L),
- strSerializer,
- strSerializer,
- strDeserializer,
- strDeserializer);
+ strSerde,
+ strSerde);
MockProcessorSupplier<Windowed<String>, String> proc2 = new MockProcessorSupplier<>();
table2.toStream().process(proc2);
@@ -147,24 +142,20 @@ public class KStreamWindowAggregateTest {
String topic1 = "topic1";
String topic2 = "topic2";
- KStream<String, String> stream1 = builder.stream(strDeserializer, strDeserializer, topic1);
+ KStream<String, String> stream1 = builder.stream(strSerde, strSerde, topic1);
KTable<Windowed<String>, String> table1 = stream1.aggregateByKey(new StringInit(), new StringAdd(),
HoppingWindows.of("topic1-Canonized").with(10L).every(5L),
- strSerializer,
- strSerializer,
- strDeserializer,
- strDeserializer);
+ strSerde,
+ strSerde);
MockProcessorSupplier<Windowed<String>, String> proc1 = new MockProcessorSupplier<>();
table1.toStream().process(proc1);
- KStream<String, String> stream2 = builder.stream(strDeserializer, strDeserializer, topic2);
+ KStream<String, String> stream2 = builder.stream(strSerde, strSerde, topic2);
KTable<Windowed<String>, String> table2 = stream2.aggregateByKey(new StringInit(), new StringAdd(),
HoppingWindows.of("topic2-Canonized").with(10L).every(5L),
- strSerializer,
- strSerializer,
- strDeserializer,
- strDeserializer);
+ strSerde,
+ strSerde);
MockProcessorSupplier<Windowed<String>, String> proc2 = new MockProcessorSupplier<>();
table2.toStream().process(proc2);
http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
index ec85ed7..fc01e5e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
@@ -17,10 +17,8 @@
package org.apache.kafka.streams.kstream.internals;
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Initializer;
@@ -38,8 +36,7 @@ import static org.junit.Assert.assertEquals;
public class KTableAggregateTest {
- private final Serializer<String> strSerializer = new StringSerializer();
- private final Deserializer<String> strDeserializer = new StringDeserializer();
+ final private Serde<String> stringSerde = new Serdes.StringSerde();
private class StringAdd implements Aggregator<String, String, String> {
@@ -74,15 +71,12 @@ public class KTableAggregateTest {
final KStreamBuilder builder = new KStreamBuilder();
String topic1 = "topic1";
- KTable<String, String> table1 = builder.table(strSerializer, strSerializer, strDeserializer, strDeserializer, topic1);
- KTable<String, String> table2 = table1.<String, String, String>aggregate(new StringInit(), new StringAdd(), new StringRemove(),
+ KTable<String, String> table1 = builder.table(stringSerde, stringSerde, topic1);
+ KTable<String, String> table2 = table1.aggregate(new StringInit(), new StringAdd(), new StringRemove(),
new NoOpKeyValueMapper<String, String>(),
- strSerializer,
- strSerializer,
- strSerializer,
- strDeserializer,
- strDeserializer,
- strDeserializer,
+ stringSerde,
+ stringSerde,
+ stringSerde,
"topic1-Canonized");
MockProcessorSupplier<String, String> proc2 = new MockProcessorSupplier<>();
http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
index c43bea0..5491ea3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
@@ -17,12 +17,8 @@
package org.apache.kafka.streams.kstream.internals;
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.IntegerDeserializer;
-import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
@@ -40,10 +36,8 @@ import static org.junit.Assert.assertNull;
public class KTableFilterTest {
- private final Serializer<String> strSerializer = new StringSerializer();
- private final Deserializer<String> strDeserializer = new StringDeserializer();
- private final Serializer<Integer> intSerializer = new IntegerSerializer();
- private final Deserializer<Integer> intDeserializer = new IntegerDeserializer();
+ final private Serde<Integer> intSerde = new Serdes.IntegerSerde();
+ final private Serde<String> stringSerde = new Serdes.StringSerde();
@Test
public void testKTable() {
@@ -51,7 +45,7 @@ public class KTableFilterTest {
String topic1 = "topic1";
- KTable<String, Integer> table1 = builder.table(strSerializer, intSerializer, strDeserializer, intDeserializer, topic1);
+ KTable<String, Integer> table1 = builder.table(stringSerde, intSerde, topic1);
KTable<String, Integer> table2 = table1.filter(new Predicate<String, Integer>() {
@Override
@@ -93,7 +87,7 @@ public class KTableFilterTest {
String topic1 = "topic1";
KTableImpl<String, Integer, Integer> table1 =
- (KTableImpl<String, Integer, Integer>) builder.table(strSerializer, intSerializer, strDeserializer, intDeserializer, topic1);
+ (KTableImpl<String, Integer, Integer>) builder.table(stringSerde, intSerde, topic1);
KTableImpl<String, Integer, Integer> table2 = (KTableImpl<String, Integer, Integer>) table1.filter(
new Predicate<String, Integer>() {
@Override
@@ -112,7 +106,7 @@ public class KTableFilterTest {
KTableValueGetterSupplier<String, Integer> getterSupplier2 = table2.valueGetterSupplier();
KTableValueGetterSupplier<String, Integer> getterSupplier3 = table3.valueGetterSupplier();
- KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null, null, null);
+ KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null);
KTableValueGetter<String, Integer> getter2 = getterSupplier2.get();
KTableValueGetter<String, Integer> getter3 = getterSupplier3.get();
@@ -178,7 +172,7 @@ public class KTableFilterTest {
String topic1 = "topic1";
KTableImpl<String, Integer, Integer> table1 =
- (KTableImpl<String, Integer, Integer>) builder.table(strSerializer, intSerializer, strDeserializer, intDeserializer, topic1);
+ (KTableImpl<String, Integer, Integer>) builder.table(stringSerde, intSerde, topic1);
KTableImpl<String, Integer, Integer> table2 = (KTableImpl<String, Integer, Integer>) table1.filter(
new Predicate<String, Integer>() {
@Override
@@ -193,7 +187,7 @@ public class KTableFilterTest {
builder.addProcessor("proc1", proc1, table1.name);
builder.addProcessor("proc2", proc2, table2.name);
- KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null, null, null);
+ KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null);
driver.process(topic1, "A", 1);
driver.process(topic1, "B", 1);
@@ -233,7 +227,7 @@ public class KTableFilterTest {
String topic1 = "topic1";
KTableImpl<String, Integer, Integer> table1 =
- (KTableImpl<String, Integer, Integer>) builder.table(strSerializer, intSerializer, strDeserializer, intDeserializer, topic1);
+ (KTableImpl<String, Integer, Integer>) builder.table(stringSerde, intSerde, topic1);
KTableImpl<String, Integer, Integer> table2 = (KTableImpl<String, Integer, Integer>) table1.filter(
new Predicate<String, Integer>() {
@Override
@@ -250,7 +244,7 @@ public class KTableFilterTest {
builder.addProcessor("proc1", proc1, table1.name);
builder.addProcessor("proc2", proc2, table2.name);
- KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null, null, null);
+ KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null);
driver.process(topic1, "A", 1);
driver.process(topic1, "B", 1);
http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
index 2317c97..20c3a28 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
@@ -17,10 +17,8 @@
package org.apache.kafka.streams.kstream.internals;
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
@@ -40,16 +38,16 @@ import static org.junit.Assert.assertNull;
public class KTableImplTest {
+ final private Serde<String> stringSerde = new Serdes.StringSerde();
+
@Test
public void testKTable() {
- final Serializer<String> serializer = new StringSerializer();
- final Deserializer<String> deserializer = new StringDeserializer();
final KStreamBuilder builder = new KStreamBuilder();
String topic1 = "topic1";
String topic2 = "topic2";
- KTable<String, String> table1 = builder.table(serializer, serializer, deserializer, deserializer, topic1);
+ KTable<String, String> table1 = builder.table(stringSerde, stringSerde, topic1);
MockProcessorSupplier<String, String> proc1 = new MockProcessorSupplier<>();
table1.toStream().process(proc1);
@@ -74,7 +72,7 @@ public class KTableImplTest {
MockProcessorSupplier<String, Integer> proc3 = new MockProcessorSupplier<>();
table3.toStream().process(proc3);
- KTable<String, String> table4 = table1.through(topic2, serializer, serializer, deserializer, deserializer);
+ KTable<String, String> table4 = table1.through(topic2, stringSerde, stringSerde);
MockProcessorSupplier<String, String> proc4 = new MockProcessorSupplier<>();
table4.toStream().process(proc4);
@@ -96,15 +94,13 @@ public class KTableImplTest {
public void testValueGetter() throws IOException {
File stateDir = Files.createTempDirectory("test").toFile();
try {
- final Serializer<String> serializer = new StringSerializer();
- final Deserializer<String> deserializer = new StringDeserializer();
final KStreamBuilder builder = new KStreamBuilder();
String topic1 = "topic1";
String topic2 = "topic2";
KTableImpl<String, String, String> table1 =
- (KTableImpl<String, String, String>) builder.table(serializer, serializer, deserializer, deserializer, topic1);
+ (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1);
KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues(
new ValueMapper<String, Integer>() {
@Override
@@ -120,14 +116,14 @@ public class KTableImplTest {
}
});
KTableImpl<String, String, String> table4 = (KTableImpl<String, String, String>)
- table1.through(topic2, serializer, serializer, deserializer, deserializer);
+ table1.through(topic2, stringSerde, stringSerde);
KTableValueGetterSupplier<String, String> getterSupplier1 = table1.valueGetterSupplier();
KTableValueGetterSupplier<String, Integer> getterSupplier2 = table2.valueGetterSupplier();
KTableValueGetterSupplier<String, Integer> getterSupplier3 = table3.valueGetterSupplier();
KTableValueGetterSupplier<String, String> getterSupplier4 = table4.valueGetterSupplier();
- KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null, null, null);
+ KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null);
// two state store should be created
assertEquals(2, driver.allStateStores().size());
@@ -223,9 +219,6 @@ public class KTableImplTest {
@Test
public void testStateStore() throws IOException {
- final Serializer<String> serializer = new StringSerializer();
- final Deserializer<String> deserializer = new StringDeserializer();
-
String topic1 = "topic1";
String topic2 = "topic2";
@@ -234,9 +227,9 @@ public class KTableImplTest {
KStreamBuilder builder = new KStreamBuilder();
KTableImpl<String, String, String> table1 =
- (KTableImpl<String, String, String>) builder.table(serializer, serializer, deserializer, deserializer, topic1);
+ (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1);
KTableImpl<String, String, String> table2 =
- (KTableImpl<String, String, String>) builder.table(serializer, serializer, deserializer, deserializer, topic2);
+ (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic2);
KTableImpl<String, String, Integer> table1Mapped = (KTableImpl<String, String, Integer>) table1.mapValues(
new ValueMapper<String, Integer>() {
@@ -253,7 +246,7 @@ public class KTableImplTest {
}
});
- KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null, null, null);
+ KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null);
driver.setTime(0L);
// no state store should be created
@@ -267,9 +260,9 @@ public class KTableImplTest {
KStreamBuilder builder = new KStreamBuilder();
KTableImpl<String, String, String> table1 =
- (KTableImpl<String, String, String>) builder.table(serializer, serializer, deserializer, deserializer, topic1);
+ (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1);
KTableImpl<String, String, String> table2 =
- (KTableImpl<String, String, String>) builder.table(serializer, serializer, deserializer, deserializer, topic2);
+ (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic2);
KTableImpl<String, String, Integer> table1Mapped = (KTableImpl<String, String, Integer>) table1.mapValues(
new ValueMapper<String, Integer>() {
@@ -293,7 +286,7 @@ public class KTableImplTest {
}
});
- KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null, null, null);
+ KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null);
driver.setTime(0L);
// two state store should be created
http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java
index 12bfb9c..5f30574 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java
@@ -17,10 +17,8 @@
package org.apache.kafka.streams.kstream.internals;
-import org.apache.kafka.common.serialization.IntegerDeserializer;
-import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
@@ -47,10 +45,8 @@ public class KTableKTableJoinTest {
private String topic1 = "topic1";
private String topic2 = "topic2";
- private IntegerSerializer keySerializer = new IntegerSerializer();
- private StringSerializer valSerializer = new StringSerializer();
- private IntegerDeserializer keyDeserializer = new IntegerDeserializer();
- private StringDeserializer valDeserializer = new StringDeserializer();
+ final private Serde<Integer> intSerde = new Serdes.IntegerSerde();
+ final private Serde<String> stringSerde = new Serdes.StringSerde();
private ValueJoiner<String, String, String> joiner = new ValueJoiner<String, String, String>() {
@Override
@@ -80,8 +76,8 @@ public class KTableKTableJoinTest {
MockProcessorSupplier<Integer, String> processor;
processor = new MockProcessorSupplier<>();
- table1 = builder.table(keySerializer, valSerializer, keyDeserializer, valDeserializer, topic1);
- table2 = builder.table(keySerializer, valSerializer, keyDeserializer, valDeserializer, topic2);
+ table1 = builder.table(intSerde, stringSerde, topic1);
+ table2 = builder.table(intSerde, stringSerde, topic2);
joined = table1.join(table2, joiner);
joined.toStream().process(processor);
@@ -179,8 +175,8 @@ public class KTableKTableJoinTest {
KTable<Integer, String> joined;
MockProcessorSupplier<Integer, String> proc;
- table1 = builder.table(keySerializer, valSerializer, keyDeserializer, valDeserializer, topic1);
- table2 = builder.table(keySerializer, valSerializer, keyDeserializer, valDeserializer, topic2);
+ table1 = builder.table(intSerde, stringSerde, topic1);
+ table2 = builder.table(intSerde, stringSerde, topic2);
joined = table1.join(table2, joiner);
proc = new MockProcessorSupplier<>();
@@ -267,8 +263,8 @@ public class KTableKTableJoinTest {
KTable<Integer, String> joined;
MockProcessorSupplier<Integer, String> proc;
- table1 = builder.table(keySerializer, valSerializer, keyDeserializer, valDeserializer, topic1);
- table2 = builder.table(keySerializer, valSerializer, keyDeserializer, valDeserializer, topic2);
+ table1 = builder.table(intSerde, stringSerde, topic1);
+ table2 = builder.table(intSerde, stringSerde, topic2);
joined = table1.join(table2, joiner);
((KTableImpl<?, ?, ?>) joined).enableSendingOldValues();
http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
index e3cf22b..f92c5ca 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
@@ -17,10 +17,8 @@
package org.apache.kafka.streams.kstream.internals;
-import org.apache.kafka.common.serialization.IntegerDeserializer;
-import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
@@ -48,10 +46,8 @@ public class KTableKTableLeftJoinTest {
private String topic1 = "topic1";
private String topic2 = "topic2";
- private IntegerSerializer keySerializer = new IntegerSerializer();
- private StringSerializer valSerializer = new StringSerializer();
- private IntegerDeserializer keyDeserializer = new IntegerDeserializer();
- private StringDeserializer valDeserializer = new StringDeserializer();
+ final private Serde<Integer> intSerde = new Serdes.IntegerSerde();
+ final private Serde<String> stringSerde = new Serdes.StringSerde();
private ValueJoiner<String, String, String> joiner = new ValueJoiner<String, String, String>() {
@Override
@@ -83,15 +79,11 @@ public class KTableKTableLeftJoinTest {
final int[] expectedKeys = new int[]{0, 1, 2, 3};
- KTable<Integer, String> table1;
- KTable<Integer, String> table2;
- KTable<Integer, String> joined;
+ KTable<Integer, String> table1 = builder.table(intSerde, stringSerde, topic1);
+ KTable<Integer, String> table2 = builder.table(intSerde, stringSerde, topic2);
+ KTable<Integer, String> joined = table1.leftJoin(table2, joiner);
MockProcessorSupplier<Integer, String> processor;
-
processor = new MockProcessorSupplier<>();
- table1 = builder.table(keySerializer, valSerializer, keyDeserializer, valDeserializer, topic1);
- table2 = builder.table(keySerializer, valSerializer, keyDeserializer, valDeserializer, topic2);
- joined = table1.leftJoin(table2, joiner);
joined.toStream().process(processor);
Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
@@ -188,8 +180,8 @@ public class KTableKTableLeftJoinTest {
KTable<Integer, String> joined;
MockProcessorSupplier<Integer, String> proc;
- table1 = builder.table(keySerializer, valSerializer, keyDeserializer, valDeserializer, topic1);
- table2 = builder.table(keySerializer, valSerializer, keyDeserializer, valDeserializer, topic2);
+ table1 = builder.table(intSerde, stringSerde, topic1);
+ table2 = builder.table(intSerde, stringSerde, topic2);
joined = table1.leftJoin(table2, joiner);
proc = new MockProcessorSupplier<>();
@@ -276,8 +268,8 @@ public class KTableKTableLeftJoinTest {
KTable<Integer, String> joined;
MockProcessorSupplier<Integer, String> proc;
- table1 = builder.table(keySerializer, valSerializer, keyDeserializer, valDeserializer, topic1);
- table2 = builder.table(keySerializer, valSerializer, keyDeserializer, valDeserializer, topic2);
+ table1 = builder.table(intSerde, stringSerde, topic1);
+ table2 = builder.table(intSerde, stringSerde, topic2);
joined = table1.leftJoin(table2, joiner);
((KTableImpl<?, ?, ?>) joined).enableSendingOldValues();
http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
index feabc08..6cc77e0 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
@@ -17,10 +17,8 @@
package org.apache.kafka.streams.kstream.internals;
-import org.apache.kafka.common.serialization.IntegerDeserializer;
-import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
@@ -47,10 +45,8 @@ public class KTableKTableOuterJoinTest {
private String topic1 = "topic1";
private String topic2 = "topic2";
- private IntegerSerializer keySerializer = new IntegerSerializer();
- private StringSerializer valSerializer = new StringSerializer();
- private IntegerDeserializer keyDeserializer = new IntegerDeserializer();
- private StringDeserializer valDeserializer = new StringDeserializer();
+ final private Serde<Integer> intSerde = new Serdes.IntegerSerde();
+ final private Serde<String> stringSerde = new Serdes.StringSerde();
private ValueJoiner<String, String, String> joiner = new ValueJoiner<String, String, String>() {
@Override
@@ -80,8 +76,8 @@ public class KTableKTableOuterJoinTest {
MockProcessorSupplier<Integer, String> processor;
processor = new MockProcessorSupplier<>();
- table1 = builder.table(keySerializer, valSerializer, keyDeserializer, valDeserializer, topic1);
- table2 = builder.table(keySerializer, valSerializer, keyDeserializer, valDeserializer, topic2);
+ table1 = builder.table(intSerde, stringSerde, topic1);
+ table2 = builder.table(intSerde, stringSerde, topic2);
joined = table1.outerJoin(table2, joiner);
joined.toStream().process(processor);
@@ -188,8 +184,8 @@ public class KTableKTableOuterJoinTest {
KTable<Integer, String> joined;
MockProcessorSupplier<Integer, String> proc;
- table1 = builder.table(keySerializer, valSerializer, keyDeserializer, valDeserializer, topic1);
- table2 = builder.table(keySerializer, valSerializer, keyDeserializer, valDeserializer, topic2);
+ table1 = builder.table(intSerde, stringSerde, topic1);
+ table2 = builder.table(intSerde, stringSerde, topic2);
joined = table1.outerJoin(table2, joiner);
proc = new MockProcessorSupplier<>();
@@ -284,8 +280,8 @@ public class KTableKTableOuterJoinTest {
KTable<Integer, String> joined;
MockProcessorSupplier<Integer, String> proc;
- table1 = builder.table(keySerializer, valSerializer, keyDeserializer, valDeserializer, topic1);
- table2 = builder.table(keySerializer, valSerializer, keyDeserializer, valDeserializer, topic2);
+ table1 = builder.table(intSerde, stringSerde, topic1);
+ table2 = builder.table(intSerde, stringSerde, topic2);
joined = table1.outerJoin(table2, joiner);
((KTableImpl<?, ?, ?>) joined).enableSendingOldValues();
http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
index 58f1c2a..aa3daeb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
@@ -17,10 +17,8 @@
package org.apache.kafka.streams.kstream.internals;
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
@@ -41,8 +39,7 @@ import static org.junit.Assert.assertTrue;
public class KTableMapValuesTest {
- private final Serializer<String> strSerializer = new StringSerializer();
- private final Deserializer<String> strDeserializer = new StringDeserializer();
+ final private Serde<String> stringSerde = new Serdes.StringSerde();
@Test
public void testKTable() {
@@ -50,7 +47,7 @@ public class KTableMapValuesTest {
String topic1 = "topic1";
- KTable<String, String> table1 = builder.table(strSerializer, strSerializer, strDeserializer, strDeserializer, topic1);
+ KTable<String, String> table1 = builder.table(stringSerde, stringSerde, topic1);
KTable<String, Integer> table2 = table1.mapValues(new ValueMapper<String, Integer>() {
@Override
public Integer apply(String value) {
@@ -75,15 +72,13 @@ public class KTableMapValuesTest {
public void testValueGetter() throws IOException {
File stateDir = Files.createTempDirectory("test").toFile();
try {
- final Serializer<String> serializer = new StringSerializer();
- final Deserializer<String> deserializer = new StringDeserializer();
final KStreamBuilder builder = new KStreamBuilder();
String topic1 = "topic1";
String topic2 = "topic2";
KTableImpl<String, String, String> table1 =
- (KTableImpl<String, String, String>) builder.table(serializer, serializer, deserializer, deserializer, topic1);
+ (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1);
KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues(
new ValueMapper<String, Integer>() {
@Override
@@ -99,14 +94,14 @@ public class KTableMapValuesTest {
}
});
KTableImpl<String, String, String> table4 = (KTableImpl<String, String, String>)
- table1.through(topic2, serializer, serializer, deserializer, deserializer);
+ table1.through(topic2, stringSerde, stringSerde);
KTableValueGetterSupplier<String, String> getterSupplier1 = table1.valueGetterSupplier();
KTableValueGetterSupplier<String, Integer> getterSupplier2 = table2.valueGetterSupplier();
KTableValueGetterSupplier<String, Integer> getterSupplier3 = table3.valueGetterSupplier();
KTableValueGetterSupplier<String, String> getterSupplier4 = table4.valueGetterSupplier();
- KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null, null, null);
+ KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null);
KTableValueGetter<String, String> getter1 = getterSupplier1.get();
getter1.init(driver.context());
@@ -201,14 +196,12 @@ public class KTableMapValuesTest {
public void testNotSendingOldValue() throws IOException {
File stateDir = Files.createTempDirectory("test").toFile();
try {
- final Serializer<String> serializer = new StringSerializer();
- final Deserializer<String> deserializer = new StringDeserializer();
final KStreamBuilder builder = new KStreamBuilder();
String topic1 = "topic1";
KTableImpl<String, String, String> table1 =
- (KTableImpl<String, String, String>) builder.table(serializer, serializer, deserializer, deserializer, topic1);
+ (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1);
KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues(
new ValueMapper<String, Integer>() {
@Override
@@ -221,7 +214,7 @@ public class KTableMapValuesTest {
builder.addProcessor("proc", proc, table2.name);
- KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null, null, null);
+ KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null);
assertFalse(table1.sendingOldValueEnabled());
assertFalse(table2.sendingOldValueEnabled());
@@ -254,14 +247,12 @@ public class KTableMapValuesTest {
public void testSendingOldValue() throws IOException {
File stateDir = Files.createTempDirectory("test").toFile();
try {
- final Serializer<String> serializer = new StringSerializer();
- final Deserializer<String> deserializer = new StringDeserializer();
final KStreamBuilder builder = new KStreamBuilder();
String topic1 = "topic1";
KTableImpl<String, String, String> table1 =
- (KTableImpl<String, String, String>) builder.table(serializer, serializer, deserializer, deserializer, topic1);
+ (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1);
KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues(
new ValueMapper<String, Integer>() {
@Override
@@ -276,7 +267,7 @@ public class KTableMapValuesTest {
builder.addProcessor("proc", proc, table2.name);
- KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null, null, null);
+ KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null);
assertTrue(table1.sendingOldValueEnabled());
assertTrue(table2.sendingOldValueEnabled());
http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
index 187a6f2..51276f3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
@@ -17,10 +17,8 @@
package org.apache.kafka.streams.kstream.internals;
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
@@ -38,8 +36,7 @@ import static org.junit.Assert.assertTrue;
public class KTableSourceTest {
- private final Serializer<String> strSerializer = new StringSerializer();
- private final Deserializer<String> strDeserializer = new StringDeserializer();
+ final private Serde<String> stringSerde = new Serdes.StringSerde();
@Test
public void testKTable() {
@@ -47,7 +44,7 @@ public class KTableSourceTest {
String topic1 = "topic1";
- KTable<String, String> table1 = builder.table(strSerializer, strSerializer, strDeserializer, strDeserializer, topic1);
+ KTable<String, String> table1 = builder.table(stringSerde, stringSerde, topic1);
MockProcessorSupplier<String, String> proc1 = new MockProcessorSupplier<>();
table1.toStream().process(proc1);
@@ -72,12 +69,11 @@ public class KTableSourceTest {
String topic1 = "topic1";
- KTableImpl<String, String, String> table1 = (KTableImpl<String, String, String>)
- builder.table(strSerializer, strSerializer, strDeserializer, strDeserializer, topic1);
+ KTableImpl<String, String, String> table1 = (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1);
KTableValueGetterSupplier<String, String> getterSupplier1 = table1.valueGetterSupplier();
- KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null, null, null);
+ KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null);
KTableValueGetter<String, String> getter1 = getterSupplier1.get();
getter1.init(driver.context());
@@ -123,14 +119,13 @@ public class KTableSourceTest {
String topic1 = "topic1";
- KTableImpl<String, String, String> table1 = (KTableImpl<String, String, String>)
- builder.table(strSerializer, strSerializer, strDeserializer, strDeserializer, topic1);
+ KTableImpl<String, String, String> table1 = (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1);
MockProcessorSupplier<String, Integer> proc1 = new MockProcessorSupplier<>();
builder.addProcessor("proc1", proc1, table1.name);
- KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null, null, null);
+ KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null);
driver.process(topic1, "A", "01");
driver.process(topic1, "B", "01");
@@ -165,8 +160,7 @@ public class KTableSourceTest {
String topic1 = "topic1";
- KTableImpl<String, String, String> table1 = (KTableImpl<String, String, String>)
- builder.table(strSerializer, strSerializer, strDeserializer, strDeserializer, topic1);
+ KTableImpl<String, String, String> table1 = (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1);
table1.enableSendingOldValues();
@@ -176,7 +170,7 @@ public class KTableSourceTest {
builder.addProcessor("proc1", proc1, table1.name);
- KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null, null, null);
+ KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, null, null);
driver.process(topic1, "A", "01");
driver.process(topic1, "B", "01");
http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java
index 1b8cbb8..7c6d5ec 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java
@@ -37,8 +37,8 @@ public class WindowedStreamPartitionerTest {
private String topicName = "topic";
- private IntegerSerializer keySerializer = new IntegerSerializer();
- private StringSerializer valSerializer = new StringSerializer();
+ private IntegerSerializer intSerializer = new IntegerSerializer();
+ private StringSerializer stringSerializer = new StringSerializer();
private List<PartitionInfo> infos = Arrays.asList(
new PartitionInfo(topicName, 0, Node.noNode(), new Node[0], new Node[0]),
@@ -58,15 +58,15 @@ public class WindowedStreamPartitionerTest {
DefaultPartitioner defaultPartitioner = new DefaultPartitioner();
- WindowedSerializer<Integer> windowedSerializer = new WindowedSerializer<>(keySerializer);
+ WindowedSerializer<Integer> windowedSerializer = new WindowedSerializer<>(intSerializer);
WindowedStreamPartitioner<Integer, String> streamPartitioner = new WindowedStreamPartitioner<>(windowedSerializer);
for (int k = 0; k < 10; k++) {
Integer key = rand.nextInt();
- byte[] keyBytes = keySerializer.serialize(topicName, key);
+ byte[] keyBytes = intSerializer.serialize(topicName, key);
String value = key.toString();
- byte[] valueBytes = valSerializer.serialize(topicName, value);
+ byte[] valueBytes = stringSerializer.serialize(topicName, value);
Integer expected = defaultPartitioner.partition("topic", key, keyBytes, value, valueBytes, cluster);
http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
index 12210cc..ef08176 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
@@ -24,6 +24,7 @@ import static org.junit.Assert.assertNull;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
@@ -70,11 +71,9 @@ public class ProcessorTopologyTest {
props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "processor-topology-test");
props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
props.setProperty(StreamsConfig.STATE_DIR_CONFIG, localState.getAbsolutePath());
+ props.setProperty(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+ props.setProperty(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.setProperty(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, CustomTimestampExtractor.class.getName());
- props.setProperty(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
- props.setProperty(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
- props.setProperty(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
- props.setProperty(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
this.config = new StreamsConfig(props);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/dea0719e/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index 21bdaff..ea24441 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -33,6 +33,7 @@ import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
import org.apache.kafka.test.MockStateStoreSupplier;
+import org.apache.kafka.test.MockTimestampExtractor;
import org.junit.Before;
import org.junit.Test;
@@ -89,15 +90,11 @@ public class StandbyTaskTest {
private StreamsConfig createConfig(final File baseDir) throws Exception {
return new StreamsConfig(new Properties() {
{
- setProperty(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
- setProperty(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
- setProperty(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
- setProperty(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
- setProperty(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, "org.apache.kafka.test.MockTimestampExtractor");
setProperty(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171");
setProperty(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3");
setProperty(StreamsConfig.STATE_DIR_CONFIG, baseDir.getCanonicalPath());
+ setProperty(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class.getName());
}
});
}