You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ab...@apache.org on 2022/02/26 03:06:10 UTC
[kafka] branch trunk updated: KAFKA-13281: add API to expose current NamedTopology set (#11808)
This is an automated email from the ASF dual-hosted git repository.
ableegoldman pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 29317e6 KAFKA-13281: add API to expose current NamedTopology set (#11808)
29317e6 is described below
commit 29317e695394617918fe30007a136d0710800885
Author: Walker Carlson <18...@users.noreply.github.com>
AuthorDate: Fri Feb 25 21:04:07 2022 -0600
KAFKA-13281: add API to expose current NamedTopology set (#11808)
Adds an API that lists all the named topologies that have been added to this client.
Reviewers: Anna Sophie Blee-Goldman <ab...@apache.org>
---
.../kafka/streams/processor/internals/TopologyMetadata.java | 9 +++++++++
.../namedtopology/KafkaStreamsNamedTopologyWrapper.java | 4 ++++
2 files changed, 13 insertions(+)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
index dbcacf7..15e73ae 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
@@ -29,6 +29,7 @@ import org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMode;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.TopicsInfo;
+import org.apache.kafka.streams.processor.internals.namedtopology.NamedTopology;
import org.apache.kafka.streams.processor.internals.namedtopology.TopologyConfig.TaskConfig;
import java.util.ArrayList;
@@ -52,6 +53,7 @@ import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.regex.Pattern;
+import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -551,6 +553,13 @@ public class TopologyMetadata {
}
}
+ public Collection<NamedTopology> getAllNamedTopologies() { // returns the NamedTopology of every builder held in `builders`
+ return builders.values()
+ .stream()
+ .map(InternalTopologyBuilder::namedTopology) // one NamedTopology lookup per InternalTopologyBuilder
+ .collect(Collectors.toSet()); // gathered into a Set, so equal topologies appear once; exposed as a Collection
+ }
+
/**
* @return the InternalTopologyBuilder for the NamedTopology with the given {@code topologyName}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
index 7d174b9..a0ccedb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
@@ -149,6 +149,10 @@ public class KafkaStreamsNamedTopologyWrapper extends KafkaStreams {
return Optional.ofNullable(topologyMetadata.lookupBuilderForNamedTopology(name)).map(InternalTopologyBuilder::namedTopology);
}
+ public Collection<NamedTopology> getAllTopologies() { // public accessor for the full set of NamedTopology objects
+ return topologyMetadata.getAllNamedTopologies(); // pure delegation; the result is passed through as returned
+ }
+
/**
* Add a new NamedTopology to a running Kafka Streams app. If multiple instances of the application are running,
* you should inform all of them by calling {@code #addNamedTopology(NamedTopology)} on each client in order for