You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2021/09/25 18:43:35 UTC
[kafka] branch trunk updated: KAFKA-13296: warn if previous assignment has duplicate partitions (#11347)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 4eb386f KAFKA-13296: warn if previous assignment has duplicate partitions (#11347)
4eb386f is described below
commit 4eb386f6e060e12e1940c0d780987e3a7c438d74
Author: Walker Carlson <18...@users.noreply.github.com>
AuthorDate: Sat Sep 25 13:41:52 2021 -0500
KAFKA-13296: warn if previous assignment has duplicate partitions (#11347)
Reviewers: Matthias J. Sax <ma...@confluent.io>, Luke Chen <sh...@gmail.com>
---
.../streams/processor/internals/StreamsPartitionAssignor.java | 10 ++++++++++
1 file changed, 10 insertions(+)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
index 22b9c14..2ae381d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
@@ -317,6 +317,7 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
int minSupportedMetadataVersion = LATEST_SUPPORTED_VERSION;
boolean shutdownRequested = false;
+ boolean assignementErrorFound = false;
int futureMetadataVersion = UNKNOWN;
for (final Map.Entry<String, Subscription> entry : subscriptions.entrySet()) {
final String consumerId = entry.getKey();
@@ -351,10 +352,19 @@ public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Conf
// add the consumer and any info in its subscription to the client
clientMetadata.addConsumer(consumerId, subscription.ownedPartitions());
+ final int prevSize = allOwnedPartitions.size();
allOwnedPartitions.addAll(subscription.ownedPartitions());
+ if (allOwnedPartitions.size() < prevSize + subscription.ownedPartitions().size()) {
+ assignementErrorFound = true;
+ }
clientMetadata.addPreviousTasksAndOffsetSums(consumerId, info.taskOffsetSums());
}
+ if (assignementErrorFound) {
+ log.warn("The previous assignment contains a partition more than once. " +
+ "\t Mapping: {}", subscriptions);
+ }
+
try {
final boolean versionProbing =
checkMetadataVersions(minReceivedMetadataVersion, minSupportedMetadataVersion, futureMetadataVersion);