You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2021/09/25 18:43:35 UTC

[kafka] branch trunk updated: KAFKA-13296: warn if previous assignment has duplicate partitions (#11347)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4eb386f  KAFKA-13296: warn if previous assignment has duplicate partitions (#11347)
4eb386f is described below

commit 4eb386f6e060e12e1940c0d780987e3a7c438d74
Author: Walker Carlson <18...@users.noreply.github.com>
AuthorDate: Sat Sep 25 13:41:52 2021 -0500

    KAFKA-13296: warn if previous assignment has duplicate partitions (#11347)
    
    Reviewers: Matthias J. Sax <ma...@confluent.io>, Luke Chen <sh...@gmail.com>
---
 .../streams/processor/internals/StreamsPartitionAssignor.java  | 10 ++++++++++
 1 file changed, 10 insertions(+)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
index 22b9c14..2ae381d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
@@ -317,6 +317,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
         int minSupportedMetadataVersion = LATEST_SUPPORTED_VERSION;
 
         boolean shutdownRequested = false;
+        boolean assignementErrorFound = false;
         int futureMetadataVersion = UNKNOWN;
         for (final Map.Entry<String, Subscription> entry : subscriptions.entrySet()) {
             final String consumerId = entry.getKey();
@@ -351,10 +352,19 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
 
             // add the consumer and any info in its subscription to the client
             clientMetadata.addConsumer(consumerId, subscription.ownedPartitions());
+            final int prevSize = allOwnedPartitions.size();
             allOwnedPartitions.addAll(subscription.ownedPartitions());
+            if (allOwnedPartitions.size() < prevSize + subscription.ownedPartitions().size()) {
+                assignementErrorFound = true;
+            }
             clientMetadata.addPreviousTasksAndOffsetSums(consumerId, info.taskOffsetSums());
         }
 
+        if (assignementErrorFound) {
+            log.warn("The previous assignment contains a partition more than once. " +
+                "\t Mapping: {}", subscriptions);
+        }
+
         try {
             final boolean versionProbing =
                 checkMetadataVersions(minReceivedMetadataVersion, minSupportedMetadataVersion, futureMetadataVersion);