Posted to jira@kafka.apache.org by "danicafine (via GitHub)" <gi...@apache.org> on 2023/05/23 23:30:23 UTC

[GitHub] [kafka] danicafine opened a new pull request, #13751: KAFKA-14539: Simplify StreamsMetadataState by replacing the Cluster metadata with partition info map

danicafine opened a new pull request, #13751:
URL: https://github.com/apache/kafka/pull/13751

   Replace the usage of Cluster in StreamsMetadataState with a Map<String, List<PartitionInfo>>. Update the StreamsPartitionAssignor#onAssignment method to pass the existing Map<TopicPartition, PartitionInfo> instead of a fake Cluster object.
   
   Behavior remains the same; updated existing unit tests accordingly.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] bbejeck commented on pull request #13751: KAFKA-14539: Simplify StreamsMetadataState by replacing the Cluster metadata with partition info map

Posted by "bbejeck (via GitHub)" <gi...@apache.org>.
bbejeck commented on PR #13751:
URL: https://github.com/apache/kafka/pull/13751#issuecomment-1563454591

   Test failures are unrelated




[GitHub] [kafka] bbejeck commented on pull request #13751: KAFKA-14539: Simplify StreamsMetadataState by replacing the Cluster metadata with partition info map

Posted by "bbejeck (via GitHub)" <gi...@apache.org>.
bbejeck commented on PR #13751:
URL: https://github.com/apache/kafka/pull/13751#issuecomment-1581399670

   merged #13751 into trunk




[GitHub] [kafka] bbejeck merged pull request #13751: KAFKA-14539: Simplify StreamsMetadataState by replacing the Cluster metadata with partition info map

Posted by "bbejeck (via GitHub)" <gi...@apache.org>.
bbejeck merged PR #13751:
URL: https://github.com/apache/kafka/pull/13751




[GitHub] [kafka] bbejeck commented on a diff in pull request #13751: KAFKA-14539: Simplify StreamsMetadataState by replacing the Cluster metadata with partition info map

Posted by "bbejeck (via GitHub)" <gi...@apache.org>.
bbejeck commented on code in PR #13751:
URL: https://github.com/apache/kafka/pull/13751#discussion_r1205696101


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java:
##########
@@ -558,7 +564,7 @@ private SourceTopicsInfo getSourceTopicsInfo(final String storeName, final Strin
     }
 
     private boolean isInitialized() {
-        return clusterMetadata != null && !clusterMetadata.topics().isEmpty() && localMetadata.get() != null;
+        return partitionsByTopic != null && !partitionsByTopic.keySet().isEmpty() && localMetadata.get() != null;

Review Comment:
   Could be simplified to `!partitionsByTopic.isEmpty()`
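
A minimal sketch of the equivalence behind this suggestion, assuming a plain `java.util.Map` with `String` values standing in for Kafka's `PartitionInfo`. A map's key set is empty exactly when the map is, so the `keySet()` call can be dropped while the negation stays. The class and method names are hypothetical and not part of the PR.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the topic check inside isInitialized();
// String replaces PartitionInfo to keep the example self-contained.
class EmptyCheckSketch {

    // Form used in the diff: asks the key set whether it is empty.
    static boolean hasTopicsViaKeySet(final Map<String, List<String>> partitionsByTopic) {
        return partitionsByTopic != null && !partitionsByTopic.keySet().isEmpty();
    }

    // Simplified form: asks the map directly; the negation is still required.
    static boolean hasTopics(final Map<String, List<String>> partitionsByTopic) {
        return partitionsByTopic != null && !partitionsByTopic.isEmpty();
    }

    public static void main(final String[] args) {
        final Map<String, List<String>> partitionsByTopic = new HashMap<>();
        // Both forms agree on an empty map...
        System.out.println(hasTopicsViaKeySet(partitionsByTopic) + " " + hasTopics(partitionsByTopic));
        // ...and on a populated one.
        partitionsByTopic.put("topic-a", Collections.singletonList("partition-0"));
        System.out.println(hasTopicsViaKeySet(partitionsByTopic) + " " + hasTopics(partitionsByTopic));
    }
}
```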



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java:
##########
@@ -308,12 +308,18 @@ public synchronized <K> KeyQueryMetadata getKeyQueryMetadataForKey(final String
      *
      * @param activePartitionHostMap  the current mapping of {@link HostInfo} -> {@link TopicPartition}s for active partitions
      * @param standbyPartitionHostMap the current mapping of {@link HostInfo} -> {@link TopicPartition}s for standby partitions
-     * @param clusterMetadata         the current clusterMetadata {@link Cluster}
+     * @param topicPartitionInfo      the current mapping of {@link TopicPartition} -> {@link PartitionInfo}
      */
     synchronized void onChange(final Map<HostInfo, Set<TopicPartition>> activePartitionHostMap,
                                final Map<HostInfo, Set<TopicPartition>> standbyPartitionHostMap,
-                               final Cluster clusterMetadata) {
-        this.clusterMetadata = clusterMetadata;
+                               final Map<TopicPartition, PartitionInfo> topicPartitionInfo) {
+        this.partitionsByTopic = new HashMap<>();
+        for (final Map.Entry<TopicPartition, PartitionInfo> entry: topicPartitionInfo.entrySet()) {

Review Comment:
   nit: use `topicPartitionInfo.entrySet().forEach(entry -> {...})`





[GitHub] [kafka] wcarlson5 commented on a diff in pull request #13751: KAFKA-14539: Simplify StreamsMetadataState by replacing the Cluster metadata with partition info map

Posted by "wcarlson5 (via GitHub)" <gi...@apache.org>.
wcarlson5 commented on code in PR #13751:
URL: https://github.com/apache/kafka/pull/13751#discussion_r1221950345


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java:
##########
@@ -308,12 +308,17 @@ public synchronized <K> KeyQueryMetadata getKeyQueryMetadataForKey(final String
      *
      * @param activePartitionHostMap  the current mapping of {@link HostInfo} -> {@link TopicPartition}s for active partitions
      * @param standbyPartitionHostMap the current mapping of {@link HostInfo} -> {@link TopicPartition}s for standby partitions
-     * @param clusterMetadata         the current clusterMetadata {@link Cluster}
+     * @param topicPartitionInfo      the current mapping of {@link TopicPartition} -> {@link PartitionInfo}
      */
     synchronized void onChange(final Map<HostInfo, Set<TopicPartition>> activePartitionHostMap,
                                final Map<HostInfo, Set<TopicPartition>> standbyPartitionHostMap,
-                               final Cluster clusterMetadata) {
-        this.clusterMetadata = clusterMetadata;
+                               final Map<TopicPartition, PartitionInfo> topicPartitionInfo) {
+        this.partitionsByTopic = new HashMap<>();
+        topicPartitionInfo.entrySet().forEach(entry -> this.partitionsByTopic

Review Comment:
   Can we format this so it is a bit easier to read? 
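
One possible, more readable layout, offered only as a sketch: iterate the map with `Map#forEach` and group by topic via `computeIfAbsent`, one step per line. `TopicPartition` and `PartitionInfo` below are simplified, hypothetical stand-ins for the Kafka classes (only the accessors this example needs), and `groupByTopic` is an illustrative helper name, not code from the PR.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class GroupByTopicSketch {

    // Simplified stand-in for org.apache.kafka.common.TopicPartition (assumption).
    static final class TopicPartition {
        private final String topic;
        private final int partition;

        TopicPartition(final String topic, final int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        String topic() {
            return topic;
        }

        @Override
        public boolean equals(final Object o) {
            if (!(o instanceof TopicPartition)) {
                return false;
            }
            final TopicPartition other = (TopicPartition) o;
            return partition == other.partition && topic.equals(other.topic);
        }

        @Override
        public int hashCode() {
            return 31 * topic.hashCode() + partition;
        }
    }

    // Simplified stand-in for org.apache.kafka.common.PartitionInfo (assumption).
    static final class PartitionInfo {
        private final String topic;
        private final int partition;

        PartitionInfo(final String topic, final int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        String topic() {
            return topic;
        }

        int partition() {
            return partition;
        }
    }

    // Regroups a per-partition map into a per-topic map of partition infos.
    static Map<String, List<PartitionInfo>> groupByTopic(
            final Map<TopicPartition, PartitionInfo> topicPartitionInfo) {
        final Map<String, List<PartitionInfo>> partitionsByTopic = new HashMap<>();
        topicPartitionInfo.forEach((topicPartition, partitionInfo) ->
            partitionsByTopic
                .computeIfAbsent(topicPartition.topic(), topic -> new ArrayList<>())
                .add(partitionInfo));
        return partitionsByTopic;
    }

    public static void main(final String[] args) {
        final Map<TopicPartition, PartitionInfo> input = new HashMap<>();
        input.put(new TopicPartition("orders", 0), new PartitionInfo("orders", 0));
        input.put(new TopicPartition("orders", 1), new PartitionInfo("orders", 1));
        input.put(new TopicPartition("users", 0), new PartitionInfo("users", 0));

        // Print each topic with the number of partitions grouped under it.
        groupByTopic(input).forEach((topic, infos) ->
            System.out.println(topic + " -> " + infos.size() + " partition(s)"));
    }
}
```

Iterating with `Map#forEach` avoids the explicit `entrySet()` and `entry.getKey()` / `entry.getValue()` calls, which is most of what makes the one-line version hard to scan.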





[GitHub] [kafka] bbejeck commented on pull request #13751: KAFKA-14539: Simplify StreamsMetadataState by replacing the Cluster metadata with partition info map

Posted by "bbejeck (via GitHub)" <gi...@apache.org>.
bbejeck commented on PR #13751:
URL: https://github.com/apache/kafka/pull/13751#issuecomment-1563134216

   ping @mjsax or @ableegoldman for a second review




[GitHub] [kafka] bbejeck commented on pull request #13751: KAFKA-14539: Simplify StreamsMetadataState by replacing the Cluster metadata with partition info map

Posted by "bbejeck (via GitHub)" <gi...@apache.org>.
bbejeck commented on PR #13751:
URL: https://github.com/apache/kafka/pull/13751#issuecomment-1570984634

   Failures are unrelated to this PR




[GitHub] [kafka] bbejeck commented on pull request #13751: KAFKA-14539: Simplify StreamsMetadataState by replacing the Cluster metadata with partition info map

Posted by "bbejeck (via GitHub)" <gi...@apache.org>.
bbejeck commented on PR #13751:
URL: https://github.com/apache/kafka/pull/13751#issuecomment-1570985258

   ping @ableegoldman for 2nd review

