You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2021/09/25 19:22:39 UTC

[kafka] branch 3.0 updated: KAFKA-13296: warn if previous assignment has duplicate partitions (#11347)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new fa03171  KAFKA-13296: warn if previous assignment has duplicate partitions (#11347)
fa03171 is described below

commit fa031719754f826222fa4c80fb33d21c2c29ae06
Author: Walker Carlson <18...@users.noreply.github.com>
AuthorDate: Sat Sep 25 13:41:52 2021 -0500

    KAFKA-13296: warn if previous assignment has duplicate partitions (#11347)
    
    Reviewers: Matthias J. Sax <ma...@confluent.io>, Luke Chen <sh...@gmail.com>
---
 .../streams/processor/internals/StreamsPartitionAssignor.java  | 10 ++++++++++
 1 file changed, 10 insertions(+)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
index 630a431..a750749a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
@@ -309,6 +309,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
         int minSupportedMetadataVersion = LATEST_SUPPORTED_VERSION;
 
         boolean shutdownRequested = false;
+        boolean assignementErrorFound = false;
         int futureMetadataVersion = UNKNOWN;
         for (final Map.Entry<String, Subscription> entry : subscriptions.entrySet()) {
             final String consumerId = entry.getKey();
@@ -343,10 +344,19 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
 
             // add the consumer and any info in its subscription to the client
             clientMetadata.addConsumer(consumerId, subscription.ownedPartitions());
+            final int prevSize = allOwnedPartitions.size();
             allOwnedPartitions.addAll(subscription.ownedPartitions());
+            if (allOwnedPartitions.size() < prevSize + subscription.ownedPartitions().size()) {
+                assignementErrorFound = true;
+            }
             clientMetadata.addPreviousTasksAndOffsetSums(consumerId, info.taskOffsetSums());
         }
 
+        if (assignementErrorFound) {
+            log.warn("The previous assignment contains a partition more than once. " +
+                "\t Mapping: {}", subscriptions);
+        }
+
         try {
             final boolean versionProbing =
                 checkMetadataVersions(minReceivedMetadataVersion, minSupportedMetadataVersion, futureMetadataVersion);