You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ab...@apache.org on 2022/01/25 05:06:14 UTC
[kafka] branch trunk updated: KAFKA-13590:rename InternalTopologyBuilder#topicGroups (#11704)
This is an automated email from the ASF dual-hosted git repository.
ableegoldman pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new d13d09f KAFKA-13590:rename InternalTopologyBuilder#topicGroups (#11704)
d13d09f is described below
commit d13d09fb682356a3013ca531c769e92a664657db
Author: Sayantanu Dey <41...@users.noreply.github.com>
AuthorDate: Tue Jan 25 10:33:37 2022 +0530
KAFKA-13590:rename InternalTopologyBuilder#topicGroups (#11704)
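Renamed the often confusing and opaque InternalTopologyBuilder#topicGroups API to #subtopologyToTopicsInfo, and renamed TopologyMetadata#topicGroups(Set) to #subtopologyTopicsInfoMapExcluding. Callers and tests were updated accordingly; there is no behavioral change.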
Reviewers: Anna Sophie Blee-Goldman <ab...@apache.org>
---
.../processor/internals/InternalTopologyBuilder.java | 2 +-
.../processor/internals/StreamsPartitionAssignor.java | 2 +-
.../streams/processor/internals/TopologyMetadata.java | 6 +++---
.../org/apache/kafka/streams/StreamsBuilderTest.java | 6 +++---
.../kstream/internals/KStreamKStreamJoinTest.java | 4 ++--
.../internals/InternalTopologyBuilderTest.java | 18 +++++++++---------
.../processor/internals/RepartitionTopicsTest.java | 14 +++++++-------
.../internals/StreamsPartitionAssignorTest.java | 6 +++---
8 files changed, 29 insertions(+), 29 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index 0911db0..1c6c430 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -1108,7 +1108,7 @@ public class InternalTopologyBuilder {
*
* @return groups of topic names
*/
- public synchronized Map<Subtopology, TopicsInfo> topicGroups() {
+ public synchronized Map<Subtopology, TopicsInfo> subtopologyToTopicsInfo() {
final Map<Subtopology, TopicsInfo> topicGroups = new LinkedHashMap<>();
if (nodeGroups == null) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
index 4c8b146..bcfa94b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
@@ -392,7 +392,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
// construct the assignment of tasks to clients
final Map<Subtopology, TopicsInfo> topicGroups =
- taskManager.topologyMetadata().topicGroups(missingUserInputTopicsPerTopology.keySet());
+ taskManager.topologyMetadata().subtopologyTopicsInfoMapExcluding(missingUserInputTopicsPerTopology.keySet());
final Set<String> allSourceTopics = new HashSet<>();
final Map<Subtopology, Set<String>> sourceTopicsByGroup = new HashMap<>();
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
index fb161ca..8faa1e4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
@@ -444,11 +444,11 @@ public class TopologyMetadata {
* @param topologiesToExclude the names of any topologies to exclude from the returned topic groups,
* eg because they have missing source topics and can't be processed yet
*/
- public Map<Subtopology, TopicsInfo> topicGroups(final Set<String> topologiesToExclude) {
+ public Map<Subtopology, TopicsInfo> subtopologyTopicsInfoMapExcluding(final Set<String> topologiesToExclude) {
final Map<Subtopology, TopicsInfo> topicGroups = new HashMap<>();
for (final InternalTopologyBuilder builder : builders.values()) {
if (!topologiesToExclude.contains(builder.topologyName())) {
- topicGroups.putAll(builder.topicGroups());
+ topicGroups.putAll(builder.subtopologyToTopicsInfo());
}
}
return topicGroups;
@@ -461,7 +461,7 @@ public class TopologyMetadata {
public Map<String, Collection<TopicsInfo>> topicGroupsByTopology() {
final Map<String, Collection<TopicsInfo>> topicGroups = new HashMap<>();
applyToEachBuilder(
- b -> topicGroups.put(getTopologyNameOrElseUnnamed(b.topologyName()), b.topicGroups().values())
+ b -> topicGroups.put(getTopologyNameOrElseUnnamed(b.topologyName()), b.subtopologyToTopicsInfo().values())
);
return topicGroups;
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
index 569b4f6..0113e56 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
@@ -463,7 +463,7 @@ public class StreamsBuilderTest {
internalTopologyBuilder.stateStores().get("store").loggingEnabled(),
equalTo(false));
assertThat(
- internalTopologyBuilder.topicGroups().get(SUBTOPOLOGY_0).nonSourceChangelogTopics().isEmpty(),
+ internalTopologyBuilder.subtopologyToTopicsInfo().get(SUBTOPOLOGY_0).nonSourceChangelogTopics().isEmpty(),
equalTo(true));
}
@@ -491,7 +491,7 @@ public class StreamsBuilderTest {
equalTo(true)
);
assertThat(
- internalTopologyBuilder.topicGroups().get(SUBTOPOLOGY_1).stateChangelogTopics.keySet(),
+ internalTopologyBuilder.subtopologyToTopicsInfo().get(SUBTOPOLOGY_1).stateChangelogTopics.keySet(),
equalTo(Collections.singleton("appId-store-changelog"))
);
}
@@ -514,7 +514,7 @@ public class StreamsBuilderTest {
internalTopologyBuilder.stateStores().get("store").loggingEnabled(),
equalTo(true));
assertThat(
- internalTopologyBuilder.topicGroups().get(SUBTOPOLOGY_0).stateChangelogTopics.keySet(),
+ internalTopologyBuilder.subtopologyToTopicsInfo().get(SUBTOPOLOGY_0).stateChangelogTopics.keySet(),
equalTo(Collections.singleton("appId-store-changelog")));
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
index 08b0e93..37c9e49 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
@@ -191,8 +191,8 @@ public class KStreamKStreamJoinTest {
assertThat(internalTopologyBuilder.stateStores().get("store-this-join-store").loggingEnabled(), equalTo(true));
assertThat(internalTopologyBuilder.stateStores().get("store-other-join-store").loggingEnabled(), equalTo(true));
- assertThat(internalTopologyBuilder.topicGroups().get(SUBTOPOLOGY_0).stateChangelogTopics.size(), equalTo(2));
- for (final InternalTopicConfig config : internalTopologyBuilder.topicGroups().get(SUBTOPOLOGY_0).stateChangelogTopics.values()) {
+ assertThat(internalTopologyBuilder.subtopologyToTopicsInfo().get(SUBTOPOLOGY_0).stateChangelogTopics.size(), equalTo(2));
+ for (final InternalTopicConfig config : internalTopologyBuilder.subtopologyToTopicsInfo().get(SUBTOPOLOGY_0).stateChangelogTopics.values()) {
assertThat(
config.getProperties(Collections.emptyMap(), 0).get("test"),
equalTo("property")
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
index 8d4cb46..31d69e7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
@@ -589,7 +589,7 @@ public class InternalTopologyBuilderTest {
builder.addProcessor("processor-3", new MockApiProcessorSupplier<>(), "source-3", "source-4");
- final Map<Subtopology, InternalTopologyBuilder.TopicsInfo> topicGroups = builder.topicGroups();
+ final Map<Subtopology, InternalTopologyBuilder.TopicsInfo> topicGroups = builder.subtopologyToTopicsInfo();
final Map<Subtopology, InternalTopologyBuilder.TopicsInfo> expectedTopicGroups = new HashMap<>();
expectedTopicGroups.put(SUBTOPOLOGY_0, new InternalTopologyBuilder.TopicsInfo(Collections.emptySet(), mkSet("topic-1", "X-topic-1x", "topic-2"), Collections.emptyMap(), Collections.emptyMap()));
@@ -626,7 +626,7 @@ public class InternalTopologyBuilderTest {
builder.connectProcessorAndStateStores("processor-5", "store-3");
builder.buildTopology();
- final Map<Subtopology, InternalTopologyBuilder.TopicsInfo> topicGroups = builder.topicGroups();
+ final Map<Subtopology, InternalTopologyBuilder.TopicsInfo> topicGroups = builder.subtopologyToTopicsInfo();
final Map<Subtopology, InternalTopologyBuilder.TopicsInfo> expectedTopicGroups = new HashMap<>();
final String store1 = ProcessorStateManager.storeChangelogTopic("X", "store-1", builder.topologyName());
@@ -856,7 +856,7 @@ public class InternalTopologyBuilderTest {
"processor"
);
builder.buildTopology();
- final Map<Subtopology, InternalTopologyBuilder.TopicsInfo> topicGroups = builder.topicGroups();
+ final Map<Subtopology, InternalTopologyBuilder.TopicsInfo> topicGroups = builder.subtopologyToTopicsInfo();
final InternalTopologyBuilder.TopicsInfo topicsInfo = topicGroups.values().iterator().next();
final InternalTopicConfig topicConfig1 = topicsInfo.stateChangelogTopics.get("appId-store1-changelog");
final Map<String, String> properties1 = topicConfig1.getProperties(Collections.emptyMap(), 10000);
@@ -881,7 +881,7 @@ public class InternalTopologyBuilderTest {
builder.addProcessor("processor", new MockApiProcessorSupplier<>(), "source");
builder.addStateStore(storeBuilder, "processor");
builder.buildTopology();
- final Map<Subtopology, InternalTopologyBuilder.TopicsInfo> topicGroups = builder.topicGroups();
+ final Map<Subtopology, InternalTopologyBuilder.TopicsInfo> topicGroups = builder.subtopologyToTopicsInfo();
final InternalTopologyBuilder.TopicsInfo topicsInfo = topicGroups.values().iterator().next();
final InternalTopicConfig topicConfig = topicsInfo.stateChangelogTopics.get("appId-testStore-changelog");
final Map<String, String> properties = topicConfig.getProperties(Collections.emptyMap(), 10000);
@@ -897,7 +897,7 @@ public class InternalTopologyBuilderTest {
builder.addInternalTopic("foo", InternalTopicProperties.empty());
builder.addSource(null, "source", null, null, null, "foo");
builder.buildTopology();
- final InternalTopologyBuilder.TopicsInfo topicsInfo = builder.topicGroups().values().iterator().next();
+ final InternalTopologyBuilder.TopicsInfo topicsInfo = builder.subtopologyToTopicsInfo().values().iterator().next();
final InternalTopicConfig topicConfig = topicsInfo.repartitionSourceTopics.get("appId-foo");
final Map<String, String> properties = topicConfig.getProperties(Collections.emptyMap(), 10000);
assertEquals(4, properties.size());
@@ -922,7 +922,7 @@ public class InternalTopologyBuilderTest {
builder.addSubscribedTopicsFromMetadata(updatedTopics, null);
builder.setApplicationId("test-id");
- final Map<Subtopology, InternalTopologyBuilder.TopicsInfo> topicGroups = builder.topicGroups();
+ final Map<Subtopology, InternalTopologyBuilder.TopicsInfo> topicGroups = builder.subtopologyToTopicsInfo();
assertTrue(topicGroups.get(SUBTOPOLOGY_0).sourceTopics.contains("topic-foo"));
assertTrue(topicGroups.get(SUBTOPOLOGY_1).sourceTopics.contains("topic-A"));
assertTrue(topicGroups.get(SUBTOPOLOGY_1).sourceTopics.contains("topic-B"));
@@ -1166,7 +1166,7 @@ public class InternalTopologyBuilderTest {
builder.addInternalTopic("topic-1z", new InternalTopicProperties(numberOfPartitions));
builder.addSource(null, "source-1", null, null, null, "topic-1z");
- final Map<Subtopology, InternalTopologyBuilder.TopicsInfo> topicGroups = builder.topicGroups();
+ final Map<Subtopology, InternalTopologyBuilder.TopicsInfo> topicGroups = builder.subtopologyToTopicsInfo();
final Map<String, InternalTopicConfig> repartitionSourceTopics = topicGroups.get(SUBTOPOLOGY_0).repartitionSourceTopics;
@@ -1187,7 +1187,7 @@ public class InternalTopologyBuilderTest {
builder.addInternalTopic("topic-1t", InternalTopicProperties.empty());
builder.addSource(null, "source-1", null, null, null, "topic-1t");
- final Map<Subtopology, InternalTopologyBuilder.TopicsInfo> topicGroups = builder.topicGroups();
+ final Map<Subtopology, InternalTopologyBuilder.TopicsInfo> topicGroups = builder.subtopologyToTopicsInfo();
final Map<String, InternalTopicConfig> repartitionSourceTopics = topicGroups.get(SUBTOPOLOGY_0).repartitionSourceTopics;
@@ -1206,7 +1206,7 @@ public class InternalTopologyBuilderTest {
builder.addInternalTopic("topic-1y", InternalTopicProperties.empty());
builder.addSource(null, "source-1", null, null, null, "topic-1y");
- final Map<Subtopology, InternalTopologyBuilder.TopicsInfo> topicGroups = builder.topicGroups();
+ final Map<Subtopology, InternalTopologyBuilder.TopicsInfo> topicGroups = builder.subtopologyToTopicsInfo();
final Map<String, InternalTopicConfig> repartitionSourceTopics = topicGroups.get(SUBTOPOLOGY_0).repartitionSourceTopics;
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionTopicsTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionTopicsTest.java
index 57bf60e..bacab10 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionTopicsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionTopicsTest.java
@@ -110,7 +110,7 @@ public class RepartitionTopicsTest {
@Test
public void shouldSetupRepartitionTopics() {
- expect(internalTopologyBuilder.topicGroups())
+ expect(internalTopologyBuilder.subtopologyToTopicsInfo())
.andReturn(mkMap(mkEntry(SUBTOPOLOGY_0, TOPICS_INFO1), mkEntry(SUBTOPOLOGY_1, TOPICS_INFO2)));
final Set<String> coPartitionGroup1 = mkSet(SOURCE_TOPIC_NAME1, SOURCE_TOPIC_NAME2);
final Set<String> coPartitionGroup2 = mkSet(REPARTITION_TOPIC_NAME1, REPARTITION_TOPIC_NAME2);
@@ -150,7 +150,7 @@ public class RepartitionTopicsTest {
@Test
public void shouldReturnMissingSourceTopics() {
final Set<String> missingSourceTopics = mkSet(SOURCE_TOPIC_NAME1);
- expect(internalTopologyBuilder.topicGroups())
+ expect(internalTopologyBuilder.subtopologyToTopicsInfo())
.andReturn(mkMap(mkEntry(SUBTOPOLOGY_0, TOPICS_INFO1), mkEntry(SUBTOPOLOGY_1, TOPICS_INFO2)));
expect(internalTopologyBuilder.copartitionGroups()).andReturn(Collections.emptyList());
copartitionedTopicsEnforcer.enforce(eq(Collections.emptySet()), anyObject(), eq(clusterMetadata));
@@ -180,7 +180,7 @@ public class RepartitionTopicsTest {
public void shouldThrowTaskAssignmentExceptionIfPartitionCountCannotBeComputedForAllRepartitionTopics() {
final RepartitionTopicConfig repartitionTopicConfigWithoutPartitionCount =
new RepartitionTopicConfig(REPARTITION_WITHOUT_PARTITION_COUNT, TOPIC_CONFIG5);
- expect(internalTopologyBuilder.topicGroups())
+ expect(internalTopologyBuilder.subtopologyToTopicsInfo())
.andReturn(mkMap(
mkEntry(SUBTOPOLOGY_0, TOPICS_INFO1),
mkEntry(SUBTOPOLOGY_1, setupTopicInfoWithRepartitionTopicWithoutPartitionCount(repartitionTopicConfigWithoutPartitionCount))
@@ -218,7 +218,7 @@ public class RepartitionTopicsTest {
),
Collections.emptyMap()
);
- expect(internalTopologyBuilder.topicGroups())
+ expect(internalTopologyBuilder.subtopologyToTopicsInfo())
.andReturn(mkMap(
mkEntry(SUBTOPOLOGY_0, topicsInfo),
mkEntry(SUBTOPOLOGY_1, setupTopicInfoWithRepartitionTopicWithoutPartitionCount(repartitionTopicConfigWithoutPartitionCount))
@@ -261,7 +261,7 @@ public class RepartitionTopicsTest {
),
Collections.emptyMap()
);
- expect(internalTopologyBuilder.topicGroups())
+ expect(internalTopologyBuilder.subtopologyToTopicsInfo())
.andReturn(mkMap(
mkEntry(SUBTOPOLOGY_0, topicsInfo),
mkEntry(SUBTOPOLOGY_1, setupTopicInfoWithRepartitionTopicWithoutPartitionCount(repartitionTopicConfigWithoutPartitionCount))
@@ -315,7 +315,7 @@ public class RepartitionTopicsTest {
),
Collections.emptyMap()
);
- expect(internalTopologyBuilder.topicGroups())
+ expect(internalTopologyBuilder.subtopologyToTopicsInfo())
.andReturn(mkMap(
mkEntry(SUBTOPOLOGY_0, topicsInfo),
mkEntry(SUBTOPOLOGY_1, setupTopicInfoWithRepartitionTopicWithoutPartitionCount(repartitionTopicConfigWithoutPartitionCount))
@@ -364,7 +364,7 @@ public class RepartitionTopicsTest {
Collections.emptyMap(),
Collections.emptyMap()
);
- expect(internalTopologyBuilder.topicGroups())
+ expect(internalTopologyBuilder.subtopologyToTopicsInfo())
.andReturn(mkMap(mkEntry(SUBTOPOLOGY_0, topicsInfo)));
setupCluster();
replay(internalTopicManager, internalTopologyBuilder, clusterMetadata);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
index 971b688..2bdc638 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
@@ -816,7 +816,7 @@ public class StreamsPartitionAssignorTest {
assertEquals(new HashSet<>(tasks), allTasks);
// check tasks for state topics
- final Map<Subtopology, InternalTopologyBuilder.TopicsInfo> topicGroups = builder.topicGroups();
+ final Map<Subtopology, InternalTopologyBuilder.TopicsInfo> topicGroups = builder.subtopologyToTopicsInfo();
assertEquals(mkSet(TASK_0_0, TASK_0_1, TASK_0_2), tasksForState("store1", tasks, topicGroups));
assertEquals(mkSet(TASK_1_0, TASK_1_1, TASK_1_2), tasksForState("store2", tasks, topicGroups));
@@ -2040,10 +2040,10 @@ public class StreamsPartitionAssignorTest {
private Map<Subtopology, TopicsInfo> corruptedTopicGroups;
@Override
- public synchronized Map<Subtopology, TopicsInfo> topicGroups() {
+ public synchronized Map<Subtopology, TopicsInfo> subtopologyToTopicsInfo() {
if (corruptedTopicGroups == null) {
corruptedTopicGroups = new HashMap<>();
- for (final Map.Entry<Subtopology, TopicsInfo> topicGroupEntry : super.topicGroups().entrySet()) {
+ for (final Map.Entry<Subtopology, TopicsInfo> topicGroupEntry : super.subtopologyToTopicsInfo().entrySet()) {
final TopicsInfo originalInfo = topicGroupEntry.getValue();
corruptedTopicGroups.put(
topicGroupEntry.getKey(),