Posted to jira@kafka.apache.org by "danicafine (via GitHub)" <gi...@apache.org> on 2023/05/23 23:30:23 UTC
[GitHub] [kafka] danicafine opened a new pull request, #13751: KAFKA-14539: Simplify StreamsMetadataState by replacing the Cluster metadata with partition info map
danicafine opened a new pull request, #13751:
URL: https://github.com/apache/kafka/pull/13751
Replace usage of Cluster in StreamsMetadataState with Map<String, List<PartitionInfo>>. Update StreamsPartitionAssignor#onAssignment method to pass existing Map<TopicPartition, PartitionInfo> instead of fake Cluster object.
Behavior remains the same; updated existing unit tests accordingly.
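For readers following along, the regrouping described above might look roughly like the sketch below. This is not the code from the PR: `TopicPartition` and `PartitionInfo` are simplified stand-ins for the real classes in `org.apache.kafka.common`, and `groupByTopic` is a hypothetical helper name used only for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

class PartitionsByTopicSketch {

    // Simplified stand-in for org.apache.kafka.common.TopicPartition (assumption).
    static final class TopicPartition {
        final String topic;
        final int partition;

        TopicPartition(final String topic, final int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        @Override
        public boolean equals(final Object o) {
            if (this == o) return true;
            if (!(o instanceof TopicPartition)) return false;
            final TopicPartition other = (TopicPartition) o;
            return partition == other.partition && topic.equals(other.topic);
        }

        @Override
        public int hashCode() {
            return Objects.hash(topic, partition);
        }
    }

    // Simplified stand-in for org.apache.kafka.common.PartitionInfo (assumption).
    static final class PartitionInfo {
        final String topic;
        final int partition;

        PartitionInfo(final String topic, final int partition) {
            this.topic = topic;
            this.partition = partition;
        }
    }

    // Hypothetical helper: regroup per-partition metadata into a topic -> partitions map.
    static Map<String, List<PartitionInfo>> groupByTopic(
            final Map<TopicPartition, PartitionInfo> topicPartitionInfo) {
        final Map<String, List<PartitionInfo>> partitionsByTopic = new HashMap<>();
        for (final Map.Entry<TopicPartition, PartitionInfo> entry : topicPartitionInfo.entrySet()) {
            partitionsByTopic
                .computeIfAbsent(entry.getKey().topic, topic -> new ArrayList<>())
                .add(entry.getValue());
        }
        return partitionsByTopic;
    }

    public static void main(final String[] args) {
        final Map<TopicPartition, PartitionInfo> input = new HashMap<>();
        input.put(new TopicPartition("orders", 0), new PartitionInfo("orders", 0));
        input.put(new TopicPartition("orders", 1), new PartitionInfo("orders", 1));
        input.put(new TopicPartition("payments", 0), new PartitionInfo("payments", 0));

        // Print how many partitions were collected for each topic.
        groupByTopic(input).forEach((topic, partitions) ->
            System.out.println(topic + " has " + partitions.size() + " partition(s)"));
    }
}
```

Keying the map by topic name keeps lookups by topic cheap, which is presumably what the state class needs from the metadata it previously read from `Cluster`.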
### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [kafka] bbejeck commented on pull request #13751: KAFKA-14539: Simplify StreamsMetadataState by replacing the Cluster metadata with partition info map
Posted by "bbejeck (via GitHub)" <gi...@apache.org>.
bbejeck commented on PR #13751:
URL: https://github.com/apache/kafka/pull/13751#issuecomment-1563454591
Test failures are unrelated
[GitHub] [kafka] bbejeck commented on pull request #13751: KAFKA-14539: Simplify StreamsMetadataState by replacing the Cluster metadata with partition info map
Posted by "bbejeck (via GitHub)" <gi...@apache.org>.
bbejeck commented on PR #13751:
URL: https://github.com/apache/kafka/pull/13751#issuecomment-1581399670
merged #13751 into trunk
[GitHub] [kafka] bbejeck merged pull request #13751: KAFKA-14539: Simplify StreamsMetadataState by replacing the Cluster metadata with partition info map
Posted by "bbejeck (via GitHub)" <gi...@apache.org>.
bbejeck merged PR #13751:
URL: https://github.com/apache/kafka/pull/13751
[GitHub] [kafka] bbejeck commented on a diff in pull request #13751: KAFKA-14539: Simplify StreamsMetadataState by replacing the Cluster metadata with partition info map
Posted by "bbejeck (via GitHub)" <gi...@apache.org>.
bbejeck commented on code in PR #13751:
URL: https://github.com/apache/kafka/pull/13751#discussion_r1205696101
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java:
##########
@@ -558,7 +564,7 @@ private SourceTopicsInfo getSourceTopicsInfo(final String storeName, final Strin
}
private boolean isInitialized() {
- return clusterMetadata != null && !clusterMetadata.topics().isEmpty() && localMetadata.get() != null;
+ return partitionsByTopic != null && !partitionsByTopic.keySet().isEmpty() && localMetadata.get() != null;
Review Comment:
Could be simplified to `!partitionsByTopic.isEmpty()`
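A small sketch of why dropping `keySet()` is safe, as long as the negation is kept: a map is empty exactly when its key set is empty. The class and method names below are hypothetical, the map value type is simplified to `List<String>`, and the `localMetadata.get() != null` part of the real check is left out.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class IsInitializedSketch {

    // Form used in the diff: checks emptiness through the key set.
    static boolean hasPartitionsViaKeySet(final Map<String, List<String>> partitionsByTopic) {
        return partitionsByTopic != null && !partitionsByTopic.keySet().isEmpty();
    }

    // Simplified form: Map.isEmpty() gives the same answer without the extra call.
    static boolean hasPartitions(final Map<String, List<String>> partitionsByTopic) {
        return partitionsByTopic != null && !partitionsByTopic.isEmpty();
    }

    public static void main(final String[] args) {
        final Map<String, List<String>> populated = new HashMap<>();
        populated.put("orders", Collections.singletonList("orders-0"));

        // Both forms should agree for null, empty, and populated maps.
        System.out.println(hasPartitionsViaKeySet(null) == hasPartitions(null));
        System.out.println(hasPartitionsViaKeySet(new HashMap<>()) == hasPartitions(new HashMap<>()));
        System.out.println(hasPartitionsViaKeySet(populated) == hasPartitions(populated));
    }
}
```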
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java:
##########
@@ -308,12 +308,18 @@ public synchronized <K> KeyQueryMetadata getKeyQueryMetadataForKey(final String
*
* @param activePartitionHostMap the current mapping of {@link HostInfo} -> {@link TopicPartition}s for active partitions
* @param standbyPartitionHostMap the current mapping of {@link HostInfo} -> {@link TopicPartition}s for standby partitions
- * @param clusterMetadata the current clusterMetadata {@link Cluster}
+ * @param topicPartitionInfo the current mapping of {@link TopicPartition} -> {@Link PartitionInfo}
*/
synchronized void onChange(final Map<HostInfo, Set<TopicPartition>> activePartitionHostMap,
final Map<HostInfo, Set<TopicPartition>> standbyPartitionHostMap,
- final Cluster clusterMetadata) {
- this.clusterMetadata = clusterMetadata;
+ final Map<TopicPartition, PartitionInfo> topicPartitionInfo) {
+ this.partitionsByTopic = new HashMap<>();
+ for (final Map.Entry<TopicPartition, PartitionInfo> entry: topicPartitionInfo.entrySet()) {
Review Comment:
nit: use `topicPartitionInfo.entrySet().forEach(entry -> {...})`
[GitHub] [kafka] wcarlson5 commented on a diff in pull request #13751: KAFKA-14539: Simplify StreamsMetadataState by replacing the Cluster metadata with partition info map
Posted by "wcarlson5 (via GitHub)" <gi...@apache.org>.
wcarlson5 commented on code in PR #13751:
URL: https://github.com/apache/kafka/pull/13751#discussion_r1221950345
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java:
##########
@@ -308,12 +308,17 @@ public synchronized <K> KeyQueryMetadata getKeyQueryMetadataForKey(final String
*
* @param activePartitionHostMap the current mapping of {@link HostInfo} -> {@link TopicPartition}s for active partitions
* @param standbyPartitionHostMap the current mapping of {@link HostInfo} -> {@link TopicPartition}s for standby partitions
- * @param clusterMetadata the current clusterMetadata {@link Cluster}
+ * @param topicPartitionInfo the current mapping of {@link TopicPartition} -> {@Link PartitionInfo}
*/
synchronized void onChange(final Map<HostInfo, Set<TopicPartition>> activePartitionHostMap,
final Map<HostInfo, Set<TopicPartition>> standbyPartitionHostMap,
- final Cluster clusterMetadata) {
- this.clusterMetadata = clusterMetadata;
+ final Map<TopicPartition, PartitionInfo> topicPartitionInfo) {
+ this.partitionsByTopic = new HashMap<>();
+ topicPartitionInfo.entrySet().forEach(entry -> this.partitionsByTopic
Review Comment:
Can we format this so it is a bit easier to read?
[GitHub] [kafka] bbejeck commented on pull request #13751: KAFKA-14539: Simplify StreamsMetadataState by replacing the Cluster metadata with partition info map
Posted by "bbejeck (via GitHub)" <gi...@apache.org>.
bbejeck commented on PR #13751:
URL: https://github.com/apache/kafka/pull/13751#issuecomment-1563134216
ping @mjsax or @ableegoldman for a second review
[GitHub] [kafka] bbejeck commented on pull request #13751: KAFKA-14539: Simplify StreamsMetadataState by replacing the Cluster metadata with partition info map
Posted by "bbejeck (via GitHub)" <gi...@apache.org>.
bbejeck commented on PR #13751:
URL: https://github.com/apache/kafka/pull/13751#issuecomment-1570984634
Failures are unrelated to this PR
[GitHub] [kafka] bbejeck commented on pull request #13751: KAFKA-14539: Simplify StreamsMetadataState by replacing the Cluster metadata with partition info map
Posted by "bbejeck (via GitHub)" <gi...@apache.org>.
bbejeck commented on PR #13751:
URL: https://github.com/apache/kafka/pull/13751#issuecomment-1570985258
ping @ableegoldman for 2nd review