You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/06/13 17:28:17 UTC
[kafka] branch trunk updated: KAFKA-6474: Rewrite tests to use new
public TopologyTestDriver [part 2] (#4986)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new de4f4f5 KAFKA-6474: Rewrite tests to use new public TopologyTestDriver [part 2] (#4986)
de4f4f5 is described below
commit de4f4f530a92473edb3665c94ac386d5c53cc893
Author: Filipe Agapito <fi...@gmail.com>
AuthorDate: Wed Jun 13 18:27:35 2018 +0100
KAFKA-6474: Rewrite tests to use new public TopologyTestDriver [part 2] (#4986)
* KAFKA-6474: Rewrite tests to use new public TopologyTestDriver [part 2]
* Refactor:
-KTableFilterTest.java
-KTableImplTest.java
-KTableMapValuesTest.java
-KTableSourceTest.java
* Add access to task, processorTopology, and globalTopology in TopologyTestDriver via TopologyTestDriverWrapper
* Remove unnecessary constructor in TopologyTestDriver
* Change how TopologyTestDriverWrapper#getProcessorContext sets the current node
Reviewers: John Roesler <jo...@confluent.io>, Matthias J. Sax <ma...@confluent.io>, Guozhang Wang <wa...@gmail.com>
---
.../kafka/streams/TopologyTestDriverWrapper.java | 70 +++++
.../kstream/internals/KTableFilterTest.java | 249 +++++++++--------
.../streams/kstream/internals/KTableImplTest.java | 256 ++++++++---------
.../kstream/internals/KTableMapValuesTest.java | 308 ++++++++++-----------
.../kstream/internals/KTableSourceTest.java | 190 +++++++------
.../apache/kafka/streams/TopologyTestDriver.java | 7 +-
6 files changed, 576 insertions(+), 504 deletions(-)
diff --git a/streams/src/test/java/org/apache/kafka/streams/TopologyTestDriverWrapper.java b/streams/src/test/java/org/apache/kafka/streams/TopologyTestDriverWrapper.java
new file mode 100644
index 0000000..bec4b5f
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/TopologyTestDriverWrapper.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams;
+
+import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
+import org.apache.kafka.streams.processor.internals.ProcessorNode;
+
+import java.util.Properties;
+
+/**
+ * This class provides access to {@link TopologyTestDriver} protected methods.
+ * It should only be used for internal testing, in the rare occasions where the
+ * necessary functionality is not supported by {@link TopologyTestDriver}.
+ */
+public class TopologyTestDriverWrapper extends TopologyTestDriver {
+
+
+ public TopologyTestDriverWrapper(final Topology topology,
+ final Properties config) {
+ super(topology, config);
+ }
+
+ /**
+ * Get the processor context, setting the processor whose name is given as current node
+ *
+ * @param processorName processor name to set as current node
+ * @return the processor context
+ */
+ public ProcessorContext setCurrentNodeForProcessorContext(final String processorName) {
+ final ProcessorContext context = task.context();
+ ((ProcessorContextImpl) context).setCurrentNode(getProcessor(processorName));
+ return context;
+ }
+
+ /**
+ * Get a processor by name
+ *
+ * @param name the name to search for
+ * @return the processor matching the search name
+ */
+ public ProcessorNode getProcessor(final String name) {
+ for (final ProcessorNode node : processorTopology.processors()) {
+ if (node.name().equals(name)) {
+ return node;
+ }
+ }
+ for (final ProcessorNode node : globalTopology.processors()) {
+ if (node.name().equals(name)) {
+ return node;
+ }
+ }
+ throw new StreamsException("Could not find a processor named '" + name + "'");
+ }
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
index c37078d..2cf192b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
@@ -16,45 +16,40 @@
*/
package org.apache.kafka.streams.kstream.internals;
-import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.TopologyTestDriverWrapper;
+import org.apache.kafka.streams.TopologyWrapper;
+import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Predicate;
+import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.streams.test.ConsumerRecordFactory;
+import org.apache.kafka.test.MockMapper;
import org.apache.kafka.test.MockProcessor;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockReducer;
-import org.apache.kafka.test.MockMapper;
-import org.apache.kafka.test.TestUtils;
-import org.junit.Before;
-import org.junit.Rule;
+import org.apache.kafka.test.StreamsTestUtils;
import org.junit.Test;
-import java.io.File;
import java.util.List;
+import java.util.Properties;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
public class KTableFilterTest {
- final private Serde<Integer> intSerde = Serdes.Integer();
- final private Serde<String> stringSerde = Serdes.String();
- private final Consumed<String, Integer> consumed = Consumed.with(stringSerde, intSerde);
- @Rule
- public final KStreamTestDriver driver = new KStreamTestDriver();
- private File stateDir = null;
-
- @Before
- public void setUp() {
- stateDir = TestUtils.tempDirectory("kafka-test");
- }
+ private final Consumed<String, Integer> consumed = Consumed.with(Serdes.String(), Serdes.Integer());
+ private final ConsumerRecordFactory<String, Integer> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new IntegerSerializer());
+ private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.Integer());
private void doTestKTable(final StreamsBuilder builder,
final KTable<String, Integer> table2,
@@ -64,16 +59,14 @@ public class KTableFilterTest {
table2.toStream().process(supplier);
table3.toStream().process(supplier);
- driver.setUp(builder, stateDir, Serdes.String(), Serdes.Integer());
-
- driver.process(topic, "A", 1);
- driver.process(topic, "B", 2);
- driver.process(topic, "C", 3);
- driver.process(topic, "D", 4);
- driver.flushState();
- driver.process(topic, "A", null);
- driver.process(topic, "B", null);
- driver.flushState();
+ try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+ driver.pipeInput(recordFactory.create(topic, "A", 1));
+ driver.pipeInput(recordFactory.create(topic, "B", 2));
+ driver.pipeInput(recordFactory.create(topic, "C", 3));
+ driver.pipeInput(recordFactory.create(topic, "D", 4));
+ driver.pipeInput(recordFactory.create(topic, "A", null));
+ driver.pipeInput(recordFactory.create(topic, "B", null));
+ }
final List<MockProcessor<String, Integer>> processors = supplier.capturedProcessors(2);
@@ -136,63 +129,68 @@ public class KTableFilterTest {
final KTableImpl<String, Integer, Integer> table2,
final KTableImpl<String, Integer, Integer> table3,
final String topic1) {
+
+ final Topology topology = builder.build();
+
KTableValueGetterSupplier<String, Integer> getterSupplier2 = table2.valueGetterSupplier();
KTableValueGetterSupplier<String, Integer> getterSupplier3 = table3.valueGetterSupplier();
- driver.setUp(builder, stateDir, Serdes.String(), Serdes.Integer());
+ final InternalTopologyBuilder topologyBuilder = TopologyWrapper.getInternalTopologyBuilder(topology);
+ topologyBuilder.connectProcessorAndStateStores(table2.name, getterSupplier2.storeNames());
+ topologyBuilder.connectProcessorAndStateStores(table3.name, getterSupplier3.storeNames());
- KTableValueGetter<String, Integer> getter2 = getterSupplier2.get();
- KTableValueGetter<String, Integer> getter3 = getterSupplier3.get();
+ try (final TopologyTestDriverWrapper driver = new TopologyTestDriverWrapper(topology, props)) {
- getter2.init(driver.context());
- getter3.init(driver.context());
+ KTableValueGetter<String, Integer> getter2 = getterSupplier2.get();
+ KTableValueGetter<String, Integer> getter3 = getterSupplier3.get();
- driver.process(topic1, "A", 1);
- driver.process(topic1, "B", 1);
- driver.process(topic1, "C", 1);
+ getter2.init(driver.setCurrentNodeForProcessorContext(table2.name));
+ getter3.init(driver.setCurrentNodeForProcessorContext(table3.name));
- assertNull(getter2.get("A"));
- assertNull(getter2.get("B"));
- assertNull(getter2.get("C"));
+ driver.pipeInput(recordFactory.create(topic1, "A", 1));
+ driver.pipeInput(recordFactory.create(topic1, "B", 1));
+ driver.pipeInput(recordFactory.create(topic1, "C", 1));
- assertEquals(1, (int) getter3.get("A"));
- assertEquals(1, (int) getter3.get("B"));
- assertEquals(1, (int) getter3.get("C"));
+ assertNull(getter2.get("A"));
+ assertNull(getter2.get("B"));
+ assertNull(getter2.get("C"));
- driver.process(topic1, "A", 2);
- driver.process(topic1, "B", 2);
- driver.flushState();
+ assertEquals(1, (int) getter3.get("A"));
+ assertEquals(1, (int) getter3.get("B"));
+ assertEquals(1, (int) getter3.get("C"));
- assertEquals(2, (int) getter2.get("A"));
- assertEquals(2, (int) getter2.get("B"));
- assertNull(getter2.get("C"));
+ driver.pipeInput(recordFactory.create(topic1, "A", 2));
+ driver.pipeInput(recordFactory.create(topic1, "B", 2));
- assertNull(getter3.get("A"));
- assertNull(getter3.get("B"));
- assertEquals(1, (int) getter3.get("C"));
+ assertEquals(2, (int) getter2.get("A"));
+ assertEquals(2, (int) getter2.get("B"));
+ assertNull(getter2.get("C"));
- driver.process(topic1, "A", 3);
- driver.flushState();
+ assertNull(getter3.get("A"));
+ assertNull(getter3.get("B"));
+ assertEquals(1, (int) getter3.get("C"));
- assertNull(getter2.get("A"));
- assertEquals(2, (int) getter2.get("B"));
- assertNull(getter2.get("C"));
+ driver.pipeInput(recordFactory.create(topic1, "A", 3));
- assertEquals(3, (int) getter3.get("A"));
- assertNull(getter3.get("B"));
- assertEquals(1, (int) getter3.get("C"));
+ assertNull(getter2.get("A"));
+ assertEquals(2, (int) getter2.get("B"));
+ assertNull(getter2.get("C"));
- driver.process(topic1, "A", null);
- driver.process(topic1, "B", null);
- driver.flushState();
+ assertEquals(3, (int) getter3.get("A"));
+ assertNull(getter3.get("B"));
+ assertEquals(1, (int) getter3.get("C"));
- assertNull(getter2.get("A"));
- assertNull(getter2.get("B"));
- assertNull(getter2.get("C"));
+ driver.pipeInput(recordFactory.create(topic1, "A", null));
+ driver.pipeInput(recordFactory.create(topic1, "B", null));
- assertNull(getter3.get("A"));
- assertNull(getter3.get("B"));
- assertEquals(1, (int) getter3.get("C"));
+ assertNull(getter2.get("A"));
+ assertNull(getter2.get("B"));
+ assertNull(getter2.get("C"));
+
+ assertNull(getter3.get("A"));
+ assertNull(getter3.get("B"));
+ assertEquals(1, (int) getter3.get("C"));
+ }
}
@Test
@@ -259,34 +257,34 @@ public class KTableFilterTest {
builder.build().addProcessor("proc1", supplier, table1.name);
builder.build().addProcessor("proc2", supplier, table2.name);
- driver.setUp(builder, stateDir, Serdes.String(), Serdes.Integer());
+ try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
- driver.process(topic1, "A", 1);
- driver.process(topic1, "B", 1);
- driver.process(topic1, "C", 1);
- driver.flushState();
+ driver.pipeInput(recordFactory.create(topic1, "A", 1));
+ driver.pipeInput(recordFactory.create(topic1, "B", 1));
+ driver.pipeInput(recordFactory.create(topic1, "C", 1));
- final List<MockProcessor<String, Integer>> processors = supplier.capturedProcessors(2);
+ final List<MockProcessor<String, Integer>> processors = supplier.capturedProcessors(2);
+
+ processors.get(0).checkAndClearProcessResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)");
+ processors.get(1).checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)", "C:(null<-null)");
- processors.get(0).checkAndClearProcessResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)");
- processors.get(1).checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)", "C:(null<-null)");
-
- driver.process(topic1, "A", 2);
- driver.process(topic1, "B", 2);
- driver.flushState();
- processors.get(0).checkAndClearProcessResult("A:(2<-null)", "B:(2<-null)");
- processors.get(1).checkAndClearProcessResult("A:(2<-null)", "B:(2<-null)");
-
- driver.process(topic1, "A", 3);
- driver.flushState();
- processors.get(0).checkAndClearProcessResult("A:(3<-null)");
- processors.get(1).checkAndClearProcessResult("A:(null<-null)");
-
- driver.process(topic1, "A", null);
- driver.process(topic1, "B", null);
- driver.flushState();
- processors.get(0).checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)");
- processors.get(1).checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)");
+ driver.pipeInput(recordFactory.create(topic1, "A", 2));
+ driver.pipeInput(recordFactory.create(topic1, "B", 2));
+
+ processors.get(0).checkAndClearProcessResult("A:(2<-null)", "B:(2<-null)");
+ processors.get(1).checkAndClearProcessResult("A:(2<-null)", "B:(2<-null)");
+
+ driver.pipeInput(recordFactory.create(topic1, "A", 3));
+
+ processors.get(0).checkAndClearProcessResult("A:(3<-null)");
+ processors.get(1).checkAndClearProcessResult("A:(null<-null)");
+
+ driver.pipeInput(recordFactory.create(topic1, "A", null));
+ driver.pipeInput(recordFactory.create(topic1, "B", null));
+
+ processors.get(0).checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)");
+ processors.get(1).checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)");
+ }
}
@@ -340,34 +338,34 @@ public class KTableFilterTest {
topology.addProcessor("proc1", supplier, table1.name);
topology.addProcessor("proc2", supplier, table2.name);
- driver.setUp(builder, stateDir, Serdes.String(), Serdes.Integer());
+ try (final TopologyTestDriver driver = new TopologyTestDriver(topology, props)) {
- driver.process(topic1, "A", 1);
- driver.process(topic1, "B", 1);
- driver.process(topic1, "C", 1);
- driver.flushState();
+ driver.pipeInput(recordFactory.create(topic1, "A", 1));
+ driver.pipeInput(recordFactory.create(topic1, "B", 1));
+ driver.pipeInput(recordFactory.create(topic1, "C", 1));
- final List<MockProcessor<String, Integer>> processors = supplier.capturedProcessors(2);
+ final List<MockProcessor<String, Integer>> processors = supplier.capturedProcessors(2);
- processors.get(0).checkAndClearProcessResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)");
- processors.get(1).checkEmptyAndClearProcessResult();
+ processors.get(0).checkAndClearProcessResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)");
+ processors.get(1).checkEmptyAndClearProcessResult();
+
+ driver.pipeInput(recordFactory.create(topic1, "A", 2));
+ driver.pipeInput(recordFactory.create(topic1, "B", 2));
+
+ processors.get(0).checkAndClearProcessResult("A:(2<-1)", "B:(2<-1)");
+ processors.get(1).checkAndClearProcessResult("A:(2<-null)", "B:(2<-null)");
+
+ driver.pipeInput(recordFactory.create(topic1, "A", 3));
+
+ processors.get(0).checkAndClearProcessResult("A:(3<-2)");
+ processors.get(1).checkAndClearProcessResult("A:(null<-2)");
+
+ driver.pipeInput(recordFactory.create(topic1, "A", null));
+ driver.pipeInput(recordFactory.create(topic1, "B", null));
- driver.process(topic1, "A", 2);
- driver.process(topic1, "B", 2);
- driver.flushState();
- processors.get(0).checkAndClearProcessResult("A:(2<-1)", "B:(2<-1)");
- processors.get(1).checkAndClearProcessResult("A:(2<-null)", "B:(2<-null)");
-
- driver.process(topic1, "A", 3);
- driver.flushState();
- processors.get(0).checkAndClearProcessResult("A:(3<-2)");
- processors.get(1).checkAndClearProcessResult("A:(null<-2)");
-
- driver.process(topic1, "A", null);
- driver.process(topic1, "B", null);
- driver.flushState();
- processors.get(0).checkAndClearProcessResult("A:(null<-3)", "B:(null<-2)");
- processors.get(1).checkAndClearProcessResult("B:(null<-2)");
+ processors.get(0).checkAndClearProcessResult("A:(null<-3)", "B:(null<-2)");
+ processors.get(1).checkAndClearProcessResult("B:(null<-2)");
+ }
}
@Test
@@ -418,12 +416,13 @@ public class KTableFilterTest {
topology.addProcessor("proc1", supplier, table1.name);
topology.addProcessor("proc2", supplier, table2.name);
- driver.setUp(builder, stateDir, stringSerde, stringSerde);
+ final ConsumerRecordFactory<String, String> stringRecordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
+ try (final TopologyTestDriver driver = new TopologyTestDriver(topology, props)) {
- driver.process(topic1, "A", "reject");
- driver.process(topic1, "B", "reject");
- driver.process(topic1, "C", "reject");
- driver.flushState();
+ driver.pipeInput(stringRecordFactory.create(topic1, "A", "reject"));
+ driver.pipeInput(stringRecordFactory.create(topic1, "B", "reject"));
+ driver.pipeInput(stringRecordFactory.create(topic1, "C", "reject"));
+ }
final List<MockProcessor<String, String>> processors = supplier.capturedProcessors(2);
processors.get(0).checkAndClearProcessResult("A:(reject<-null)", "B:(reject<-null)", "C:(reject<-null)");
@@ -437,7 +436,7 @@ public class KTableFilterTest {
String topic1 = "topic1";
- final Consumed<String, String> consumed = Consumed.with(stringSerde, stringSerde);
+ final Consumed<String, String> consumed = Consumed.with(Serdes.String(), Serdes.String());
KTableImpl<String, String, String> table1 =
(KTableImpl<String, String, String>) builder.table(topic1, consumed);
KTableImpl<String, String, String> table2 = (KTableImpl<String, String, String>) table1.filter(
@@ -459,7 +458,7 @@ public class KTableFilterTest {
String topic1 = "topic1";
- final Consumed<String, String> consumed = Consumed.with(stringSerde, stringSerde);
+ final Consumed<String, String> consumed = Consumed.with(Serdes.String(), Serdes.String());
KTableImpl<String, String, String> table1 =
(KTableImpl<String, String, String>) builder.table(topic1, consumed);
KTableImpl<String, String, String> table2 = (KTableImpl<String, String, String>) table1.filter(
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
index 0b9c1ab..016cde2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
@@ -16,12 +16,17 @@
*/
package org.apache.kafka.streams.kstream.internals;
-import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyDescription;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.TopologyTestDriverWrapper;
+import org.apache.kafka.streams.TopologyWrapper;
+import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Predicate;
@@ -30,10 +35,11 @@ import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.kstream.ValueMapperWithKey;
import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
+import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.SinkNode;
import org.apache.kafka.streams.processor.internals.SourceNode;
import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.test.MockAggregator;
import org.apache.kafka.test.MockInitializer;
import org.apache.kafka.test.MockMapper;
@@ -41,36 +47,31 @@ import org.apache.kafka.test.MockProcessor;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockReducer;
import org.apache.kafka.test.MockValueJoiner;
-import org.apache.kafka.test.TestUtils;
+import org.apache.kafka.test.StreamsTestUtils;
import org.junit.Before;
-import org.junit.Rule;
import org.junit.Test;
-import java.io.File;
import java.lang.reflect.Field;
import java.util.List;
+import java.util.Properties;
import static org.easymock.EasyMock.mock;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
public class KTableImplTest {
- private final Serde<String> stringSerde = Serdes.String();
- private final Consumed<String, String> consumed = Consumed.with(stringSerde, stringSerde);
- private final Produced<String, String> produced = Produced.with(stringSerde, stringSerde);
+ private final Consumed<String, String> consumed = Consumed.with(Serdes.String(), Serdes.String());
+ private final Produced<String, String> produced = Produced.with(Serdes.String(), Serdes.String());
+ private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.String());
+ private final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
- @Rule
- public final KStreamTestDriver driver = new KStreamTestDriver();
- private File stateDir = null;
private StreamsBuilder builder;
private KTable<String, String> table;
@Before
public void setUp() {
- stateDir = TestUtils.tempDirectory("kafka-test");
builder = new StreamsBuilder();
table = builder.table("test");
}
@@ -110,17 +111,12 @@ public class KTableImplTest {
table4.toStream().process(supplier);
- driver.setUp(builder, stateDir);
-
- driver.process(topic1, "A", "01");
- driver.flushState();
- driver.process(topic1, "B", "02");
- driver.flushState();
- driver.process(topic1, "C", "03");
- driver.flushState();
- driver.process(topic1, "D", "04");
- driver.flushState();
- driver.flushState();
+ try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+ driver.pipeInput(recordFactory.create(topic1, "A", "01"));
+ driver.pipeInput(recordFactory.create(topic1, "B", "02"));
+ driver.pipeInput(recordFactory.create(topic1, "C", "03"));
+ driver.pipeInput(recordFactory.create(topic1, "D", "04"));
+ }
final List<MockProcessor<String, Object>> processors = supplier.capturedProcessors(4);
assertEquals(Utils.mkList("A:01", "B:02", "C:03", "D:04"), processors.get(0).processed);
@@ -156,104 +152,109 @@ public class KTableImplTest {
table1.toStream().to(topic2, produced);
final KTableImpl<String, String, String> table4 = (KTableImpl<String, String, String>) builder.table(topic2, consumed);
+ final Topology topology = builder.build();
+
final KTableValueGetterSupplier<String, String> getterSupplier1 = table1.valueGetterSupplier();
final KTableValueGetterSupplier<String, Integer> getterSupplier2 = table2.valueGetterSupplier();
final KTableValueGetterSupplier<String, Integer> getterSupplier3 = table3.valueGetterSupplier();
final KTableValueGetterSupplier<String, String> getterSupplier4 = table4.valueGetterSupplier();
- driver.setUp(builder, stateDir, null, null);
+ final InternalTopologyBuilder topologyBuilder = TopologyWrapper.getInternalTopologyBuilder(topology);
+ topologyBuilder.connectProcessorAndStateStores(table1.name, getterSupplier1.storeNames());
+ topologyBuilder.connectProcessorAndStateStores(table2.name, getterSupplier2.storeNames());
+ topologyBuilder.connectProcessorAndStateStores(table3.name, getterSupplier3.storeNames());
+ topologyBuilder.connectProcessorAndStateStores(table4.name, getterSupplier4.storeNames());
+
+ try (final TopologyTestDriverWrapper driver = new TopologyTestDriverWrapper(topology, props)) {
- // two state store should be created
- assertEquals(2, driver.allStateStores().size());
+ assertEquals(2, driver.getAllStateStores().size());
- final KTableValueGetter<String, String> getter1 = getterSupplier1.get();
- getter1.init(driver.context());
- final KTableValueGetter<String, Integer> getter2 = getterSupplier2.get();
- getter2.init(driver.context());
- final KTableValueGetter<String, Integer> getter3 = getterSupplier3.get();
- getter3.init(driver.context());
- final KTableValueGetter<String, String> getter4 = getterSupplier4.get();
- getter4.init(driver.context());
+ final KTableValueGetter<String, String> getter1 = getterSupplier1.get();
+ final KTableValueGetter<String, Integer> getter2 = getterSupplier2.get();
+ final KTableValueGetter<String, Integer> getter3 = getterSupplier3.get();
+ final KTableValueGetter<String, String> getter4 = getterSupplier4.get();
- driver.process(topic1, "A", "01");
- driver.process(topic1, "B", "01");
- driver.process(topic1, "C", "01");
- driver.flushState();
+ getter1.init(driver.setCurrentNodeForProcessorContext(table1.name));
+ getter2.init(driver.setCurrentNodeForProcessorContext(table2.name));
+ getter3.init(driver.setCurrentNodeForProcessorContext(table3.name));
+ getter4.init(driver.setCurrentNodeForProcessorContext(table4.name));
- assertEquals("01", getter1.get("A"));
- assertEquals("01", getter1.get("B"));
- assertEquals("01", getter1.get("C"));
+ driver.pipeInput(recordFactory.create(topic1, "A", "01"));
+ driver.pipeInput(recordFactory.create(topic1, "B", "01"));
+ driver.pipeInput(recordFactory.create(topic1, "C", "01"));
- assertEquals(new Integer(1), getter2.get("A"));
- assertEquals(new Integer(1), getter2.get("B"));
- assertEquals(new Integer(1), getter2.get("C"));
+ assertEquals("01", getter1.get("A"));
+ assertEquals("01", getter1.get("B"));
+ assertEquals("01", getter1.get("C"));
- assertNull(getter3.get("A"));
- assertNull(getter3.get("B"));
- assertNull(getter3.get("C"));
+ assertEquals(new Integer(1), getter2.get("A"));
+ assertEquals(new Integer(1), getter2.get("B"));
+ assertEquals(new Integer(1), getter2.get("C"));
- assertEquals("01", getter4.get("A"));
- assertEquals("01", getter4.get("B"));
- assertEquals("01", getter4.get("C"));
+ assertNull(getter3.get("A"));
+ assertNull(getter3.get("B"));
+ assertNull(getter3.get("C"));
- driver.process(topic1, "A", "02");
- driver.process(topic1, "B", "02");
- driver.flushState();
+ assertEquals("01", getter4.get("A"));
+ assertEquals("01", getter4.get("B"));
+ assertEquals("01", getter4.get("C"));
- assertEquals("02", getter1.get("A"));
- assertEquals("02", getter1.get("B"));
- assertEquals("01", getter1.get("C"));
+ driver.pipeInput(recordFactory.create(topic1, "A", "02"));
+ driver.pipeInput(recordFactory.create(topic1, "B", "02"));
- assertEquals(new Integer(2), getter2.get("A"));
- assertEquals(new Integer(2), getter2.get("B"));
- assertEquals(new Integer(1), getter2.get("C"));
+ assertEquals("02", getter1.get("A"));
+ assertEquals("02", getter1.get("B"));
+ assertEquals("01", getter1.get("C"));
- assertEquals(new Integer(2), getter3.get("A"));
- assertEquals(new Integer(2), getter3.get("B"));
- assertNull(getter3.get("C"));
+ assertEquals(new Integer(2), getter2.get("A"));
+ assertEquals(new Integer(2), getter2.get("B"));
+ assertEquals(new Integer(1), getter2.get("C"));
- assertEquals("02", getter4.get("A"));
- assertEquals("02", getter4.get("B"));
- assertEquals("01", getter4.get("C"));
+ assertEquals(new Integer(2), getter3.get("A"));
+ assertEquals(new Integer(2), getter3.get("B"));
+ assertNull(getter3.get("C"));
- driver.process(topic1, "A", "03");
- driver.flushState();
+ assertEquals("02", getter4.get("A"));
+ assertEquals("02", getter4.get("B"));
+ assertEquals("01", getter4.get("C"));
- assertEquals("03", getter1.get("A"));
- assertEquals("02", getter1.get("B"));
- assertEquals("01", getter1.get("C"));
+ driver.pipeInput(recordFactory.create(topic1, "A", "03"));
- assertEquals(new Integer(3), getter2.get("A"));
- assertEquals(new Integer(2), getter2.get("B"));
- assertEquals(new Integer(1), getter2.get("C"));
+ assertEquals("03", getter1.get("A"));
+ assertEquals("02", getter1.get("B"));
+ assertEquals("01", getter1.get("C"));
- assertNull(getter3.get("A"));
- assertEquals(new Integer(2), getter3.get("B"));
- assertNull(getter3.get("C"));
+ assertEquals(new Integer(3), getter2.get("A"));
+ assertEquals(new Integer(2), getter2.get("B"));
+ assertEquals(new Integer(1), getter2.get("C"));
- assertEquals("03", getter4.get("A"));
- assertEquals("02", getter4.get("B"));
- assertEquals("01", getter4.get("C"));
+ assertNull(getter3.get("A"));
+ assertEquals(new Integer(2), getter3.get("B"));
+ assertNull(getter3.get("C"));
- driver.process(topic1, "A", null);
- driver.flushState();
+ assertEquals("03", getter4.get("A"));
+ assertEquals("02", getter4.get("B"));
+ assertEquals("01", getter4.get("C"));
- assertNull(getter1.get("A"));
- assertEquals("02", getter1.get("B"));
- assertEquals("01", getter1.get("C"));
+ driver.pipeInput(recordFactory.create(topic1, "A", (String) null));
+ assertNull(getter1.get("A"));
+ assertEquals("02", getter1.get("B"));
+ assertEquals("01", getter1.get("C"));
- assertNull(getter2.get("A"));
- assertEquals(new Integer(2), getter2.get("B"));
- assertEquals(new Integer(1), getter2.get("C"));
- assertNull(getter3.get("A"));
- assertEquals(new Integer(2), getter3.get("B"));
- assertNull(getter3.get("C"));
+ assertNull(getter2.get("A"));
+ assertEquals(new Integer(2), getter2.get("B"));
+ assertEquals(new Integer(1), getter2.get("C"));
- assertNull(getter4.get("A"));
- assertEquals("02", getter4.get("B"));
- assertEquals("01", getter4.get("C"));
+ assertNull(getter3.get("A"));
+ assertEquals(new Integer(2), getter3.get("B"));
+ assertNull(getter3.get("C"));
+
+ assertNull(getter4.get("A"));
+ assertEquals("02", getter4.get("B"));
+ assertEquals("01", getter4.get("C"));
+ }
}
@Test
@@ -282,11 +283,9 @@ public class KTableImplTest {
}
});
- driver.setUp(builder, stateDir, null, null);
- driver.setTime(0L);
-
- // two state stores should be created
- assertEquals(2, driver.allStateStores().size());
+ try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+ assertEquals(2, driver.getAllStateStores().size());
+ }
}
@Test
@@ -323,15 +322,25 @@ public class KTableImplTest {
}
});
- driver.setUp(builder, stateDir, null, null);
- driver.setTime(0L);
+ try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+ assertEquals(2, driver.getAllStateStores().size());
+ }
+ }
- // two state store should be created
- assertEquals(2, driver.allStateStores().size());
+ private void assertTopologyContainsProcessor(final Topology topology, final String processorName) {
+ for (final TopologyDescription.Subtopology subtopology: topology.describe().subtopologies()) {
+ for (final TopologyDescription.Node node: subtopology.nodes()) {
+ if (node.name().equals(processorName)) {
+ return;
+ }
+ }
+ }
+ throw new AssertionError("No processor named '" + processorName + "'"
+ + "found in the provided Topology:\n" + topology.describe());
}
@Test
- public void testRepartition() throws NoSuchFieldException, IllegalAccessException {
+ public void shouldCreateSourceAndSinkNodesForRepartitioningTopic() throws NoSuchFieldException, IllegalAccessException {
final String topic1 = "topic1";
final String storeName1 = "storeName1";
@@ -341,8 +350,8 @@ public class KTableImplTest {
(KTableImpl<String, String, String>) builder.table(topic1,
consumed,
Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(storeName1)
- .withKeySerde(stringSerde)
- .withValueSerde(stringSerde)
+ .withKeySerde(Serdes.String())
+ .withValueSerde(Serdes.String())
);
table1.groupBy(MockMapper.<String, String>noOpKeyValueMapper())
@@ -352,27 +361,26 @@ public class KTableImplTest {
table1.groupBy(MockMapper.<String, String>noOpKeyValueMapper())
.reduce(MockReducer.STRING_ADDER, MockReducer.STRING_REMOVER, Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("mock-result2"));
- driver.setUp(builder, stateDir, stringSerde, stringSerde);
- driver.setTime(0L);
+ final Topology topology = builder.build();
+ try (final TopologyTestDriverWrapper driver = new TopologyTestDriverWrapper(topology, props)) {
- // three state store should be created, one for source, one for aggregate and one for reduce
- assertEquals(3, driver.allStateStores().size());
+ assertEquals(3, driver.getAllStateStores().size());
- // contains the corresponding repartition source / sink nodes
- assertTrue(driver.allProcessorNames().contains("KSTREAM-SINK-0000000003"));
- assertTrue(driver.allProcessorNames().contains("KSTREAM-SOURCE-0000000004"));
- assertTrue(driver.allProcessorNames().contains("KSTREAM-SINK-0000000007"));
- assertTrue(driver.allProcessorNames().contains("KSTREAM-SOURCE-0000000008"));
+ assertTopologyContainsProcessor(topology, "KSTREAM-SINK-0000000003");
+ assertTopologyContainsProcessor(topology, "KSTREAM-SOURCE-0000000004");
+ assertTopologyContainsProcessor(topology, "KSTREAM-SINK-0000000007");
+ assertTopologyContainsProcessor(topology, "KSTREAM-SOURCE-0000000008");
- Field valSerializerField = ((SinkNode) driver.processor("KSTREAM-SINK-0000000003")).getClass().getDeclaredField("valSerializer");
- Field valDeserializerField = ((SourceNode) driver.processor("KSTREAM-SOURCE-0000000004")).getClass().getDeclaredField("valDeserializer");
- valSerializerField.setAccessible(true);
- valDeserializerField.setAccessible(true);
+ Field valSerializerField = ((SinkNode) driver.getProcessor("KSTREAM-SINK-0000000003")).getClass().getDeclaredField("valSerializer");
+ Field valDeserializerField = ((SourceNode) driver.getProcessor("KSTREAM-SOURCE-0000000004")).getClass().getDeclaredField("valDeserializer");
+ valSerializerField.setAccessible(true);
+ valDeserializerField.setAccessible(true);
- assertNotNull(((ChangedSerializer) valSerializerField.get(driver.processor("KSTREAM-SINK-0000000003"))).inner());
- assertNotNull(((ChangedDeserializer) valDeserializerField.get(driver.processor("KSTREAM-SOURCE-0000000004"))).inner());
- assertNotNull(((ChangedSerializer) valSerializerField.get(driver.processor("KSTREAM-SINK-0000000007"))).inner());
- assertNotNull(((ChangedDeserializer) valDeserializerField.get(driver.processor("KSTREAM-SOURCE-0000000008"))).inner());
+ assertNotNull(((ChangedSerializer) valSerializerField.get(driver.getProcessor("KSTREAM-SINK-0000000003"))).inner());
+ assertNotNull(((ChangedDeserializer) valDeserializerField.get(driver.getProcessor("KSTREAM-SOURCE-0000000004"))).inner());
+ assertNotNull(((ChangedSerializer) valSerializerField.get(driver.getProcessor("KSTREAM-SINK-0000000007"))).inner());
+ assertNotNull(((ChangedDeserializer) valDeserializerField.get(driver.getProcessor("KSTREAM-SOURCE-0000000008"))).inner());
+ }
}
@Test(expected = NullPointerException.class)
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
index a54e43e..a01d5cb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
@@ -16,27 +16,30 @@
*/
package org.apache.kafka.streams.kstream.internals;
-import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.TopologyTestDriverWrapper;
+import org.apache.kafka.streams.TopologyWrapper;
+import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.ValueMapper;
+import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.test.MockProcessor;
import org.apache.kafka.test.MockProcessorSupplier;
-import org.apache.kafka.test.TestUtils;
-import org.junit.Before;
-import org.junit.Rule;
+import org.apache.kafka.test.StreamsTestUtils;
import org.junit.Test;
-import java.io.File;
+import java.util.Properties;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -45,27 +48,19 @@ import static org.junit.Assert.assertTrue;
public class KTableMapValuesTest {
- private final Serde<String> stringSerde = Serdes.String();
- private final Consumed<String, String> consumed = Consumed.with(stringSerde, stringSerde);
- private final Produced<String, String> produced = Produced.with(stringSerde, stringSerde);
- @Rule
- public final KStreamTestDriver driver = new KStreamTestDriver();
- private File stateDir = null;
-
- @Before
- public void setUp() {
- stateDir = TestUtils.tempDirectory("kafka-test");
- }
+ private final Consumed<String, String> consumed = Consumed.with(Serdes.String(), Serdes.String());
+ private final Produced<String, String> produced = Produced.with(Serdes.String(), Serdes.String());
+ private final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
+ private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.String());
private void doTestKTable(final StreamsBuilder builder, final String topic1, final MockProcessorSupplier<String, Integer> supplier) {
- driver.setUp(builder, stateDir, Serdes.String(), Serdes.String());
-
- driver.process(topic1, "A", "1");
- driver.process(topic1, "B", "2");
- driver.process(topic1, "C", "3");
- driver.process(topic1, "D", "4");
- driver.flushState();
- assertEquals(Utils.mkList("A:1", "B:2", "C:3", "D:4"), supplier.theCapturedProcessor().processed);
+ try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+ driver.pipeInput(recordFactory.create(topic1, "A", "1"));
+ driver.pipeInput(recordFactory.create(topic1, "B", "2"));
+ driver.pipeInput(recordFactory.create(topic1, "C", "3"));
+ driver.pipeInput(recordFactory.create(topic1, "D", "4"));
+ assertEquals(Utils.mkList("A:1", "B:2", "C:3", "D:4"), supplier.theCapturedProcessor().processed);
+ }
}
@Test
@@ -114,99 +109,106 @@ public class KTableMapValuesTest {
final KTableImpl<String, String, Integer> table2,
final KTableImpl<String, Integer, Integer> table3,
final KTableImpl<String, String, String> table4) {
+
+ final Topology topology = builder.build();
+
final KTableValueGetterSupplier<String, String> getterSupplier1 = table1.valueGetterSupplier();
final KTableValueGetterSupplier<String, Integer> getterSupplier2 = table2.valueGetterSupplier();
final KTableValueGetterSupplier<String, Integer> getterSupplier3 = table3.valueGetterSupplier();
final KTableValueGetterSupplier<String, String> getterSupplier4 = table4.valueGetterSupplier();
- driver.setUp(builder, stateDir, Serdes.String(), Serdes.String());
- final KTableValueGetter<String, String> getter1 = getterSupplier1.get();
- getter1.init(driver.context());
- final KTableValueGetter<String, Integer> getter2 = getterSupplier2.get();
- getter2.init(driver.context());
- final KTableValueGetter<String, Integer> getter3 = getterSupplier3.get();
- getter3.init(driver.context());
- final KTableValueGetter<String, String> getter4 = getterSupplier4.get();
- getter4.init(driver.context());
-
- driver.process(topic1, "A", "01");
- driver.process(topic1, "B", "01");
- driver.process(topic1, "C", "01");
- driver.flushState();
-
- assertEquals("01", getter1.get("A"));
- assertEquals("01", getter1.get("B"));
- assertEquals("01", getter1.get("C"));
-
- assertEquals(new Integer(1), getter2.get("A"));
- assertEquals(new Integer(1), getter2.get("B"));
- assertEquals(new Integer(1), getter2.get("C"));
-
- assertNull(getter3.get("A"));
- assertNull(getter3.get("B"));
- assertNull(getter3.get("C"));
-
- assertEquals("01", getter4.get("A"));
- assertEquals("01", getter4.get("B"));
- assertEquals("01", getter4.get("C"));
-
- driver.process(topic1, "A", "02");
- driver.process(topic1, "B", "02");
- driver.flushState();
-
- assertEquals("02", getter1.get("A"));
- assertEquals("02", getter1.get("B"));
- assertEquals("01", getter1.get("C"));
-
- assertEquals(new Integer(2), getter2.get("A"));
- assertEquals(new Integer(2), getter2.get("B"));
- assertEquals(new Integer(1), getter2.get("C"));
-
- assertEquals(new Integer(2), getter3.get("A"));
- assertEquals(new Integer(2), getter3.get("B"));
- assertNull(getter3.get("C"));
-
- assertEquals("02", getter4.get("A"));
- assertEquals("02", getter4.get("B"));
- assertEquals("01", getter4.get("C"));
-
- driver.process(topic1, "A", "03");
- driver.flushState();
-
- assertEquals("03", getter1.get("A"));
- assertEquals("02", getter1.get("B"));
- assertEquals("01", getter1.get("C"));
-
- assertEquals(new Integer(3), getter2.get("A"));
- assertEquals(new Integer(2), getter2.get("B"));
- assertEquals(new Integer(1), getter2.get("C"));
-
- assertNull(getter3.get("A"));
- assertEquals(new Integer(2), getter3.get("B"));
- assertNull(getter3.get("C"));
-
- assertEquals("03", getter4.get("A"));
- assertEquals("02", getter4.get("B"));
- assertEquals("01", getter4.get("C"));
-
- driver.process(topic1, "A", null);
- driver.flushState();
-
- assertNull(getter1.get("A"));
- assertEquals("02", getter1.get("B"));
- assertEquals("01", getter1.get("C"));
-
- assertNull(getter2.get("A"));
- assertEquals(new Integer(2), getter2.get("B"));
- assertEquals(new Integer(1), getter2.get("C"));
-
- assertNull(getter3.get("A"));
- assertEquals(new Integer(2), getter3.get("B"));
- assertNull(getter3.get("C"));
-
- assertNull(getter4.get("A"));
- assertEquals("02", getter4.get("B"));
- assertEquals("01", getter4.get("C"));
+ final InternalTopologyBuilder topologyBuilder = TopologyWrapper.getInternalTopologyBuilder(topology);
+ topologyBuilder.connectProcessorAndStateStores(table1.name, getterSupplier1.storeNames());
+ topologyBuilder.connectProcessorAndStateStores(table2.name, getterSupplier2.storeNames());
+ topologyBuilder.connectProcessorAndStateStores(table3.name, getterSupplier3.storeNames());
+ topologyBuilder.connectProcessorAndStateStores(table4.name, getterSupplier4.storeNames());
+
+ try (final TopologyTestDriverWrapper driver = new TopologyTestDriverWrapper(builder.build(), props)) {
+ KTableValueGetter<String, String> getter1 = getterSupplier1.get();
+ KTableValueGetter<String, Integer> getter2 = getterSupplier2.get();
+ KTableValueGetter<String, Integer> getter3 = getterSupplier3.get();
+ KTableValueGetter<String, String> getter4 = getterSupplier4.get();
+
+ getter1.init(driver.setCurrentNodeForProcessorContext(table1.name));
+ getter2.init(driver.setCurrentNodeForProcessorContext(table2.name));
+ getter3.init(driver.setCurrentNodeForProcessorContext(table3.name));
+ getter4.init(driver.setCurrentNodeForProcessorContext(table4.name));
+
+ driver.pipeInput(recordFactory.create(topic1, "A", "01"));
+ driver.pipeInput(recordFactory.create(topic1, "B", "01"));
+ driver.pipeInput(recordFactory.create(topic1, "C", "01"));
+
+ assertEquals("01", getter1.get("A"));
+ assertEquals("01", getter1.get("B"));
+ assertEquals("01", getter1.get("C"));
+
+ assertEquals(new Integer(1), getter2.get("A"));
+ assertEquals(new Integer(1), getter2.get("B"));
+ assertEquals(new Integer(1), getter2.get("C"));
+
+ assertNull(getter3.get("A"));
+ assertNull(getter3.get("B"));
+ assertNull(getter3.get("C"));
+
+ assertEquals("01", getter4.get("A"));
+ assertEquals("01", getter4.get("B"));
+ assertEquals("01", getter4.get("C"));
+
+ driver.pipeInput(recordFactory.create(topic1, "A", "02"));
+ driver.pipeInput(recordFactory.create(topic1, "B", "02"));
+
+ assertEquals("02", getter1.get("A"));
+ assertEquals("02", getter1.get("B"));
+ assertEquals("01", getter1.get("C"));
+
+ assertEquals(new Integer(2), getter2.get("A"));
+ assertEquals(new Integer(2), getter2.get("B"));
+ assertEquals(new Integer(1), getter2.get("C"));
+
+ assertEquals(new Integer(2), getter3.get("A"));
+ assertEquals(new Integer(2), getter3.get("B"));
+ assertNull(getter3.get("C"));
+
+ assertEquals("02", getter4.get("A"));
+ assertEquals("02", getter4.get("B"));
+ assertEquals("01", getter4.get("C"));
+
+ driver.pipeInput(recordFactory.create(topic1, "A", "03"));
+
+ assertEquals("03", getter1.get("A"));
+ assertEquals("02", getter1.get("B"));
+ assertEquals("01", getter1.get("C"));
+
+ assertEquals(new Integer(3), getter2.get("A"));
+ assertEquals(new Integer(2), getter2.get("B"));
+ assertEquals(new Integer(1), getter2.get("C"));
+
+ assertNull(getter3.get("A"));
+ assertEquals(new Integer(2), getter3.get("B"));
+ assertNull(getter3.get("C"));
+
+ assertEquals("03", getter4.get("A"));
+ assertEquals("02", getter4.get("B"));
+ assertEquals("01", getter4.get("C"));
+
+ driver.pipeInput(recordFactory.create(topic1, "A", (String) null));
+
+ assertNull(getter1.get("A"));
+ assertEquals("02", getter1.get("B"));
+ assertEquals("01", getter1.get("C"));
+
+ assertNull(getter2.get("A"));
+ assertEquals(new Integer(2), getter2.get("B"));
+ assertEquals(new Integer(1), getter2.get("C"));
+
+ assertNull(getter3.get("A"));
+ assertEquals(new Integer(2), getter3.get("B"));
+ assertNull(getter3.get("C"));
+
+ assertNull(getter4.get("A"));
+ assertEquals("02", getter4.get("B"));
+ assertEquals("01", getter4.get("C"));
+ }
}
@Test
@@ -244,6 +246,8 @@ public class KTableMapValuesTest {
final String topic1 = "topic1";
final String topic2 = "topic2";
+ final String storeName2 = "anyMapName";
+ final String storeName3 = "anyFilterName";
final KTableImpl<String, String, String> table1 =
(KTableImpl<String, String, String>) builder.table(topic1, consumed);
@@ -253,14 +257,14 @@ public class KTableMapValuesTest {
public Integer apply(String value) {
return new Integer(value);
}
- }, Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as("anyMapName").withValueSerde(Serdes.Integer()));
+ }, Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as(storeName2).withValueSerde(Serdes.Integer()));
final KTableImpl<String, Integer, Integer> table3 = (KTableImpl<String, Integer, Integer>) table2.filter(
new Predicate<String, Integer>() {
@Override
public boolean test(String key, Integer value) {
return (value % 2) == 0;
}
- }, Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as("anyFilterName").withValueSerde(Serdes.Integer()));
+ }, Materialized.<String, Integer, KeyValueStore<Bytes, byte[]>>as(storeName3).withValueSerde(Serdes.Integer()));
table1.toStream().to(topic2, produced);
final KTableImpl<String, String, String> table4 = (KTableImpl<String, String, String>) builder.table(topic2, consumed);
@@ -285,37 +289,34 @@ public class KTableMapValuesTest {
final MockProcessorSupplier<String, Integer> supplier = new MockProcessorSupplier<>();
- builder.build().addProcessor("proc", supplier, table2.name);
+ final Topology topology = builder.build().addProcessor("proc", supplier, table2.name);
- driver.setUp(builder, stateDir);
+ try (final TopologyTestDriver driver = new TopologyTestDriver(topology, props)) {
- final MockProcessor<String, Integer> proc = supplier.theCapturedProcessor();
+ final MockProcessor<String, Integer> proc = supplier.theCapturedProcessor();
- assertFalse(table1.sendingOldValueEnabled());
- assertFalse(table2.sendingOldValueEnabled());
+ assertFalse(table1.sendingOldValueEnabled());
+ assertFalse(table2.sendingOldValueEnabled());
- driver.process(topic1, "A", "01");
- driver.process(topic1, "B", "01");
- driver.process(topic1, "C", "01");
- driver.flushState();
+ driver.pipeInput(recordFactory.create(topic1, "A", "01"));
+ driver.pipeInput(recordFactory.create(topic1, "B", "01"));
+ driver.pipeInput(recordFactory.create(topic1, "C", "01"));
- proc.checkAndClearProcessResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)");
+ proc.checkAndClearProcessResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)");
- driver.process(topic1, "A", "02");
- driver.process(topic1, "B", "02");
- driver.flushState();
+ driver.pipeInput(recordFactory.create(topic1, "A", "02"));
+ driver.pipeInput(recordFactory.create(topic1, "B", "02"));
- proc.checkAndClearProcessResult("A:(2<-null)", "B:(2<-null)");
+ proc.checkAndClearProcessResult("A:(2<-null)", "B:(2<-null)");
- driver.process(topic1, "A", "03");
- driver.flushState();
+ driver.pipeInput(recordFactory.create(topic1, "A", "03"));
- proc.checkAndClearProcessResult("A:(3<-null)");
+ proc.checkAndClearProcessResult("A:(3<-null)");
- driver.process(topic1, "A", null);
- driver.flushState();
+ driver.pipeInput(recordFactory.create(topic1, "A", (String) null));
- proc.checkAndClearProcessResult("A:(null<-null)");
+ proc.checkAndClearProcessResult("A:(null<-null)");
+ }
}
@Test
@@ -340,34 +341,31 @@ public class KTableMapValuesTest {
builder.build().addProcessor("proc", supplier, table2.name);
- driver.setUp(builder, stateDir);
+ try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
- final MockProcessor<String, Integer> proc = supplier.theCapturedProcessor();
+ final MockProcessor<String, Integer> proc = supplier.theCapturedProcessor();
- assertTrue(table1.sendingOldValueEnabled());
- assertTrue(table2.sendingOldValueEnabled());
+ assertTrue(table1.sendingOldValueEnabled());
+ assertTrue(table2.sendingOldValueEnabled());
- driver.process(topic1, "A", "01");
- driver.process(topic1, "B", "01");
- driver.process(topic1, "C", "01");
- driver.flushState();
+ driver.pipeInput(recordFactory.create(topic1, "A", "01"));
+ driver.pipeInput(recordFactory.create(topic1, "B", "01"));
+ driver.pipeInput(recordFactory.create(topic1, "C", "01"));
- proc.checkAndClearProcessResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)");
+ proc.checkAndClearProcessResult("A:(1<-null)", "B:(1<-null)", "C:(1<-null)");
- driver.process(topic1, "A", "02");
- driver.process(topic1, "B", "02");
- driver.flushState();
+ driver.pipeInput(recordFactory.create(topic1, "A", "02"));
+ driver.pipeInput(recordFactory.create(topic1, "B", "02"));
- proc.checkAndClearProcessResult("A:(2<-1)", "B:(2<-1)");
+ proc.checkAndClearProcessResult("A:(2<-1)", "B:(2<-1)");
- driver.process(topic1, "A", "03");
- driver.flushState();
+ driver.pipeInput(recordFactory.create(topic1, "A", "03"));
- proc.checkAndClearProcessResult("A:(3<-2)");
+ proc.checkAndClearProcessResult("A:(3<-2)");
- driver.process(topic1, "A", null);
- driver.flushState();
+ driver.pipeInput(recordFactory.create(topic1, "A", (String) null));
- proc.checkAndClearProcessResult("A:(null<-3)");
+ proc.checkAndClearProcessResult("A:(null<-3)");
+ }
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
index 504d841..80a60ab 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
@@ -16,22 +16,26 @@
*/
package org.apache.kafka.streams.kstream.internals;
-import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.TopologyTestDriverWrapper;
+import org.apache.kafka.streams.TopologyWrapper;
+import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
-import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.apache.kafka.test.MockProcessor;
import org.apache.kafka.test.MockProcessorSupplier;
-import org.apache.kafka.test.TestUtils;
-import org.junit.Before;
-import org.junit.Rule;
+import org.apache.kafka.test.StreamsTestUtils;
import org.junit.Test;
-import java.io.File;
+import java.util.Properties;
import static org.apache.kafka.test.StreamsTestUtils.getMetricByName;
import static org.hamcrest.CoreMatchers.hasItem;
@@ -42,17 +46,9 @@ import static org.junit.Assert.assertTrue;
public class KTableSourceTest {
- final private Serde<String> stringSerde = Serdes.String();
- private final Consumed<String, String> stringConsumed = Consumed.with(stringSerde, stringSerde);
- final private Serde<Integer> intSerde = Serdes.Integer();
- @Rule
- public final KStreamTestDriver driver = new KStreamTestDriver();
- private File stateDir = null;
-
- @Before
- public void setUp() {
- stateDir = TestUtils.tempDirectory("kafka-test");
- }
+ private final Consumed<String, String> stringConsumed = Consumed.with(Serdes.String(), Serdes.String());
+ private final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer());
+ private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.String(), Serdes.String());
@Test
public void testKTable() {
@@ -60,38 +56,38 @@ public class KTableSourceTest {
final String topic1 = "topic1";
- final KTable<String, Integer> table1 = builder.table(topic1, Consumed.with(stringSerde, intSerde));
+ final KTable<String, Integer> table1 = builder.table(topic1, Consumed.with(Serdes.String(), Serdes.Integer()));
final MockProcessorSupplier<String, Integer> supplier = new MockProcessorSupplier<>();
table1.toStream().process(supplier);
- driver.setUp(builder, stateDir);
- driver.process(topic1, "A", 1);
- driver.process(topic1, "B", 2);
- driver.process(topic1, "C", 3);
- driver.process(topic1, "D", 4);
- driver.flushState();
- driver.process(topic1, "A", null);
- driver.process(topic1, "B", null);
- driver.flushState();
+ final ConsumerRecordFactory<String, Integer> integerFactory = new ConsumerRecordFactory<>(new StringSerializer(), new IntegerSerializer());
+ try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+ driver.pipeInput(integerFactory.create(topic1, "A", 1));
+ driver.pipeInput(integerFactory.create(topic1, "B", 2));
+ driver.pipeInput(integerFactory.create(topic1, "C", 3));
+ driver.pipeInput(integerFactory.create(topic1, "D", 4));
+ driver.pipeInput(integerFactory.create(topic1, "A", null));
+ driver.pipeInput(integerFactory.create(topic1, "B", null));
+ }
assertEquals(Utils.mkList("A:1", "B:2", "C:3", "D:4", "A:null", "B:null"), supplier.theCapturedProcessor().processed);
}
@Test
public void kTableShouldLogAndMeterOnSkippedRecords() {
- final StreamsBuilder streamsBuilder = new StreamsBuilder();
+ final StreamsBuilder builder = new StreamsBuilder();
final String topic = "topic";
- streamsBuilder.table(topic, Consumed.with(stringSerde, intSerde));
+ builder.table(topic, stringConsumed);
final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
- driver.setUp(streamsBuilder, stateDir);
- driver.process(topic, null, "value");
- driver.flushState();
- LogCaptureAppender.unregister(appender);
+ try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
+ driver.pipeInput(recordFactory.create(topic, null, "value"));
+ LogCaptureAppender.unregister(appender);
- assertEquals(1.0, getMetricByName(driver.context().metrics().metrics(), "skipped-records-total", "stream-metrics").metricValue());
- assertThat(appender.getMessages(), hasItem("Skipping record due to null key. topic=[topic] partition=[-1] offset=[-1]"));
+ assertEquals(1.0, getMetricByName(driver.metrics(), "skipped-records-total", "stream-metrics").metricValue());
+ assertThat(appender.getMessages(), hasItem("Skipping record due to null key. topic=[topic] partition=[0] offset=[0]"));
+ }
}
@Test
@@ -102,39 +98,45 @@ public class KTableSourceTest {
final KTableImpl<String, String, String> table1 = (KTableImpl<String, String, String>) builder.table(topic1, stringConsumed);
+ final Topology topology = builder.build();
+
final KTableValueGetterSupplier<String, String> getterSupplier1 = table1.valueGetterSupplier();
- driver.setUp(builder, stateDir);
- final KTableValueGetter<String, String> getter1 = getterSupplier1.get();
- getter1.init(driver.context());
+ final InternalTopologyBuilder topologyBuilder = TopologyWrapper.getInternalTopologyBuilder(topology);
+ topologyBuilder.connectProcessorAndStateStores(table1.name, getterSupplier1.storeNames());
+
+ try (final TopologyTestDriverWrapper driver = new TopologyTestDriverWrapper(builder.build(), props)) {
+ final KTableValueGetter<String, String> getter1 = getterSupplier1.get();
+ getter1.init(driver.setCurrentNodeForProcessorContext(table1.name));
- driver.process(topic1, "A", "01");
- driver.process(topic1, "B", "01");
- driver.process(topic1, "C", "01");
+ driver.pipeInput(recordFactory.create(topic1, "A", "01"));
+ driver.pipeInput(recordFactory.create(topic1, "B", "01"));
+ driver.pipeInput(recordFactory.create(topic1, "C", "01"));
- assertEquals("01", getter1.get("A"));
- assertEquals("01", getter1.get("B"));
- assertEquals("01", getter1.get("C"));
+ assertEquals("01", getter1.get("A"));
+ assertEquals("01", getter1.get("B"));
+ assertEquals("01", getter1.get("C"));
- driver.process(topic1, "A", "02");
- driver.process(topic1, "B", "02");
+ driver.pipeInput(recordFactory.create(topic1, "A", "02"));
+ driver.pipeInput(recordFactory.create(topic1, "B", "02"));
- assertEquals("02", getter1.get("A"));
- assertEquals("02", getter1.get("B"));
- assertEquals("01", getter1.get("C"));
+ assertEquals("02", getter1.get("A"));
+ assertEquals("02", getter1.get("B"));
+ assertEquals("01", getter1.get("C"));
- driver.process(topic1, "A", "03");
+ driver.pipeInput(recordFactory.create(topic1, "A", "03"));
- assertEquals("03", getter1.get("A"));
- assertEquals("02", getter1.get("B"));
- assertEquals("01", getter1.get("C"));
+ assertEquals("03", getter1.get("A"));
+ assertEquals("02", getter1.get("B"));
+ assertEquals("01", getter1.get("C"));
- driver.process(topic1, "A", null);
- driver.process(topic1, "B", null);
+ driver.pipeInput(recordFactory.create(topic1, "A", (String) null));
+ driver.pipeInput(recordFactory.create(topic1, "B", (String) null));
- assertNull(getter1.get("A"));
- assertNull(getter1.get("B"));
- assertEquals("01", getter1.get("C"));
+ assertNull(getter1.get("A"));
+ assertNull(getter1.get("B"));
+ assertEquals("01", getter1.get("C"));
+ }
}
@@ -148,35 +150,32 @@ public class KTableSourceTest {
final MockProcessorSupplier<String, Integer> supplier = new MockProcessorSupplier<>();
- builder.build().addProcessor("proc1", supplier, table1.name);
+ final Topology topology = builder.build().addProcessor("proc1", supplier, table1.name);
- driver.setUp(builder, stateDir);
+ try (final TopologyTestDriver driver = new TopologyTestDriver(topology, props)) {
- final MockProcessor<String, Integer> proc1 = supplier.theCapturedProcessor();
+ final MockProcessor<String, Integer> proc1 = supplier.theCapturedProcessor();
- driver.process(topic1, "A", "01");
- driver.process(topic1, "B", "01");
- driver.process(topic1, "C", "01");
- driver.flushState();
+ driver.pipeInput(recordFactory.create(topic1, "A", "01"));
+ driver.pipeInput(recordFactory.create(topic1, "B", "01"));
+ driver.pipeInput(recordFactory.create(topic1, "C", "01"));
- proc1.checkAndClearProcessResult("A:(01<-null)", "B:(01<-null)", "C:(01<-null)");
+ proc1.checkAndClearProcessResult("A:(01<-null)", "B:(01<-null)", "C:(01<-null)");
- driver.process(topic1, "A", "02");
- driver.process(topic1, "B", "02");
- driver.flushState();
+ driver.pipeInput(recordFactory.create(topic1, "A", "02"));
+ driver.pipeInput(recordFactory.create(topic1, "B", "02"));
- proc1.checkAndClearProcessResult("A:(02<-null)", "B:(02<-null)");
+ proc1.checkAndClearProcessResult("A:(02<-null)", "B:(02<-null)");
- driver.process(topic1, "A", "03");
- driver.flushState();
+ driver.pipeInput(recordFactory.create(topic1, "A", "03"));
- proc1.checkAndClearProcessResult("A:(03<-null)");
+ proc1.checkAndClearProcessResult("A:(03<-null)");
- driver.process(topic1, "A", null);
- driver.process(topic1, "B", null);
- driver.flushState();
+ driver.pipeInput(recordFactory.create(topic1, "A", (String) null));
+ driver.pipeInput(recordFactory.create(topic1, "B", (String) null));
- proc1.checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)");
+ proc1.checkAndClearProcessResult("A:(null<-null)", "B:(null<-null)");
+ }
}
@Test
@@ -193,34 +192,31 @@ public class KTableSourceTest {
final MockProcessorSupplier<String, Integer> supplier = new MockProcessorSupplier<>();
- builder.build().addProcessor("proc1", supplier, table1.name);
+ final Topology topology = builder.build().addProcessor("proc1", supplier, table1.name);
- driver.setUp(builder, stateDir);
+ try (final TopologyTestDriver driver = new TopologyTestDriver(topology, props)) {
- final MockProcessor<String, Integer> proc1 = supplier.theCapturedProcessor();
+ final MockProcessor<String, Integer> proc1 = supplier.theCapturedProcessor();
- driver.process(topic1, "A", "01");
- driver.process(topic1, "B", "01");
- driver.process(topic1, "C", "01");
- driver.flushState();
+ driver.pipeInput(recordFactory.create(topic1, "A", "01"));
+ driver.pipeInput(recordFactory.create(topic1, "B", "01"));
+ driver.pipeInput(recordFactory.create(topic1, "C", "01"));
- proc1.checkAndClearProcessResult("A:(01<-null)", "B:(01<-null)", "C:(01<-null)");
+ proc1.checkAndClearProcessResult("A:(01<-null)", "B:(01<-null)", "C:(01<-null)");
- driver.process(topic1, "A", "02");
- driver.process(topic1, "B", "02");
- driver.flushState();
+ driver.pipeInput(recordFactory.create(topic1, "A", "02"));
+ driver.pipeInput(recordFactory.create(topic1, "B", "02"));
- proc1.checkAndClearProcessResult("A:(02<-01)", "B:(02<-01)");
+ proc1.checkAndClearProcessResult("A:(02<-01)", "B:(02<-01)");
- driver.process(topic1, "A", "03");
- driver.flushState();
+ driver.pipeInput(recordFactory.create(topic1, "A", "03"));
- proc1.checkAndClearProcessResult("A:(03<-02)");
+ proc1.checkAndClearProcessResult("A:(03<-02)");
- driver.process(topic1, "A", null);
- driver.process(topic1, "B", null);
- driver.flushState();
+ driver.pipeInput(recordFactory.create(topic1, "A", (String) null));
+ driver.pipeInput(recordFactory.create(topic1, "B", (String) null));
- proc1.checkAndClearProcessResult("A:(null<-03)", "B:(null<-02)");
+ proc1.checkAndClearProcessResult("A:(null<-03)", "B:(null<-02)");
+ }
}
}
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index 7f75265..155e54c 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -177,7 +177,7 @@ public class TopologyTestDriver implements Closeable {
private final static int PARTITION_ID = 0;
private final static TaskId TASK_ID = new TaskId(0, PARTITION_ID);
- private final StreamTask task;
+ final StreamTask task;
private final GlobalStateUpdateTask globalStateTask;
private final GlobalStateManager globalStateManager;
@@ -185,7 +185,8 @@ public class TopologyTestDriver implements Closeable {
private final StateDirectory stateDirectory;
private final Metrics metrics;
- private final ProcessorTopology processorTopology;
+ final ProcessorTopology processorTopology;
+ final ProcessorTopology globalTopology;
private final MockProducer<byte[], byte[]> producer;
@@ -240,7 +241,7 @@ public class TopologyTestDriver implements Closeable {
internalTopologyBuilder.setApplicationId(streamsConfig.getString(StreamsConfig.APPLICATION_ID_CONFIG));
processorTopology = internalTopologyBuilder.build(null);
- final ProcessorTopology globalTopology = internalTopologyBuilder.buildGlobalStateTopology();
+ globalTopology = internalTopologyBuilder.buildGlobalStateTopology();
final Serializer<byte[]> bytesSerializer = new ByteArraySerializer();
producer = new MockProducer<byte[], byte[]>(true, bytesSerializer, bytesSerializer) {
--
To stop receiving notification emails like this one, please contact
guozhang@apache.org.