You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ab...@apache.org on 2022/02/26 03:06:10 UTC

[kafka] branch trunk updated: KAFKA-13281: add API to expose current NamedTopology set (#11808)

This is an automated email from the ASF dual-hosted git repository.

ableegoldman pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 29317e6  KAFKA-13281: add API to expose current NamedTopology set (#11808)
29317e6 is described below

commit 29317e695394617918fe30007a136d0710800885
Author: Walker Carlson <18...@users.noreply.github.com>
AuthorDate: Fri Feb 25 21:04:07 2022 -0600

    KAFKA-13281: add API to expose current NamedTopology set (#11808)
    
    List all the named topologies that have been added to this client
    
    Reviewers: Anna Sophie Blee-Goldman <ab...@apache.org>
---
 .../kafka/streams/processor/internals/TopologyMetadata.java      | 9 +++++++++
 .../namedtopology/KafkaStreamsNamedTopologyWrapper.java          | 4 ++++
 2 files changed, 13 insertions(+)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
index dbcacf7..15e73ae 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
@@ -29,6 +29,7 @@ import org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMode;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.TopicsInfo;
+import org.apache.kafka.streams.processor.internals.namedtopology.NamedTopology;
 import org.apache.kafka.streams.processor.internals.namedtopology.TopologyConfig.TaskConfig;
 
 import java.util.ArrayList;
@@ -52,6 +53,7 @@ import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.Supplier;
 import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -551,6 +553,13 @@ public class TopologyMetadata {
         }
     }
 
+    /** @return the {@link NamedTopology} of each builder in {@code builders}, collected into a set */
+    public Collection<NamedTopology> getAllNamedTopologies() {
+        return builders.values().stream()
+            .map(InternalTopologyBuilder::namedTopology)
+            .collect(Collectors.toSet());
+    }
+
 
     /**
      * @return the InternalTopologyBuilder for the NamedTopology with the given {@code topologyName}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
index 7d174b9..a0ccedb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
@@ -149,6 +149,10 @@ public class KafkaStreamsNamedTopologyWrapper extends KafkaStreams {
         return Optional.ofNullable(topologyMetadata.lookupBuilderForNamedTopology(name)).map(InternalTopologyBuilder::namedTopology);
     }
 
+    public Collection<NamedTopology> getAllTopologies() {
+        return topologyMetadata.getAllNamedTopologies(); // plain delegation: result is passed through as-is
+    }
+
     /**
      * Add a new NamedTopology to a running Kafka Streams app. If multiple instances of the application are running,
      * you should inform all of them by calling {@code #addNamedTopology(NamedTopology)} on each client in order for