Posted to commits@kafka.apache.org by ab...@apache.org on 2021/09/30 01:21:50 UTC

[kafka] branch trunk updated: MINOR: expand logging and improve error message during partition count resolution (#11364)

This is an automated email from the ASF dual-hosted git repository.

ableegoldman pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6d7a785  MINOR: expand logging and improve error message during partition count resolution (#11364)
6d7a785 is described below

commit 6d7a7859568eb5b25931a3be81cda9f4e0e3ec0d
Author: A. Sophie Blee-Goldman <so...@confluent.io>
AuthorDate: Wed Sep 29 18:19:43 2021 -0700

    MINOR: expand logging and improve error message during partition count resolution (#11364)
    
    Recently a user hit this TaskAssignmentException because a bug in their regex meant that no topics matched the pattern subscription. With no upstream topic to take a partition count from, it was impossible to resolve the number of partitions of the downstream repartition topic. Debugging this was difficult and ultimately came down to stepping through the code line by line, since even with TRACE logging we only got a partial picture.
    
    We should expand the logging so that the TRACE logging covers both conditional branches, and improve the error message with a suggestion for what to look for should someone hit this in the future.
    
    Reviewers: Walker Carlson <wc...@confluent.io>, Guozhang Wang <gu...@confluent.io>, Matthias J. Sax <ma...@confluent.io>
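
The failure mode described in the commit message can be illustrated with a small standalone sketch. This is not the RepartitionTopics implementation: the class PartitionCountSketch, the helpers matchingTopics and resolve, the topic names, and the rule of taking the largest upstream partition count are all assumptions made for illustration, and IllegalStateException stands in for TaskAssignmentException. It only shows why a pattern that matches nothing leaves the loop unable to make progress.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

public class PartitionCountSketch {

    // Hypothetical helper: the source topics a pattern subscription resolves to,
    // assumed here to be every cluster topic whose full name matches the pattern.
    static List<String> matchingTopics(Pattern pattern, Map<String, Integer> clusterTopics) {
        List<String> matched = new ArrayList<>();
        for (String topic : clusterTopics.keySet()) {
            if (pattern.matcher(topic).matches()) {
                matched.add(topic);
            }
        }
        return matched;
    }

    // Hypothetical simplified resolution loop. upstreamOf maps each repartition
    // topic to the topics feeding it; clusterTopics maps existing topics to
    // their partition counts.
    static Map<String, Integer> resolve(Map<String, List<String>> upstreamOf,
                                        Map<String, Integer> clusterTopics) {
        Map<String, Integer> resolved = new HashMap<>(clusterTopics);
        boolean partitionCountNeeded;
        do {
            partitionCountNeeded = false;
            boolean progressMadeThisIteration = false;
            for (Map.Entry<String, List<String>> entry : upstreamOf.entrySet()) {
                if (resolved.containsKey(entry.getKey())) {
                    continue; // already resolved in an earlier iteration
                }
                // Assumption: use the largest partition count among known upstream topics.
                Integer numPartitions = null;
                for (String upstream : entry.getValue()) {
                    Integer count = resolved.get(upstream);
                    if (count != null && (numPartitions == null || count > numPartitions)) {
                        numPartitions = count;
                    }
                }
                if (numPartitions == null) {
                    partitionCountNeeded = true; // another iteration is needed
                } else {
                    resolved.put(entry.getKey(), numPartitions);
                    progressMadeThisIteration = true;
                }
            }
            // A full pass that resolves nothing can never succeed later, so fail fast.
            if (!progressMadeThisIteration && partitionCountNeeded) {
                throw new IllegalStateException(
                    "Failed to compute number of partitions for all repartition topics");
            }
        } while (partitionCountNeeded);
        return resolved;
    }

    public static void main(String[] args) {
        Map<String, Integer> clusterTopics = new HashMap<>();
        clusterTopics.put("orders-eu", 4);
        clusterTopics.put("orders-us", 6);

        Map<String, List<String>> upstreamOf = new HashMap<>();
        // A pattern that matches both topics: resolution succeeds.
        upstreamOf.put("app-repartition", matchingTopics(Pattern.compile("orders-.*"), clusterTopics));
        System.out.println(resolve(upstreamOf, clusterTopics).get("app-repartition"));

        // A typo in the regex: nothing matches, so there is no upstream count to use.
        upstreamOf.put("app-repartition", matchingTopics(Pattern.compile("order_.*"), clusterTopics));
        try {
            resolve(upstreamOf, clusterTopics);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In the second call the upstream list is empty, so the pass ends with a count still needed and no progress made, which is the condition guarded by the exception in the diff below.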
---
 .../kafka/streams/processor/internals/RepartitionTopics.java       | 7 ++++++-
 .../kafka/streams/processor/internals/RepartitionTopicsTest.java   | 2 +-
 2 files changed, 7 insertions(+), 2 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RepartitionTopics.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RepartitionTopics.java
index 24c6e81..801e6c1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RepartitionTopics.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RepartitionTopics.java
@@ -155,6 +155,7 @@ public class RepartitionTopics {
                             log.trace("Unable to determine number of partitions for {}, another iteration is needed",
                                 repartitionSourceTopic);
                         } else {
+                            log.trace("Determined number of partitions for {} to be {}", repartitionSourceTopic, numPartitions);
                             repartitionTopicMetadata.get(repartitionSourceTopic).setNumberOfPartitions(numPartitions);
                             progressMadeThisIteration = true;
                         }
@@ -162,7 +163,11 @@ public class RepartitionTopics {
                 }
             }
             if (!progressMadeThisIteration && partitionCountNeeded) {
-                throw new TaskAssignmentException("Failed to compute number of partitions for all repartition topics");
+                log.error("Unable to determine the number of partitions of all repartition topics, most likely a source topic is missing or pattern doesn't match any topics\n" +
+                    "topic groups: {}\n" +
+                    "cluster topics: {}.", topicGroups, clusterMetadata.topics());
+                throw new TaskAssignmentException("Failed to compute number of partitions for all repartition topics, " +
+                    "make sure all user input topics are created and all Pattern subscriptions match at least one topic in the cluster");
             }
         } while (partitionCountNeeded);
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionTopicsTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionTopicsTest.java
index 88b70e0..ce94294 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionTopicsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionTopicsTest.java
@@ -193,7 +193,7 @@ public class RepartitionTopicsTest {
         );
 
         final TaskAssignmentException exception = assertThrows(TaskAssignmentException.class, repartitionTopics::setup);
-        assertThat(exception.getMessage(), is("Failed to compute number of partitions for all repartition topics"));
+        assertThat(exception.getMessage(), is("Failed to compute number of partitions for all repartition topics, make sure all user input topics are created and all Pattern subscriptions match at least one topic in the cluster"));
     }
 
     @Test