Posted to jira@kafka.apache.org by "bbejeck (via GitHub)" <gi...@apache.org> on 2023/05/25 15:36:12 UTC
[GitHub] [kafka] bbejeck commented on a diff in pull request #13751: KAFKA-14539: Simplify StreamsMetadataState by replacing the Cluster metadata with partition info map
bbejeck commented on code in PR #13751:
URL: https://github.com/apache/kafka/pull/13751#discussion_r1205696101
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java:
##########
@@ -558,7 +564,7 @@ private SourceTopicsInfo getSourceTopicsInfo(final String storeName, final Strin
}
private boolean isInitialized() {
- return clusterMetadata != null && !clusterMetadata.topics().isEmpty() && localMetadata.get() != null;
+ return partitionsByTopic != null && !partitionsByTopic.keySet().isEmpty() && localMetadata.get() != null;
Review Comment:
The `keySet()` call is redundant here; this could be simplified to `!partitionsByTopic.isEmpty()`.
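A minimal sketch of why the `keySet()` call is redundant in this check: a map and its key-set view always report the same emptiness, so asking the map directly gives the same answer. The class name `InitCheckSketch`, both helper methods, and the `Map<String, List<Integer>>` type are assumptions for illustration only; the declared type of `partitionsByTopic` is not visible in this hunk, and the `localMetadata` part of the condition is left out.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class InitCheckSketch {

    // Form used in the diff: checks emptiness through the key-set view.
    static boolean hasTopicsViaKeySet(final Map<String, List<Integer>> partitionsByTopic) {
        return partitionsByTopic != null && !partitionsByTopic.keySet().isEmpty();
    }

    // Simplified form: asks the map directly. The negation is kept.
    static boolean hasTopics(final Map<String, List<Integer>> partitionsByTopic) {
        return partitionsByTopic != null && !partitionsByTopic.isEmpty();
    }

    public static void main(final String[] args) {
        // Hypothetical data; the Integer lists stand in for partition info.
        final Map<String, List<Integer>> partitionsByTopic = new HashMap<>();
        System.out.println(hasTopicsViaKeySet(partitionsByTopic) == hasTopics(partitionsByTopic));

        partitionsByTopic.put("hypothetical-topic", Arrays.asList(0, 1));
        System.out.println(hasTopicsViaKeySet(partitionsByTopic) == hasTopics(partitionsByTopic));
    }
}
```

Both forms agree for null, empty, and populated maps, so the shorter one should be a drop-in replacement.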
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java:
##########
@@ -308,12 +308,18 @@ public synchronized <K> KeyQueryMetadata getKeyQueryMetadataForKey(final String
*
* @param activePartitionHostMap the current mapping of {@link HostInfo} -> {@link TopicPartition}s for active partitions
* @param standbyPartitionHostMap the current mapping of {@link HostInfo} -> {@link TopicPartition}s for standby partitions
- * @param clusterMetadata the current clusterMetadata {@link Cluster}
+ * @param topicPartitionInfo the current mapping of {@link TopicPartition} -> {@Link PartitionInfo}
*/
synchronized void onChange(final Map<HostInfo, Set<TopicPartition>> activePartitionHostMap,
final Map<HostInfo, Set<TopicPartition>> standbyPartitionHostMap,
- final Cluster clusterMetadata) {
- this.clusterMetadata = clusterMetadata;
+ final Map<TopicPartition, PartitionInfo> topicPartitionInfo) {
+ this.partitionsByTopic = new HashMap<>();
+ for (final Map.Entry<TopicPartition, PartitionInfo> entry: topicPartitionInfo.entrySet()) {
Review Comment:
nit: use `topicPartitionInfo.entrySet().forEach(entry -> {...})`
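A rough sketch of what the lambda form might look like. The loop body is not shown in this hunk, so grouping entries by topic name is only a guess based on the `partitionsByTopic` field name. `ForEachSketch`, `TopicPartitionStub`, and `groupByTopic` are hypothetical names, and plain `String` values stand in for `PartitionInfo` so the example compiles without the Kafka client jar.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ForEachSketch {

    // Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
    // It relies on identity-based equals/hashCode, which is enough for this sketch.
    static final class TopicPartitionStub {
        final String topic;
        final int partition;

        TopicPartitionStub(final String topic, final int partition) {
            this.topic = topic;
            this.partition = partition;
        }
    }

    // Assumed loop body: collect each entry's value under its topic name.
    static Map<String, List<String>> groupByTopic(final Map<TopicPartitionStub, String> topicPartitionInfo) {
        final Map<String, List<String>> partitionsByTopic = new HashMap<>();
        topicPartitionInfo.entrySet().forEach(entry ->
            partitionsByTopic
                .computeIfAbsent(entry.getKey().topic, topic -> new ArrayList<>())
                .add(entry.getValue()));
        return partitionsByTopic;
    }

    public static void main(final String[] args) {
        final Map<TopicPartitionStub, String> topicPartitionInfo = new HashMap<>();
        topicPartitionInfo.put(new TopicPartitionStub("topic-a", 0), "info-a-0");
        topicPartitionInfo.put(new TopicPartitionStub("topic-a", 1), "info-a-1");
        topicPartitionInfo.put(new TopicPartitionStub("topic-b", 0), "info-b-0");

        // Prints the number of entries collected for each topic.
        groupByTopic(topicPartitionInfo)
            .forEach((topic, infos) -> System.out.println(topic + ": " + infos.size()));
    }
}
```

`Map.forEach((key, value) -> {...})` is another option that avoids unpacking the entry by hand; which of the two reads better is a matter of taste.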