You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/04/21 23:51:24 UTC
kafka git commit: KAFKA-3589: set inner serializer for ChangedSerde upon initialization
Repository: kafka
Updated Branches:
refs/heads/trunk 5c547475d -> 74e6dc842
KAFKA-3589: set inner serializer for ChangedSerde upon initialization
Author: Guozhang Wang <wa...@gmail.com>
Reviewers: Eno Thereska <en...@gmail.com>, Ismael Juma <is...@juma.me.uk>
Closes #1246 from guozhangwang/K3589
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/74e6dc84
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/74e6dc84
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/74e6dc84
Branch: refs/heads/trunk
Commit: 74e6dc842559d344241c70cb6607a69291e3a20d
Parents: 5c54747
Author: Guozhang Wang <wa...@gmail.com>
Authored: Thu Apr 21 14:51:21 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Thu Apr 21 14:51:21 2016 -0700
----------------------------------------------------------------------
.../kstream/internals/ChangedDeserializer.java | 10 ++-
.../kstream/internals/ChangedSerializer.java | 10 ++-
.../kstream/internals/KGroupedTableImpl.java | 76 +++++++++++---------
.../streams/processor/internals/SinkNode.java | 14 ++++
.../streams/processor/internals/SourceNode.java | 18 ++++-
.../kstream/internals/KTableAggregateTest.java | 39 ++--------
.../kstream/internals/KTableImplTest.java | 49 +++++++++++++
.../apache/kafka/test/KStreamTestDriver.java | 26 ++++++-
.../org/apache/kafka/test/MockAggregator.java | 43 +++++++++++
.../org/apache/kafka/test/MockInitializer.java | 33 +++++++++
.../apache/kafka/test/MockKeyValueMapper.java | 36 ++++++++++
.../java/org/apache/kafka/test/MockReducer.java | 43 +++++++++++
.../apache/kafka/test/NoOpKeyValueMapper.java | 29 --------
13 files changed, 325 insertions(+), 101 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/74e6dc84/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java
index d4c4e2d..ce9be49 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java
@@ -26,12 +26,20 @@ public class ChangedDeserializer<T> implements Deserializer<Change<T>> {
private static final int NEWFLAG_SIZE = 1;
- private final Deserializer<T> inner;
+ private Deserializer<T> inner;
public ChangedDeserializer(Deserializer<T> inner) {
this.inner = inner;
}
+ public Deserializer<T> inner() {
+ return inner;
+ }
+
+ public void setInner(Deserializer<T> inner) {
+ this.inner = inner;
+ }
+
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
// do nothing
http://git-wip-us.apache.org/repos/asf/kafka/blob/74e6dc84/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java
index 5dbbac9..12e06f8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java
@@ -27,12 +27,20 @@ public class ChangedSerializer<T> implements Serializer<Change<T>> {
private static final int NEWFLAG_SIZE = 1;
- private final Serializer<T> inner;
+ private Serializer<T> inner;
public ChangedSerializer(Serializer<T> inner) {
this.inner = inner;
}
+ public Serializer<T> inner() {
+ return inner;
+ }
+
+ public void setInner(Serializer<T> inner) {
+ this.inner = inner;
+ }
+
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
// do nothing
http://git-wip-us.apache.org/repos/asf/kafka/blob/74e6dc84/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
index d9b0f3d..f2e2eed 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
@@ -17,8 +17,10 @@
package org.apache.kafka.streams.kstream.internals;
+import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KGroupedTable;
@@ -48,15 +50,12 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
protected final Serde<K> keySerde;
protected final Serde<V> valSerde;
- private final String sourceName;
-
public KGroupedTableImpl(KStreamBuilder topology,
String name,
String sourceName,
Serde<K> keySerde,
Serde<V> valSerde) {
super(topology, name, Collections.singleton(sourceName));
- this.sourceName = sourceName;
this.keySerde = keySerde;
this.valSerde = valSerde;
}
@@ -74,8 +73,13 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
String topic = name + REPARTITION_TOPIC_SUFFIX;
- ChangedSerializer<V> changedValueSerializer = new ChangedSerializer<>(valSerde.serializer());
- ChangedDeserializer<V> changedValueDeserializer = new ChangedDeserializer<>(valSerde.deserializer());
+ Serializer<K> keySerializer = keySerde == null ? null : keySerde.serializer();
+ Deserializer<K> keyDeserializer = keySerde == null ? null : keySerde.deserializer();
+ Serializer<V> valueSerializer = valSerde == null ? null : valSerde.serializer();
+ Deserializer<V> valueDeserializer = valSerde == null ? null : valSerde.deserializer();
+
+ ChangedSerializer<V> changedValueSerializer = new ChangedSerializer<>(valueSerializer);
+ ChangedDeserializer<V> changedValueDeserializer = new ChangedDeserializer<>(valueDeserializer);
ProcessorSupplier<K, Change<V>> aggregateSupplier = new KTableAggregate<>(name, initializer, adder, subtractor);
@@ -87,10 +91,10 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
// send the aggregate key-value pairs to the intermediate topic for partitioning
topology.addInternalTopic(topic);
- topology.addSink(sinkName, topic, keySerde.serializer(), changedValueSerializer, this.name);
+ topology.addSink(sinkName, topic, keySerializer, changedValueSerializer, this.name);
// read the intermediate topic
- topology.addSource(sourceName, keySerde.deserializer(), changedValueDeserializer, topic);
+ topology.addSource(sourceName, keyDeserializer, changedValueDeserializer, topic);
// aggregate the values with the aggregator and local store
topology.addProcessor(aggregateName, aggregateSupplier, sourceName);
@@ -110,29 +114,6 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
}
@Override
- public KTable<K, Long> count(String name) {
- return this.aggregate(
- new Initializer<Long>() {
- @Override
- public Long apply() {
- return 0L;
- }
- },
- new Aggregator<K, V, Long>() {
- @Override
- public Long apply(K aggKey, V value, Long aggregate) {
- return aggregate + 1L;
- }
- }, new Aggregator<K, V, Long>() {
- @Override
- public Long apply(K aggKey, V value, Long aggregate) {
- return aggregate - 1L;
- }
- },
- Serdes.Long(), name);
- }
-
- @Override
public KTable<K, V> reduce(Reducer<V> adder,
Reducer<V> subtractor,
String name) {
@@ -143,8 +124,13 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
String topic = name + REPARTITION_TOPIC_SUFFIX;
- ChangedSerializer<V> changedValueSerializer = new ChangedSerializer<>(valSerde.serializer());
- ChangedDeserializer<V> changedValueDeserializer = new ChangedDeserializer<>(valSerde.deserializer());
+ Serializer<K> keySerializer = keySerde == null ? null : keySerde.serializer();
+ Deserializer<K> keyDeserializer = keySerde == null ? null : keySerde.deserializer();
+ Serializer<V> valueSerializer = valSerde == null ? null : valSerde.serializer();
+ Deserializer<V> valueDeserializer = valSerde == null ? null : valSerde.deserializer();
+
+ ChangedSerializer<V> changedValueSerializer = new ChangedSerializer<>(valueSerializer);
+ ChangedDeserializer<V> changedValueDeserializer = new ChangedDeserializer<>(valueDeserializer);
ProcessorSupplier<K, Change<V>> aggregateSupplier = new KTableReduce<>(name, adder, subtractor);
@@ -156,10 +142,10 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
// send the aggregate key-value pairs to the intermediate topic for partitioning
topology.addInternalTopic(topic);
- topology.addSink(sinkName, topic, keySerde.serializer(), changedValueSerializer, this.name);
+ topology.addSink(sinkName, topic, keySerializer, changedValueSerializer, this.name);
// read the intermediate topic
- topology.addSource(sourceName, keySerde.deserializer(), changedValueDeserializer, topic);
+ topology.addSource(sourceName, keyDeserializer, changedValueDeserializer, topic);
// aggregate the values with the aggregator and local store
topology.addProcessor(reduceName, aggregateSupplier, sourceName);
@@ -169,4 +155,26 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup
return new KTableImpl<>(topology, reduceName, aggregateSupplier, Collections.singleton(sourceName));
}
+ @Override
+ public KTable<K, Long> count(String name) {
+ return this.aggregate(
+ new Initializer<Long>() {
+ @Override
+ public Long apply() {
+ return 0L;
+ }
+ },
+ new Aggregator<K, V, Long>() {
+ @Override
+ public Long apply(K aggKey, V value, Long aggregate) {
+ return aggregate + 1L;
+ }
+ }, new Aggregator<K, V, Long>() {
+ @Override
+ public Long apply(K aggKey, V value, Long aggregate) {
+ return aggregate - 1L;
+ }
+ },
+ Serdes.Long(), name);
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/74e6dc84/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
index e9c2760..3795916 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.kstream.internals.ChangedSerializer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StreamPartitioner;
@@ -52,8 +53,16 @@ public class SinkNode<K, V> extends ProcessorNode<K, V> {
@Override
public void init(ProcessorContext context) {
this.context = context;
+
+ // if serializers are null, get the default ones from the context
if (this.keySerializer == null) this.keySerializer = (Serializer<K>) context.keySerde().serializer();
if (this.valSerializer == null) this.valSerializer = (Serializer<V>) context.valueSerde().serializer();
+
+ // if value serializers are for {@code Change} values, set the inner serializer when necessary
+ if (this.valSerializer instanceof ChangedSerializer &&
+ ((ChangedSerializer) this.valSerializer).inner() == null)
+ ((ChangedSerializer) this.valSerializer).setInner(context.valueSerde().serializer());
+
}
@Override
@@ -67,4 +76,9 @@ public class SinkNode<K, V> extends ProcessorNode<K, V> {
public void close() {
// do nothing
}
+
+ // for test only
+ public Serializer<V> valueSerializer() {
+ return valSerializer;
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/74e6dc84/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
index 1868c1b..a550344 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
@@ -18,6 +18,7 @@
package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.streams.kstream.internals.ChangedDeserializer;
import org.apache.kafka.streams.processor.ProcessorContext;
public class SourceNode<K, V> extends ProcessorNode<K, V> {
@@ -46,9 +47,16 @@ public class SourceNode<K, V> extends ProcessorNode<K, V> {
public void init(ProcessorContext context) {
this.context = context;
- // if serializers are null, get the default ones from the context
- if (this.keyDeserializer == null) this.keyDeserializer = (Deserializer<K>) context.keySerde().deserializer();
- if (this.valDeserializer == null) this.valDeserializer = (Deserializer<V>) context.valueSerde().deserializer();
+ // if deserializers are null, get the default ones from the context
+ if (this.keyDeserializer == null)
+ this.keyDeserializer = (Deserializer<K>) context.keySerde().deserializer();
+ if (this.valDeserializer == null)
+ this.valDeserializer = (Deserializer<V>) context.valueSerde().deserializer();
+
+ // if value deserializers are for {@code Change} values, set the inner deserializer when necessary
+ if (this.valDeserializer instanceof ChangedDeserializer &&
+ ((ChangedDeserializer) this.valDeserializer).inner() == null)
+ ((ChangedDeserializer) this.valDeserializer).setInner(context.valueSerde().deserializer());
}
@Override
@@ -61,4 +69,8 @@ public class SourceNode<K, V> extends ProcessorNode<K, V> {
// do nothing
}
+ // for test only
+ public Deserializer<V> valueDeserializer() {
+ return valDeserializer;
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/74e6dc84/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
index 1564e95..be0ec19 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
@@ -20,13 +20,13 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.kstream.Aggregator;
-import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.test.MockAggregator;
+import org.apache.kafka.test.MockInitializer;
+import org.apache.kafka.test.MockKeyValueMapper;
import org.apache.kafka.test.MockProcessorSupplier;
-import org.apache.kafka.test.NoOpKeyValueMapper;
import org.junit.Test;
import java.io.File;
@@ -38,31 +38,6 @@ public class KTableAggregateTest {
final private Serde<String> stringSerde = new Serdes.StringSerde();
- private class StringAdd implements Aggregator<String, String, String> {
-
- @Override
- public String apply(String aggKey, String value, String aggregate) {
- return aggregate + "+" + value;
- }
- }
-
- private class StringRemove implements Aggregator<String, String, String> {
-
- @Override
- public String apply(String aggKey, String value, String aggregate) {
- return aggregate + "-" + value;
- }
- }
-
- private class StringInit implements Initializer<String> {
-
- @Override
- public String apply() {
- return "0";
- }
- }
-
-
@Test
public void testAggBasic() throws Exception {
final File baseDir = Files.createTempDirectory("test").toFile();
@@ -72,12 +47,12 @@ public class KTableAggregateTest {
String topic1 = "topic1";
KTable<String, String> table1 = builder.table(stringSerde, stringSerde, topic1);
- KTable<String, String> table2 = table1.groupBy(new NoOpKeyValueMapper<String, String>(),
+ KTable<String, String> table2 = table1.groupBy(MockKeyValueMapper.<String, String>NoOpKeyValueMapper(),
stringSerde,
stringSerde
- ).aggregate(new StringInit(),
- new StringAdd(),
- new StringRemove(),
+ ).aggregate(MockInitializer.STRING_INIT,
+ MockAggregator.STRING_ADDER,
+ MockAggregator.STRING_REMOVER,
stringSerde,
"topic1-Canonized");
http://git-wip-us.apache.org/repos/asf/kafka/blob/74e6dc84/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
index 6f49b6a..8a13e9a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
@@ -25,8 +25,14 @@ import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.ValueMapper;
+import org.apache.kafka.streams.processor.internals.SinkNode;
+import org.apache.kafka.streams.processor.internals.SourceNode;
import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.test.MockAggregator;
+import org.apache.kafka.test.MockInitializer;
+import org.apache.kafka.test.MockKeyValueMapper;
import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.MockReducer;
import org.junit.Test;
import java.io.File;
@@ -34,7 +40,9 @@ import java.io.IOException;
import java.nio.file.Files;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
public class KTableImplTest {
@@ -295,6 +303,47 @@ public class KTableImplTest {
} finally {
Utils.delete(stateDir);
}
+ }
+
+ @Test
+ public void testRepartition() throws IOException {
+ String topic1 = "topic1";
+
+ File stateDir = Files.createTempDirectory("test").toFile();
+ try {
+ KStreamBuilder builder = new KStreamBuilder();
+
+ KTableImpl<String, String, String> table1 =
+ (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1);
+ KTableImpl<String, String, String> table1Aggregated = (KTableImpl<String, String, String>) table1
+ .groupBy(MockKeyValueMapper.<String, String>NoOpKeyValueMapper())
+ .aggregate(MockInitializer.STRING_INIT, MockAggregator.STRING_ADDER, MockAggregator.STRING_REMOVER, "mock-result1");
+
+
+ KTableImpl<String, String, String> table1Reduced = (KTableImpl<String, String, String>) table1
+ .groupBy(MockKeyValueMapper.<String, String>NoOpKeyValueMapper())
+ .reduce(MockReducer.STRING_ADDER, MockReducer.STRING_REMOVER, "mock-result2");
+
+ KStreamTestDriver driver = new KStreamTestDriver(builder, stateDir, stringSerde, stringSerde);
+ driver.setTime(0L);
+
+ // three state stores should be created, one for source, one for aggregate and one for reduce
+ assertEquals(3, driver.allStateStores().size());
+
+ // contains the corresponding repartition source / sink nodes
+ assertTrue(driver.allProcessorNames().contains("KSTREAM-SINK-0000000003"));
+ assertTrue(driver.allProcessorNames().contains("KSTREAM-SOURCE-0000000004"));
+ assertTrue(driver.allProcessorNames().contains("KSTREAM-SINK-0000000007"));
+ assertTrue(driver.allProcessorNames().contains("KSTREAM-SOURCE-0000000008"));
+
+ assertNotNull(((ChangedSerializer) ((SinkNode) driver.processor("KSTREAM-SINK-0000000003")).valueSerializer()).inner());
+ assertNotNull(((ChangedDeserializer) ((SourceNode) driver.processor("KSTREAM-SOURCE-0000000004")).valueDeserializer()).inner());
+ assertNotNull(((ChangedSerializer) ((SinkNode) driver.processor("KSTREAM-SINK-0000000007")).valueSerializer()).inner());
+ assertNotNull(((ChangedDeserializer) ((SourceNode) driver.processor("KSTREAM-SOURCE-0000000008")).valueDeserializer()).inner());
+
+ } finally {
+ Utils.delete(stateDir);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/74e6dc84/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
index 2ee8730..d738794 100644
--- a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
@@ -31,8 +31,10 @@ import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.RecordCollector;
import java.io.File;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
public class KStreamTestDriver {
@@ -151,11 +153,33 @@ public class KStreamTestDriver {
}
}
+ public Set<String> allProcessorNames() {
+ Set<String> names = new HashSet<>();
+
+ List<ProcessorNode> nodes = topology.processors();
+
+ for (ProcessorNode node: nodes) {
+ names.add(node.name());
+ }
+
+ return names;
+ }
+
+ public ProcessorNode processor(String name) {
+ List<ProcessorNode> nodes = topology.processors();
+
+ for (ProcessorNode node: nodes) {
+ if (node.name().equals(name))
+ return node;
+ }
+
+ return null;
+ }
+
public Map<String, StateStore> allStateStores() {
return context.allStateStores();
}
-
private class MockRecordCollector extends RecordCollector {
public MockRecordCollector() {
super(null);
http://git-wip-us.apache.org/repos/asf/kafka/blob/74e6dc84/streams/src/test/java/org/apache/kafka/test/MockAggregator.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockAggregator.java b/streams/src/test/java/org/apache/kafka/test/MockAggregator.java
new file mode 100644
index 0000000..e8bb10b
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/test/MockAggregator.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.test;
+
+import org.apache.kafka.streams.kstream.Aggregator;
+
+public class MockAggregator {
+
+ private static class StringAdd implements Aggregator<String, String, String> {
+
+ @Override
+ public String apply(String aggKey, String value, String aggregate) {
+ return aggregate + "+" + value;
+ }
+ }
+
+ private static class StringRemove implements Aggregator<String, String, String> {
+
+ @Override
+ public String apply(String aggKey, String value, String aggregate) {
+ return aggregate + "-" + value;
+ }
+ }
+
+ public final static Aggregator<String, String, String> STRING_ADDER = new StringAdd();
+
+ public final static Aggregator<String, String, String> STRING_REMOVER = new StringRemove();
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/74e6dc84/streams/src/test/java/org/apache/kafka/test/MockInitializer.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockInitializer.java b/streams/src/test/java/org/apache/kafka/test/MockInitializer.java
new file mode 100644
index 0000000..9bfe7f8
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/test/MockInitializer.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.test;
+
+import org.apache.kafka.streams.kstream.Initializer;
+
+public class MockInitializer {
+
+ private static class StringInit implements Initializer<String> {
+
+ @Override
+ public String apply() {
+ return "0";
+ }
+ }
+
+ public final static Initializer<String> STRING_INIT = new StringInit();
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/74e6dc84/streams/src/test/java/org/apache/kafka/test/MockKeyValueMapper.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockKeyValueMapper.java b/streams/src/test/java/org/apache/kafka/test/MockKeyValueMapper.java
new file mode 100644
index 0000000..ae8c2fd
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/test/MockKeyValueMapper.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.test;
+
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.KeyValueMapper;
+
+public class MockKeyValueMapper {
+
+ private static class NoOpKeyValueMapper<K, V> implements KeyValueMapper<K, V, KeyValue<K, V>> {
+
+ @Override
+ public KeyValue<K, V> apply(K key, V value) {
+ return new KeyValue<>(key, value);
+ }
+ }
+
+ public static <K, V> KeyValueMapper<K, V, KeyValue<K, V>> NoOpKeyValueMapper() {
+ return new NoOpKeyValueMapper<>();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/74e6dc84/streams/src/test/java/org/apache/kafka/test/MockReducer.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockReducer.java b/streams/src/test/java/org/apache/kafka/test/MockReducer.java
new file mode 100644
index 0000000..24a8fea
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/test/MockReducer.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.test;
+
+import org.apache.kafka.streams.kstream.Reducer;
+
+public class MockReducer {
+
+ private static class StringAdd implements Reducer<String> {
+
+ @Override
+ public String apply(String value1, String value2) {
+ return value1 + "+" + value2;
+ }
+ }
+
+ private static class StringRemove implements Reducer<String> {
+
+ @Override
+ public String apply(String value1, String value2) {
+ return value1 + "-" + value2;
+ }
+ }
+
+ public final static Reducer<String> STRING_ADDER = new StringAdd();
+
+ public final static Reducer<String> STRING_REMOVER = new StringRemove();
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/74e6dc84/streams/src/test/java/org/apache/kafka/test/NoOpKeyValueMapper.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/NoOpKeyValueMapper.java b/streams/src/test/java/org/apache/kafka/test/NoOpKeyValueMapper.java
deleted file mode 100644
index 828b5ae..0000000
--- a/streams/src/test/java/org/apache/kafka/test/NoOpKeyValueMapper.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.test;
-
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.kstream.KeyValueMapper;
-
-public class NoOpKeyValueMapper<K, V> implements KeyValueMapper<K, V, KeyValue<K, V>> {
-
- @Override
- public KeyValue<K, V> apply(K key, V value) {
- return new KeyValue<>(key, value);
- }
-}