Posted to jira@kafka.apache.org by "bbejeck (via GitHub)" <gi...@apache.org> on 2023/05/25 15:36:12 UTC

[GitHub] [kafka] bbejeck commented on a diff in pull request #13751: KAFKA-14539: Simplify StreamsMetadataState by replacing the Cluster metadata with partition info map

bbejeck commented on code in PR #13751:
URL: https://github.com/apache/kafka/pull/13751#discussion_r1205696101


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java:
##########
@@ -558,7 +564,7 @@ private SourceTopicsInfo getSourceTopicsInfo(final String storeName, final Strin
     }
 
     private boolean isInitialized() {
-        return clusterMetadata != null && !clusterMetadata.topics().isEmpty() && localMetadata.get() != null;
+        return partitionsByTopic != null && !partitionsByTopic.keySet().isEmpty() && localMetadata.get() != null;

Review Comment:
   Could be simplified to `!partitionsByTopic.isEmpty()`
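
   A minimal sketch of why the two forms are interchangeable: `Map.isEmpty()` is true exactly when the map has no mappings, which is also when its key-set view is empty. The negation from the original condition still has to be kept. The class name, method names, and the `Map<String, List<Integer>>` type below are stand-ins chosen for illustration; the real field type is not shown in this hunk.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class IsEmptySketch {
    // Form used in the diff: asks the key-set view whether it is empty.
    public static boolean hasTopicsViaKeySet(final Map<String, List<Integer>> partitionsByTopic) {
        return partitionsByTopic != null && !partitionsByTopic.keySet().isEmpty();
    }

    // Simplified form: asks the map directly. Note the negation is preserved.
    public static boolean hasTopics(final Map<String, List<Integer>> partitionsByTopic) {
        return partitionsByTopic != null && !partitionsByTopic.isEmpty();
    }

    public static void main(final String[] args) {
        // Hypothetical data: topic name -> partition numbers.
        final Map<String, List<Integer>> partitionsByTopic = new HashMap<>();
        System.out.println(hasTopicsViaKeySet(partitionsByTopic) == hasTopics(partitionsByTopic));

        partitionsByTopic.put("topic-a", Arrays.asList(0, 1));
        System.out.println(hasTopicsViaKeySet(partitionsByTopic) == hasTopics(partitionsByTopic));
    }
}
```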



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java:
##########
@@ -308,12 +308,18 @@ public synchronized <K> KeyQueryMetadata getKeyQueryMetadataForKey(final String
      *
      * @param activePartitionHostMap  the current mapping of {@link HostInfo} -> {@link TopicPartition}s for active partitions
      * @param standbyPartitionHostMap the current mapping of {@link HostInfo} -> {@link TopicPartition}s for standby partitions
-     * @param clusterMetadata         the current clusterMetadata {@link Cluster}
+     * @param topicPartitionInfo      the current mapping of {@link TopicPartition} -> {@link PartitionInfo}
      */
     synchronized void onChange(final Map<HostInfo, Set<TopicPartition>> activePartitionHostMap,
                                final Map<HostInfo, Set<TopicPartition>> standbyPartitionHostMap,
-                               final Cluster clusterMetadata) {
-        this.clusterMetadata = clusterMetadata;
+                               final Map<TopicPartition, PartitionInfo> topicPartitionInfo) {
+        this.partitionsByTopic = new HashMap<>();
+        for (final Map.Entry<TopicPartition, PartitionInfo> entry: topicPartitionInfo.entrySet()) {

Review Comment:
   nit: use `topicPartitionInfo.entrySet().forEach(entry -> {...})`
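
   A rough sketch of the `forEach` form. The loop body is not visible in this hunk, so the grouping by topic name below is an assumption based on the field name `partitionsByTopic`. `TopicPartitionStub` and the `String` values are hypothetical stand-ins for `TopicPartition` and `PartitionInfo`, used so the example compiles without the Kafka client jar.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

public class ForEachSketch {
    // Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
    public static final class TopicPartitionStub {
        public final String topic;
        public final int partition;

        public TopicPartitionStub(final String topic, final int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        @Override
        public boolean equals(final Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof TopicPartitionStub)) {
                return false;
            }
            final TopicPartitionStub other = (TopicPartitionStub) o;
            return partition == other.partition && topic.equals(other.topic);
        }

        @Override
        public int hashCode() {
            return Objects.hash(topic, partition);
        }
    }

    // Assumed loop body: collect each partition's info under its topic name.
    public static Map<String, List<String>> groupByTopic(final Map<TopicPartitionStub, String> topicPartitionInfo) {
        final Map<String, List<String>> partitionsByTopic = new HashMap<>();
        topicPartitionInfo.entrySet().forEach(entry ->
            partitionsByTopic
                .computeIfAbsent(entry.getKey().topic, topic -> new ArrayList<>())
                .add(entry.getValue()));
        return partitionsByTopic;
    }

    public static void main(final String[] args) {
        final Map<TopicPartitionStub, String> topicPartitionInfo = new HashMap<>();
        topicPartitionInfo.put(new TopicPartitionStub("topic-a", 0), "info-a-0");
        topicPartitionInfo.put(new TopicPartitionStub("topic-a", 1), "info-a-1");
        topicPartitionInfo.put(new TopicPartitionStub("topic-b", 0), "info-b-0");
        System.out.println(groupByTopic(topicPartitionInfo));
    }
}
```

   `Map.forEach((key, value) -> ...)` would also work here and avoids the `getKey()`/`getValue()` calls; which reads better is a matter of taste.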


