You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2016/08/16 06:03:23 UTC
kafka git commit: MINOR: Add application id prefix for copartitionGroups in TopologyBuilder
Repository: kafka
Updated Branches:
refs/heads/0.10.0 55af7ec6b -> 801a70612
MINOR: Add application id prefix for copartitionGroups in TopologyBuilder
This is a bugfix that is already in trunk but has not been backported to 0.10.0.
Author: Guozhang Wang <wa...@gmail.com>
Reviewers: Damian Guy <da...@gmail.com>, Ewen Cheslack-Postava <ew...@confluent.io>
Closes #1735 from guozhangwang/Kminor-topology-applicationID-0.10.0
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/801a7061
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/801a7061
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/801a7061
Branch: refs/heads/0.10.0
Commit: 801a706124af16f605abc6141f38f9eed916ffc2
Parents: 55af7ec
Author: Guozhang Wang <wa...@gmail.com>
Authored: Mon Aug 15 23:04:40 2016 -0700
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Mon Aug 15 23:04:40 2016 -0700
----------------------------------------------------------------------
.../org/apache/kafka/streams/KafkaStreams.java | 2 +
.../streams/processor/TopologyBuilder.java | 79 ++++++++++++++------
.../internals/StreamPartitionAssignor.java | 2 +-
.../processor/internals/StreamThread.java | 6 +-
.../kstream/internals/KStreamImplTest.java | 2 +-
.../streams/processor/TopologyBuilderTest.java | 25 ++++---
.../internals/ProcessorTopologyTest.java | 2 +-
.../internals/StreamPartitionAssignorTest.java | 12 +--
.../processor/internals/StreamThreadTest.java | 6 +-
.../apache/kafka/test/KStreamTestDriver.java | 2 +-
.../kafka/test/ProcessorTopologyTestDriver.java | 2 +-
11 files changed, 91 insertions(+), 49 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/801a7061/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 17c760e..3a311a8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -141,6 +141,8 @@ public class KafkaStreams {
// The application ID is a required config and hence should always have value
final String applicationId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
+ builder.setApplicationId(applicationId);
+
String clientId = config.getString(StreamsConfig.CLIENT_ID_CONFIG);
if (clientId.length() <= 0)
clientId = applicationId + "-" + STREAM_CLIENT_ID_SEQUENCE.getAndIncrement();
http://git-wip-us.apache.org/repos/asf/kafka/blob/801a7061/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
index 7161a80..6b57b17 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
@@ -38,6 +38,7 @@ import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
/**
@@ -64,6 +65,7 @@ public class TopologyBuilder {
private final HashMap<String, String[]> nodeToSourceTopics = new HashMap<>();
private final HashMap<String, String> nodeToSinkTopic = new HashMap<>();
private Map<Integer, Set<String>> nodeGroups = null;
+ private String applicationId = null;
private static class StateStoreFactory {
public final Set<String> users;
@@ -85,7 +87,7 @@ public class TopologyBuilder {
this.name = name;
}
- public abstract ProcessorNode build(String applicationId);
+ public abstract ProcessorNode build();
}
private static class ProcessorNodeFactory extends NodeFactory {
@@ -105,7 +107,7 @@ public class TopologyBuilder {
@SuppressWarnings("unchecked")
@Override
- public ProcessorNode build(String applicationId) {
+ public ProcessorNode build() {
return new ProcessorNode(name, supplier.get(), stateStoreNames);
}
}
@@ -124,7 +126,7 @@ public class TopologyBuilder {
@SuppressWarnings("unchecked")
@Override
- public ProcessorNode build(String applicationId) {
+ public ProcessorNode build() {
return new SourceNode(name, keyDeserializer, valDeserializer);
}
}
@@ -147,10 +149,10 @@ public class TopologyBuilder {
@SuppressWarnings("unchecked")
@Override
- public ProcessorNode build(String applicationId) {
+ public ProcessorNode build() {
if (internalTopicNames.contains(topic)) {
// prefix the internal topic name with the application id
- return new SinkNode(name, applicationId + "-" + topic, keySerializer, valSerializer, partitioner);
+ return new SinkNode(name, decorateTopic(topic), keySerializer, valSerializer, partitioner);
} else {
return new SinkNode(name, topic, keySerializer, valSerializer, partitioner);
}
@@ -193,6 +195,22 @@ public class TopologyBuilder {
public TopologyBuilder() {}
/**
+ * Set the applicationId to be used for auto-generated internal topics.
+ *
+ * This is required before calling {@link #sourceTopics}, {@link #topicGroups},
+ * {@link #copartitionSources} and {@link #build(Integer)}.
+ *
+ * @param applicationId the streams applicationId. Should be the same as set by
+ * {@link org.apache.kafka.streams.StreamsConfig#APPLICATION_ID_CONFIG}
+ */
+ public synchronized final TopologyBuilder setApplicationId(String applicationId) {
+ Objects.requireNonNull(applicationId, "applicationId can't be null");
+ this.applicationId = applicationId;
+
+ return this;
+ }
+
+ /**
* Add a new source that consumes the named topics and forwards the records to child processor and/or sink nodes.
* The source will use the {@link org.apache.kafka.streams.StreamsConfig#KEY_SERDE_CLASS_CONFIG default key deserializer} and
* {@link org.apache.kafka.streams.StreamsConfig#VALUE_SERDE_CLASS_CONFIG default value deserializer} specified in the
@@ -501,7 +519,7 @@ public class TopologyBuilder {
*
* @return groups of topic names
*/
- public synchronized Map<Integer, TopicsInfo> topicGroups(String applicationId) {
+ public synchronized Map<Integer, TopicsInfo> topicGroups() {
Map<Integer, TopicsInfo> topicGroups = new HashMap<>();
if (nodeGroups == null)
@@ -520,7 +538,7 @@ public class TopologyBuilder {
for (String topic : topics) {
if (this.internalTopicNames.contains(topic)) {
// prefix the internal topic name with the application id
- String internalTopic = applicationId + "-" + topic;
+ String internalTopic = decorateTopic(topic);
internalSourceTopics.add(internalTopic);
sourceTopics.add(internalTopic);
} else {
@@ -534,7 +552,7 @@ public class TopologyBuilder {
if (topic != null) {
if (internalTopicNames.contains(topic)) {
// prefix the change log topic name with the application id
- sinkTopics.add(applicationId + "-" + topic);
+ sinkTopics.add(decorateTopic(topic));
} else {
sinkTopics.add(topic);
}
@@ -544,7 +562,7 @@ public class TopologyBuilder {
for (StateStoreFactory stateFactory : stateFactories.values()) {
if (stateFactory.isInternal && stateFactory.users.contains(node)) {
// prefix the change log topic name with the application id
- stateChangelogTopics.add(applicationId + "-" + stateFactory.supplier.name() + ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX);
+ stateChangelogTopics.add(ProcessorStateManager.storeChangelogTopic(applicationId, stateFactory.supplier.name()));
}
}
}
@@ -629,7 +647,7 @@ public class TopologyBuilder {
for (String node : nodeNames) {
String[] topics = nodeToSourceTopics.get(node);
if (topics != null)
- copartitionGroup.addAll(Arrays.asList(topics));
+ copartitionGroup.addAll(maybeDecorateInternalSourceTopics(topics));
}
list.add(Collections.unmodifiableSet(copartitionGroup));
}
@@ -642,7 +660,7 @@ public class TopologyBuilder {
*
* @see org.apache.kafka.streams.KafkaStreams#KafkaStreams(TopologyBuilder, org.apache.kafka.streams.StreamsConfig)
*/
- public synchronized ProcessorTopology build(String applicationId, Integer topicGroupId) {
+ public synchronized ProcessorTopology build(Integer topicGroupId) {
Set<String> nodeGroup;
if (topicGroupId != null) {
nodeGroup = nodeGroups().get(topicGroupId);
@@ -650,11 +668,11 @@ public class TopologyBuilder {
// when nodeGroup is null, we build the full topology. this is used in some tests.
nodeGroup = null;
}
- return build(applicationId, nodeGroup);
+ return build(nodeGroup);
}
@SuppressWarnings("unchecked")
- private ProcessorTopology build(String applicationId, Set<String> nodeGroup) {
+ private ProcessorTopology build(Set<String> nodeGroup) {
List<ProcessorNode> processorNodes = new ArrayList<>(nodeFactories.size());
Map<String, ProcessorNode> processorMap = new HashMap<>();
Map<String, SourceNode> topicSourceMap = new HashMap<>();
@@ -663,7 +681,7 @@ public class TopologyBuilder {
// create processor nodes in a topological order ("nodeFactories" is already topologically sorted)
for (NodeFactory factory : nodeFactories.values()) {
if (nodeGroup == null || nodeGroup.contains(factory.name)) {
- ProcessorNode node = factory.build(applicationId);
+ ProcessorNode node = factory.build();
processorNodes.add(node);
processorMap.put(node.name(), node);
@@ -680,7 +698,7 @@ public class TopologyBuilder {
for (String topic : ((SourceNodeFactory) factory).topics) {
if (internalTopicNames.contains(topic)) {
// prefix the internal topic name with the application id
- topicSourceMap.put(applicationId + "-" + topic, (SourceNode) node);
+ topicSourceMap.put(decorateTopic(topic), (SourceNode) node);
} else {
topicSourceMap.put(topic, (SourceNode) node);
}
@@ -702,15 +720,34 @@ public class TopologyBuilder {
* Get the names of topics that are to be consumed by the source nodes created by this builder.
* @return the unmodifiable set of topic names used by source nodes, which changes as new sources are added; never null
*/
- public synchronized Set<String> sourceTopics(String applicationId) {
- Set<String> topics = new HashSet<>();
- for (String topic : sourceTopicNames) {
+ public synchronized Set<String> sourceTopics() {
+ Set<String> topics = maybeDecorateInternalSourceTopics(sourceTopicNames);
+ return Collections.unmodifiableSet(topics);
+ }
+
+ private Set<String> maybeDecorateInternalSourceTopics(final Set<String> sourceTopics) {
+ return maybeDecorateInternalSourceTopics(sourceTopics.toArray(new String[sourceTopics.size()]));
+ }
+
+ private Set<String> maybeDecorateInternalSourceTopics(String ... sourceTopics) {
+ final Set<String> decoratedTopics = new HashSet<>();
+ for (String topic : sourceTopics) {
if (internalTopicNames.contains(topic)) {
- topics.add(applicationId + "-" + topic);
+ decoratedTopics.add(decorateTopic(topic));
} else {
- topics.add(topic);
+ decoratedTopics.add(topic);
}
}
- return Collections.unmodifiableSet(topics);
+ return decoratedTopics;
+ }
+
+ private String decorateTopic(String topic) {
+ if (applicationId == null) {
+ throw new TopologyBuilderException("there are internal topics and "
+ + "applicationId hasn't been set. Call "
+ + "setApplicationId first");
+ }
+
+ return applicationId + "-" + topic;
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/801a7061/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
index 085ff94..2ddfe43 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
@@ -118,7 +118,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
streamThread = (StreamThread) o;
streamThread.partitionAssignor(this);
- this.topicGroups = streamThread.builder.topicGroups(streamThread.applicationId);
+ this.topicGroups = streamThread.builder.topicGroups();
if (configs.containsKey(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG)) {
internalTopicManager = new InternalTopicManager(
http://git-wip-us.apache.org/repos/asf/kafka/blob/801a7061/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 72eeef5..bf88d1b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -159,7 +159,7 @@ public class StreamThread extends Thread {
this.applicationId = applicationId;
this.config = config;
this.builder = builder;
- this.sourceTopics = builder.sourceTopics(applicationId);
+ this.sourceTopics = builder.sourceTopics();
this.clientId = clientId;
this.processId = processId;
this.partitionGrouper = config.getConfiguredInstance(StreamsConfig.PARTITION_GROUPER_CLASS_CONFIG, PartitionGrouper.class);
@@ -545,7 +545,7 @@ public class StreamThread extends Thread {
protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitions) {
sensors.taskCreationSensor.record();
- ProcessorTopology topology = builder.build(applicationId, id.topicGroupId);
+ ProcessorTopology topology = builder.build(id.topicGroupId);
return new StreamTask(id, applicationId, partitions, topology, consumer, producer, restoreConsumer, config, sensors);
}
@@ -615,7 +615,7 @@ public class StreamThread extends Thread {
protected StandbyTask createStandbyTask(TaskId id, Collection<TopicPartition> partitions) {
sensors.taskCreationSensor.record();
- ProcessorTopology topology = builder.build(applicationId, id.topicGroupId);
+ ProcessorTopology topology = builder.build(id.topicGroupId);
if (!topology.stateStoreSuppliers().isEmpty()) {
return new StandbyTask(id, applicationId, partitions, topology, consumer, restoreConsumer, config, sensors);
http://git-wip-us.apache.org/repos/asf/kafka/blob/801a7061/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
index a40c8fb..ff16a79 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
@@ -132,7 +132,7 @@ public class KStreamImplTest {
1 + // to
2 + // through
1, // process
- builder.build("X", null).processors().size());
+ builder.build(null).processors().size());
}
@Test
http://git-wip-us.apache.org/repos/asf/kafka/blob/801a7061/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
index 9af313a..a67b4a9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
@@ -144,12 +144,12 @@ public class TopologyBuilderTest {
builder.addSource("source-3", "topic-3");
builder.addInternalTopic("topic-3");
- Set<String> expected = new HashSet<String>();
+ Set<String> expected = new HashSet<>();
expected.add("topic-1");
expected.add("topic-2");
expected.add("X-topic-3");
- assertEquals(expected, builder.sourceTopics("X"));
+ assertEquals(expected, builder.setApplicationId("X").sourceTopics());
}
@Test(expected = TopologyBuilderException.class)
@@ -190,21 +190,22 @@ public class TopologyBuilderTest {
StateStoreSupplier supplier = new MockStateStoreSupplier("store-1", false);
builder.addStateStore(supplier);
- suppliers = builder.build("X", null).stateStoreSuppliers();
+ suppliers = builder.build(null).stateStoreSuppliers();
assertEquals(0, suppliers.size());
builder.addSource("source-1", "topic-1");
builder.addProcessor("processor-1", new MockProcessorSupplier(), "source-1");
builder.connectProcessorAndStateStores("processor-1", "store-1");
- suppliers = builder.build("X", null).stateStoreSuppliers();
+ suppliers = builder.build(null).stateStoreSuppliers();
assertEquals(1, suppliers.size());
assertEquals(supplier.name(), suppliers.get(0).name());
}
@Test
public void testTopicGroups() {
- final TopologyBuilder builder = new TopologyBuilder();
+ final TopologyBuilder builder = new TopologyBuilder().setApplicationId("X");
+ builder.addInternalTopic("topic-1x");
builder.addSource("source-1", "topic-1", "topic-1x");
builder.addSource("source-2", "topic-2");
builder.addSource("source-3", "topic-3");
@@ -218,10 +219,10 @@ public class TopologyBuilderTest {
builder.addProcessor("processor-3", new MockProcessorSupplier(), "source-3", "source-4");
- Map<Integer, TopicsInfo> topicGroups = builder.topicGroups("X");
+ Map<Integer, TopicsInfo> topicGroups = builder.topicGroups();
Map<Integer, TopicsInfo> expectedTopicGroups = new HashMap<>();
- expectedTopicGroups.put(0, new TopicsInfo(Collections.<String>emptySet(), mkSet("topic-1", "topic-1x", "topic-2"), Collections.<String>emptySet(), Collections.<String>emptySet()));
+ expectedTopicGroups.put(0, new TopicsInfo(Collections.<String>emptySet(), mkSet("topic-1", "X-topic-1x", "topic-2"), Collections.<String>emptySet(), Collections.<String>emptySet()));
expectedTopicGroups.put(1, new TopicsInfo(Collections.<String>emptySet(), mkSet("topic-3", "topic-4"), Collections.<String>emptySet(), Collections.<String>emptySet()));
expectedTopicGroups.put(2, new TopicsInfo(Collections.<String>emptySet(), mkSet("topic-5"), Collections.<String>emptySet(), Collections.<String>emptySet()));
@@ -230,7 +231,7 @@ public class TopologyBuilderTest {
Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
- assertEquals(mkSet(mkSet("topic-1", "topic-1x", "topic-2")), new HashSet<>(copartitionGroups));
+ assertEquals(mkSet(mkSet("topic-1", "X-topic-1x", "topic-2")), new HashSet<>(copartitionGroups));
}
@Test
@@ -256,7 +257,7 @@ public class TopologyBuilderTest {
builder.addStateStore(supplier);
builder.connectProcessorAndStateStores("processor-5", "store-3");
- Map<Integer, TopicsInfo> topicGroups = builder.topicGroups("X");
+ Map<Integer, TopicsInfo> topicGroups = builder.setApplicationId("X").topicGroups();
Map<Integer, TopicsInfo> expectedTopicGroups = new HashMap<>();
expectedTopicGroups.put(0, new TopicsInfo(Collections.<String>emptySet(), mkSet("topic-1", "topic-1x", "topic-2"), Collections.<String>emptySet(), mkSet(ProcessorStateManager.storeChangelogTopic("X", "store-1"))));
@@ -281,9 +282,9 @@ public class TopologyBuilderTest {
builder.addProcessor("processor-2", new MockProcessorSupplier(), "source-2", "processor-1");
builder.addProcessor("processor-3", new MockProcessorSupplier(), "source-3", "source-4");
- ProcessorTopology topology0 = builder.build("X", 0);
- ProcessorTopology topology1 = builder.build("X", 1);
- ProcessorTopology topology2 = builder.build("X", 2);
+ ProcessorTopology topology0 = builder.build(0);
+ ProcessorTopology topology1 = builder.build(1);
+ ProcessorTopology topology2 = builder.build(2);
assertEquals(mkSet("source-1", "source-2", "processor-1", "processor-2"), nodeNames(topology0.processors()));
assertEquals(mkSet("source-3", "source-4", "processor-3"), nodeNames(topology1.processors()));
http://git-wip-us.apache.org/repos/asf/kafka/blob/801a7061/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
index 62b283a..382e853 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
@@ -96,7 +96,7 @@ public class ProcessorTopologyTest {
builder.addSink("sink-1", "topic-3", "processor-1");
builder.addSink("sink-2", "topic-4", "processor-1", "processor-2");
- final ProcessorTopology topology = builder.build("X", null);
+ final ProcessorTopology topology = builder.build(null);
assertEquals(6, topology.processors().size());
http://git-wip-us.apache.org/repos/asf/kafka/blob/801a7061/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
index 17bda54..f743631 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
@@ -269,8 +269,9 @@ public class StreamPartitionAssignorTest {
@Test
public void testAssignWithStates() throws Exception {
StreamsConfig config = new StreamsConfig(configProps());
-
+ String applicationId = "test";
TopologyBuilder builder = new TopologyBuilder();
+ builder.setApplicationId(applicationId);
builder.addSource("source1", "topic1");
builder.addSource("source2", "topic2");
@@ -295,10 +296,10 @@ public class StreamPartitionAssignorTest {
UUID uuid2 = UUID.randomUUID();
String client1 = "client1";
- StreamThread thread10 = new StreamThread(builder, config, new MockClientSupplier(), "test", client1, uuid1, new Metrics(), new SystemTime());
+ StreamThread thread10 = new StreamThread(builder, config, new MockClientSupplier(), applicationId, client1, uuid1, new Metrics(), new SystemTime());
StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
- partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1));
+ partitionAssignor.configure(config.getConsumerConfigs(thread10, applicationId, client1));
Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
subscriptions.put("consumer10",
@@ -474,6 +475,7 @@ public class StreamPartitionAssignorTest {
@Test
public void testAssignWithInternalTopics() throws Exception {
StreamsConfig config = new StreamsConfig(configProps());
+ String applicationId = "test";
TopologyBuilder builder = new TopologyBuilder();
builder.addInternalTopic("topicX");
@@ -489,10 +491,10 @@ public class StreamPartitionAssignorTest {
String client1 = "client1";
MockClientSupplier clientSupplier = new MockClientSupplier();
- StreamThread thread10 = new StreamThread(builder, config, clientSupplier, "test", client1, uuid1, new Metrics(), new SystemTime());
+ StreamThread thread10 = new StreamThread(builder.setApplicationId(applicationId), config, clientSupplier, applicationId, client1, uuid1, new Metrics(), new SystemTime());
StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
- partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1));
+ partitionAssignor.configure(config.getConsumerConfigs(thread10, applicationId, client1));
MockInternalTopicManager internalTopicManager = new MockInternalTopicManager(clientSupplier.restoreConsumer);
partitionAssignor.setInternalTopicManager(internalTopicManager);
http://git-wip-us.apache.org/repos/asf/kafka/blob/801a7061/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 4ae31e4..b6a6bff 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -160,7 +160,7 @@ public class StreamThreadTest {
StreamThread thread = new StreamThread(builder, config, new MockClientSupplier(), applicationId, clientId, processId, new Metrics(), new SystemTime()) {
@Override
protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitionsForTask) {
- ProcessorTopology topology = builder.build("X", id.topicGroupId);
+ ProcessorTopology topology = builder.build(id.topicGroupId);
return new TestStreamTask(id, applicationId, partitionsForTask, topology, consumer, producer, restoreConsumer, config);
}
};
@@ -284,7 +284,7 @@ public class StreamThreadTest {
@Override
protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitionsForTask) {
- ProcessorTopology topology = builder.build("X", id.topicGroupId);
+ ProcessorTopology topology = builder.build(id.topicGroupId);
return new TestStreamTask(id, applicationId, partitionsForTask, topology, consumer, producer, restoreConsumer, config);
}
};
@@ -403,7 +403,7 @@ public class StreamThreadTest {
@Override
protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitionsForTask) {
- ProcessorTopology topology = builder.build("X", id.topicGroupId);
+ ProcessorTopology topology = builder.build(id.topicGroupId);
return new TestStreamTask(id, applicationId, partitionsForTask, topology, consumer, producer, restoreConsumer, config);
}
};
http://git-wip-us.apache.org/repos/asf/kafka/blob/801a7061/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
index 7316804..dfa7f5d 100644
--- a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
@@ -57,7 +57,7 @@ public class KStreamTestDriver {
File stateDir,
Serde<?> keySerde,
Serde<?> valSerde) {
- this.topology = builder.build("X", null);
+ this.topology = builder.setApplicationId("KStreamTestDriver").build(null);
this.stateDir = stateDir;
this.context = new MockProcessorContext(this, stateDir, keySerde, valSerde, new MockRecordCollector());
this.context.setTime(0L);
http://git-wip-us.apache.org/repos/asf/kafka/blob/801a7061/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
index 4ddbc2a..5188f34 100644
--- a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
@@ -147,7 +147,7 @@ public class ProcessorTopologyTestDriver {
*/
public ProcessorTopologyTestDriver(StreamsConfig config, TopologyBuilder builder, String... storeNames) {
id = new TaskId(0, 0);
- topology = builder.build("X", null);
+ topology = builder.build(null);
// Set up the consumer and producer ...
consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);