You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ab...@apache.org on 2022/01/25 05:06:14 UTC

[kafka] branch trunk updated: KAFKA-13590:rename InternalTopologyBuilder#topicGroups (#11704)

This is an automated email from the ASF dual-hosted git repository.

ableegoldman pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d13d09f  KAFKA-13590:rename InternalTopologyBuilder#topicGroups (#11704)
d13d09f is described below

commit d13d09fb682356a3013ca531c769e92a664657db
Author: Sayantanu Dey <41...@users.noreply.github.com>
AuthorDate: Tue Jan 25 10:33:37 2022 +0530

    KAFKA-13590:rename InternalTopologyBuilder#topicGroups (#11704)
    
    Renamed the often confusing and opaque #topicGroups API to #subtopologyToTopicsInfo
    
    Reviewers: Anna Sophie Blee-Goldman <ab...@apache.org>
---
 .../processor/internals/InternalTopologyBuilder.java   |  2 +-
 .../processor/internals/StreamsPartitionAssignor.java  |  2 +-
 .../streams/processor/internals/TopologyMetadata.java  |  6 +++---
 .../org/apache/kafka/streams/StreamsBuilderTest.java   |  6 +++---
 .../kstream/internals/KStreamKStreamJoinTest.java      |  4 ++--
 .../internals/InternalTopologyBuilderTest.java         | 18 +++++++++---------
 .../processor/internals/RepartitionTopicsTest.java     | 14 +++++++-------
 .../internals/StreamsPartitionAssignorTest.java        |  6 +++---
 8 files changed, 29 insertions(+), 29 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index 0911db0..1c6c430 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -1108,7 +1108,7 @@ public class InternalTopologyBuilder {
      *
      * @return groups of topic names
      */
-    public synchronized Map<Subtopology, TopicsInfo> topicGroups() {
+    public synchronized Map<Subtopology, TopicsInfo> subtopologyToTopicsInfo() {
         final Map<Subtopology, TopicsInfo> topicGroups = new LinkedHashMap<>();
 
         if (nodeGroups == null) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
index 4c8b146..bcfa94b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
@@ -392,7 +392,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
             // construct the assignment of tasks to clients
 
             final Map<Subtopology, TopicsInfo> topicGroups =
-                taskManager.topologyMetadata().topicGroups(missingUserInputTopicsPerTopology.keySet());
+                taskManager.topologyMetadata().subtopologyTopicsInfoMapExcluding(missingUserInputTopicsPerTopology.keySet());
 
             final Set<String> allSourceTopics = new HashSet<>();
             final Map<Subtopology, Set<String>> sourceTopicsByGroup = new HashMap<>();
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
index fb161ca..8faa1e4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
@@ -444,11 +444,11 @@ public class TopologyMetadata {
      * @param topologiesToExclude the names of any topologies to exclude from the returned topic groups,
      *                            eg because they have missing source topics and can't be processed yet
      */
-    public Map<Subtopology, TopicsInfo> topicGroups(final Set<String> topologiesToExclude) {
+    public Map<Subtopology, TopicsInfo> subtopologyTopicsInfoMapExcluding(final Set<String> topologiesToExclude) {
         final Map<Subtopology, TopicsInfo> topicGroups = new HashMap<>();
         for (final InternalTopologyBuilder builder : builders.values()) {
             if (!topologiesToExclude.contains(builder.topologyName())) {
-                topicGroups.putAll(builder.topicGroups());
+                topicGroups.putAll(builder.subtopologyToTopicsInfo());
             }
         }
         return topicGroups;
@@ -461,7 +461,7 @@ public class TopologyMetadata {
     public Map<String, Collection<TopicsInfo>> topicGroupsByTopology() {
         final Map<String, Collection<TopicsInfo>> topicGroups = new HashMap<>();
         applyToEachBuilder(
-            b -> topicGroups.put(getTopologyNameOrElseUnnamed(b.topologyName()), b.topicGroups().values())
+            b -> topicGroups.put(getTopologyNameOrElseUnnamed(b.topologyName()), b.subtopologyToTopicsInfo().values())
         );
         return topicGroups;
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
index 569b4f6..0113e56 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
@@ -463,7 +463,7 @@ public class StreamsBuilderTest {
             internalTopologyBuilder.stateStores().get("store").loggingEnabled(),
             equalTo(false));
         assertThat(
-            internalTopologyBuilder.topicGroups().get(SUBTOPOLOGY_0).nonSourceChangelogTopics().isEmpty(),
+            internalTopologyBuilder.subtopologyToTopicsInfo().get(SUBTOPOLOGY_0).nonSourceChangelogTopics().isEmpty(),
             equalTo(true));
     }
 
@@ -491,7 +491,7 @@ public class StreamsBuilderTest {
             equalTo(true)
         );
         assertThat(
-            internalTopologyBuilder.topicGroups().get(SUBTOPOLOGY_1).stateChangelogTopics.keySet(),
+            internalTopologyBuilder.subtopologyToTopicsInfo().get(SUBTOPOLOGY_1).stateChangelogTopics.keySet(),
             equalTo(Collections.singleton("appId-store-changelog"))
         );
     }
@@ -514,7 +514,7 @@ public class StreamsBuilderTest {
             internalTopologyBuilder.stateStores().get("store").loggingEnabled(),
             equalTo(true));
         assertThat(
-            internalTopologyBuilder.topicGroups().get(SUBTOPOLOGY_0).stateChangelogTopics.keySet(),
+            internalTopologyBuilder.subtopologyToTopicsInfo().get(SUBTOPOLOGY_0).stateChangelogTopics.keySet(),
             equalTo(Collections.singleton("appId-store-changelog")));
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
index 08b0e93..37c9e49 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
@@ -191,8 +191,8 @@ public class KStreamKStreamJoinTest {
 
         assertThat(internalTopologyBuilder.stateStores().get("store-this-join-store").loggingEnabled(), equalTo(true));
         assertThat(internalTopologyBuilder.stateStores().get("store-other-join-store").loggingEnabled(), equalTo(true));
-        assertThat(internalTopologyBuilder.topicGroups().get(SUBTOPOLOGY_0).stateChangelogTopics.size(), equalTo(2));
-        for (final InternalTopicConfig config : internalTopologyBuilder.topicGroups().get(SUBTOPOLOGY_0).stateChangelogTopics.values()) {
+        assertThat(internalTopologyBuilder.subtopologyToTopicsInfo().get(SUBTOPOLOGY_0).stateChangelogTopics.size(), equalTo(2));
+        for (final InternalTopicConfig config : internalTopologyBuilder.subtopologyToTopicsInfo().get(SUBTOPOLOGY_0).stateChangelogTopics.values()) {
             assertThat(
                 config.getProperties(Collections.emptyMap(), 0).get("test"),
                 equalTo("property")
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
index 8d4cb46..31d69e7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
@@ -589,7 +589,7 @@ public class InternalTopologyBuilderTest {
 
         builder.addProcessor("processor-3", new MockApiProcessorSupplier<>(), "source-3", "source-4");
 
-        final Map<Subtopology, InternalTopologyBuilder.TopicsInfo> topicGroups = builder.topicGroups();
+        final Map<Subtopology, InternalTopologyBuilder.TopicsInfo> topicGroups = builder.subtopologyToTopicsInfo();
 
         final Map<Subtopology, InternalTopologyBuilder.TopicsInfo> expectedTopicGroups = new HashMap<>();
         expectedTopicGroups.put(SUBTOPOLOGY_0, new InternalTopologyBuilder.TopicsInfo(Collections.emptySet(), mkSet("topic-1", "X-topic-1x", "topic-2"), Collections.emptyMap(), Collections.emptyMap()));
@@ -626,7 +626,7 @@ public class InternalTopologyBuilderTest {
         builder.connectProcessorAndStateStores("processor-5", "store-3");
         builder.buildTopology();
 
-        final Map<Subtopology, InternalTopologyBuilder.TopicsInfo> topicGroups = builder.topicGroups();
+        final Map<Subtopology, InternalTopologyBuilder.TopicsInfo> topicGroups = builder.subtopologyToTopicsInfo();
 
         final Map<Subtopology, InternalTopologyBuilder.TopicsInfo> expectedTopicGroups = new HashMap<>();
         final String store1 = ProcessorStateManager.storeChangelogTopic("X", "store-1", builder.topologyName());
@@ -856,7 +856,7 @@ public class InternalTopologyBuilderTest {
                 "processor"
         );
         builder.buildTopology();
-        final Map<Subtopology, InternalTopologyBuilder.TopicsInfo> topicGroups = builder.topicGroups();
+        final Map<Subtopology, InternalTopologyBuilder.TopicsInfo> topicGroups = builder.subtopologyToTopicsInfo();
         final InternalTopologyBuilder.TopicsInfo topicsInfo = topicGroups.values().iterator().next();
         final InternalTopicConfig topicConfig1 = topicsInfo.stateChangelogTopics.get("appId-store1-changelog");
         final Map<String, String> properties1 = topicConfig1.getProperties(Collections.emptyMap(), 10000);
@@ -881,7 +881,7 @@ public class InternalTopologyBuilderTest {
         builder.addProcessor("processor", new MockApiProcessorSupplier<>(), "source");
         builder.addStateStore(storeBuilder, "processor");
         builder.buildTopology();
-        final Map<Subtopology, InternalTopologyBuilder.TopicsInfo> topicGroups = builder.topicGroups();
+        final Map<Subtopology, InternalTopologyBuilder.TopicsInfo> topicGroups = builder.subtopologyToTopicsInfo();
         final InternalTopologyBuilder.TopicsInfo topicsInfo = topicGroups.values().iterator().next();
         final InternalTopicConfig topicConfig = topicsInfo.stateChangelogTopics.get("appId-testStore-changelog");
         final Map<String, String> properties = topicConfig.getProperties(Collections.emptyMap(), 10000);
@@ -897,7 +897,7 @@ public class InternalTopologyBuilderTest {
         builder.addInternalTopic("foo", InternalTopicProperties.empty());
         builder.addSource(null, "source", null, null, null, "foo");
         builder.buildTopology();
-        final InternalTopologyBuilder.TopicsInfo topicsInfo = builder.topicGroups().values().iterator().next();
+        final InternalTopologyBuilder.TopicsInfo topicsInfo = builder.subtopologyToTopicsInfo().values().iterator().next();
         final InternalTopicConfig topicConfig = topicsInfo.repartitionSourceTopics.get("appId-foo");
         final Map<String, String> properties = topicConfig.getProperties(Collections.emptyMap(), 10000);
         assertEquals(4, properties.size());
@@ -922,7 +922,7 @@ public class InternalTopologyBuilderTest {
         builder.addSubscribedTopicsFromMetadata(updatedTopics, null);
         builder.setApplicationId("test-id");
 
-        final Map<Subtopology, InternalTopologyBuilder.TopicsInfo> topicGroups = builder.topicGroups();
+        final Map<Subtopology, InternalTopologyBuilder.TopicsInfo> topicGroups = builder.subtopologyToTopicsInfo();
         assertTrue(topicGroups.get(SUBTOPOLOGY_0).sourceTopics.contains("topic-foo"));
         assertTrue(topicGroups.get(SUBTOPOLOGY_1).sourceTopics.contains("topic-A"));
         assertTrue(topicGroups.get(SUBTOPOLOGY_1).sourceTopics.contains("topic-B"));
@@ -1166,7 +1166,7 @@ public class InternalTopologyBuilderTest {
         builder.addInternalTopic("topic-1z", new InternalTopicProperties(numberOfPartitions));
         builder.addSource(null, "source-1", null, null, null, "topic-1z");
 
-        final Map<Subtopology, InternalTopologyBuilder.TopicsInfo> topicGroups = builder.topicGroups();
+        final Map<Subtopology, InternalTopologyBuilder.TopicsInfo> topicGroups = builder.subtopologyToTopicsInfo();
 
         final Map<String, InternalTopicConfig> repartitionSourceTopics = topicGroups.get(SUBTOPOLOGY_0).repartitionSourceTopics;
 
@@ -1187,7 +1187,7 @@ public class InternalTopologyBuilderTest {
         builder.addInternalTopic("topic-1t", InternalTopicProperties.empty());
         builder.addSource(null, "source-1", null, null, null, "topic-1t");
 
-        final Map<Subtopology, InternalTopologyBuilder.TopicsInfo> topicGroups = builder.topicGroups();
+        final Map<Subtopology, InternalTopologyBuilder.TopicsInfo> topicGroups = builder.subtopologyToTopicsInfo();
 
         final Map<String, InternalTopicConfig> repartitionSourceTopics = topicGroups.get(SUBTOPOLOGY_0).repartitionSourceTopics;
 
@@ -1206,7 +1206,7 @@ public class InternalTopologyBuilderTest {
         builder.addInternalTopic("topic-1y", InternalTopicProperties.empty());
         builder.addSource(null, "source-1", null, null, null, "topic-1y");
 
-        final Map<Subtopology, InternalTopologyBuilder.TopicsInfo> topicGroups = builder.topicGroups();
+        final Map<Subtopology, InternalTopologyBuilder.TopicsInfo> topicGroups = builder.subtopologyToTopicsInfo();
 
         final Map<String, InternalTopicConfig> repartitionSourceTopics = topicGroups.get(SUBTOPOLOGY_0).repartitionSourceTopics;
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionTopicsTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionTopicsTest.java
index 57bf60e..bacab10 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionTopicsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionTopicsTest.java
@@ -110,7 +110,7 @@ public class RepartitionTopicsTest {
 
     @Test
     public void shouldSetupRepartitionTopics() {
-        expect(internalTopologyBuilder.topicGroups())
+        expect(internalTopologyBuilder.subtopologyToTopicsInfo())
             .andReturn(mkMap(mkEntry(SUBTOPOLOGY_0, TOPICS_INFO1), mkEntry(SUBTOPOLOGY_1, TOPICS_INFO2)));
         final Set<String> coPartitionGroup1 = mkSet(SOURCE_TOPIC_NAME1, SOURCE_TOPIC_NAME2);
         final Set<String> coPartitionGroup2 = mkSet(REPARTITION_TOPIC_NAME1, REPARTITION_TOPIC_NAME2);
@@ -150,7 +150,7 @@ public class RepartitionTopicsTest {
     @Test
     public void shouldReturnMissingSourceTopics() {
         final Set<String> missingSourceTopics = mkSet(SOURCE_TOPIC_NAME1);
-        expect(internalTopologyBuilder.topicGroups())
+        expect(internalTopologyBuilder.subtopologyToTopicsInfo())
             .andReturn(mkMap(mkEntry(SUBTOPOLOGY_0, TOPICS_INFO1), mkEntry(SUBTOPOLOGY_1, TOPICS_INFO2)));
         expect(internalTopologyBuilder.copartitionGroups()).andReturn(Collections.emptyList());
         copartitionedTopicsEnforcer.enforce(eq(Collections.emptySet()), anyObject(), eq(clusterMetadata));
@@ -180,7 +180,7 @@ public class RepartitionTopicsTest {
     public void shouldThrowTaskAssignmentExceptionIfPartitionCountCannotBeComputedForAllRepartitionTopics() {
         final RepartitionTopicConfig repartitionTopicConfigWithoutPartitionCount =
             new RepartitionTopicConfig(REPARTITION_WITHOUT_PARTITION_COUNT, TOPIC_CONFIG5);
-        expect(internalTopologyBuilder.topicGroups())
+        expect(internalTopologyBuilder.subtopologyToTopicsInfo())
             .andReturn(mkMap(
                 mkEntry(SUBTOPOLOGY_0, TOPICS_INFO1),
                 mkEntry(SUBTOPOLOGY_1, setupTopicInfoWithRepartitionTopicWithoutPartitionCount(repartitionTopicConfigWithoutPartitionCount))
@@ -218,7 +218,7 @@ public class RepartitionTopicsTest {
             ),
             Collections.emptyMap()
         );
-        expect(internalTopologyBuilder.topicGroups())
+        expect(internalTopologyBuilder.subtopologyToTopicsInfo())
             .andReturn(mkMap(
                 mkEntry(SUBTOPOLOGY_0, topicsInfo),
                 mkEntry(SUBTOPOLOGY_1, setupTopicInfoWithRepartitionTopicWithoutPartitionCount(repartitionTopicConfigWithoutPartitionCount))
@@ -261,7 +261,7 @@ public class RepartitionTopicsTest {
             ),
             Collections.emptyMap()
         );
-        expect(internalTopologyBuilder.topicGroups())
+        expect(internalTopologyBuilder.subtopologyToTopicsInfo())
             .andReturn(mkMap(
                 mkEntry(SUBTOPOLOGY_0, topicsInfo),
                 mkEntry(SUBTOPOLOGY_1, setupTopicInfoWithRepartitionTopicWithoutPartitionCount(repartitionTopicConfigWithoutPartitionCount))
@@ -315,7 +315,7 @@ public class RepartitionTopicsTest {
             ),
             Collections.emptyMap()
         );
-        expect(internalTopologyBuilder.topicGroups())
+        expect(internalTopologyBuilder.subtopologyToTopicsInfo())
             .andReturn(mkMap(
                 mkEntry(SUBTOPOLOGY_0, topicsInfo),
                 mkEntry(SUBTOPOLOGY_1, setupTopicInfoWithRepartitionTopicWithoutPartitionCount(repartitionTopicConfigWithoutPartitionCount))
@@ -364,7 +364,7 @@ public class RepartitionTopicsTest {
             Collections.emptyMap(),
             Collections.emptyMap()
         );
-        expect(internalTopologyBuilder.topicGroups())
+        expect(internalTopologyBuilder.subtopologyToTopicsInfo())
             .andReturn(mkMap(mkEntry(SUBTOPOLOGY_0, topicsInfo)));
         setupCluster();
         replay(internalTopicManager, internalTopologyBuilder, clusterMetadata);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
index 971b688..2bdc638 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
@@ -816,7 +816,7 @@ public class StreamsPartitionAssignorTest {
         assertEquals(new HashSet<>(tasks), allTasks);
 
         // check tasks for state topics
-        final Map<Subtopology, InternalTopologyBuilder.TopicsInfo> topicGroups = builder.topicGroups();
+        final Map<Subtopology, InternalTopologyBuilder.TopicsInfo> topicGroups = builder.subtopologyToTopicsInfo();
 
         assertEquals(mkSet(TASK_0_0, TASK_0_1, TASK_0_2), tasksForState("store1", tasks, topicGroups));
         assertEquals(mkSet(TASK_1_0, TASK_1_1, TASK_1_2), tasksForState("store2", tasks, topicGroups));
@@ -2040,10 +2040,10 @@ public class StreamsPartitionAssignorTest {
         private Map<Subtopology, TopicsInfo> corruptedTopicGroups;
 
         @Override
-        public synchronized Map<Subtopology, TopicsInfo> topicGroups() {
+        public synchronized Map<Subtopology, TopicsInfo> subtopologyToTopicsInfo() {
             if (corruptedTopicGroups == null) {
                 corruptedTopicGroups = new HashMap<>();
-                for (final Map.Entry<Subtopology, TopicsInfo> topicGroupEntry : super.topicGroups().entrySet()) {
+                for (final Map.Entry<Subtopology, TopicsInfo> topicGroupEntry : super.subtopologyToTopicsInfo().entrySet()) {
                     final TopicsInfo originalInfo = topicGroupEntry.getValue();
                     corruptedTopicGroups.put(
                         topicGroupEntry.getKey(),