Posted to commits@kafka.apache.org by ab...@apache.org on 2021/09/30 01:28:42 UTC

[kafka] branch 3.0 updated: MINOR: expand logging and improve error message during partition count resolution (#11364)

This is an automated email from the ASF dual-hosted git repository.

ableegoldman pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new 4e1f147  MINOR: expand logging and improve error message during partition count resolution (#11364)
4e1f147 is described below

commit 4e1f1477d16785bed131e8f8420e0c6edf341797
Author: A. Sophie Blee-Goldman <so...@confluent.io>
AuthorDate: Wed Sep 29 18:19:43 2021 -0700

    MINOR: expand logging and improve error message during partition count resolution (#11364)
    
    Recently a user hit this TaskAssignmentException because a bug in their regex meant that no topics matched the pattern subscription. As a result, the number of partitions of the downstream repartition topic could not be resolved, since there was no upstream topic to get the partition count from. Debugging this was difficult and ultimately required stepping through the code line by line, since even with TRACE logging we only got a partial picture.
    
    We should expand the logging so that the TRACE logging covers both conditional branches, and improve the error message with a suggestion for what to look for should someone hit this in the future.
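    
    For anyone who runs into the new error message, the check it suggests can be approximated outside of Streams. The following is only a minimal sketch and not part of this change: the class PatternSubscriptionCheck, the helper matchingTopics, and the topic names are hypothetical, and it assumes a topic is subscribed when the pattern matches its full name.

```java
import java.util.List;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class PatternSubscriptionCheck {

    // Hypothetical helper (not a Kafka API): returns, in sorted order, the
    // cluster topics whose full name matches the given subscription pattern.
    public static List<String> matchingTopics(final Pattern pattern, final Set<String> clusterTopics) {
        return clusterTopics.stream()
            .filter(topic -> pattern.matcher(topic).matches())
            .sorted()
            .collect(Collectors.toList());
    }

    public static void main(final String[] args) {
        // Illustrative topic names; in practice these would be listed from the cluster.
        final Set<String> clusterTopics = Set.of("orders-eu", "orders-us", "payments");

        final Pattern intended = Pattern.compile("orders-.*");
        final Pattern buggy = Pattern.compile("orders_.*"); // underscore typed instead of hyphen

        System.out.println(matchingTopics(intended, clusterTopics)); // prints [orders-eu, orders-us]
        System.out.println(matchingTopics(buggy, clusterTopics));    // prints []
    }
}
```

    An empty result for a pattern is the situation described above: there is no upstream topic from which a partition count could be taken.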
    
    Reviewers: Walker Carlson <wc...@confluent.io>, Guozhang Wang <gu...@confluent.io>, Matthias J. Sax <ma...@confluent.io>
---
 .../kafka/streams/processor/internals/RepartitionTopics.java       | 7 ++++++-
 .../kafka/streams/processor/internals/RepartitionTopicsTest.java   | 2 +-
 2 files changed, 7 insertions(+), 2 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RepartitionTopics.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RepartitionTopics.java
index 8656198..469c352 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RepartitionTopics.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RepartitionTopics.java
@@ -155,6 +155,7 @@ public class RepartitionTopics {
                             log.trace("Unable to determine number of partitions for {}, another iteration is needed",
                                 repartitionSourceTopic);
                         } else {
+                            log.trace("Determined number of partitions for {} to be {}", repartitionSourceTopic, numPartitions);
                             repartitionTopicMetadata.get(repartitionSourceTopic).setNumberOfPartitions(numPartitions);
                             progressMadeThisIteration = true;
                         }
@@ -162,7 +163,11 @@ public class RepartitionTopics {
                 }
             }
             if (!progressMadeThisIteration && partitionCountNeeded) {
-                throw new TaskAssignmentException("Failed to compute number of partitions for all repartition topics");
+                log.error("Unable to determine the number of partitions of all repartition topics, most likely a source topic is missing or pattern doesn't match any topics\n" +
+                    "topic groups: {}\n" +
+                    "cluster topics: {}.", topicGroups, clusterMetadata.topics());
+                throw new TaskAssignmentException("Failed to compute number of partitions for all repartition topics, " +
+                    "make sure all user input topics are created and all Pattern subscriptions match at least one topic in the cluster");
             }
         } while (partitionCountNeeded);
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionTopicsTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionTopicsTest.java
index 30a4641..d7cf755 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionTopicsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionTopicsTest.java
@@ -186,7 +186,7 @@ public class RepartitionTopicsTest {
         );
 
         final TaskAssignmentException exception = assertThrows(TaskAssignmentException.class, repartitionTopics::setup);
-        assertThat(exception.getMessage(), is("Failed to compute number of partitions for all repartition topics"));
+        assertThat(exception.getMessage(), is("Failed to compute number of partitions for all repartition topics, make sure all user input topics are created and all Pattern subscriptions match at least one topic in the cluster"));
     }
 
     @Test