Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/10/15 22:27:40 UTC

[GitHub] [kafka] ableegoldman opened a new pull request #9446: MINOR: distinguish between missing source topics and internal assignment errors

ableegoldman opened a new pull request #9446:
URL: https://github.com/apache/kafka/pull/9446


   Minor followup to KAFKA-10559
   
   I noticed that we were converting every TaskAssignmentException to the INCOMPLETE_SOURCE_TOPIC_METADATA error code in order to shut down all the clients. Since these errors are typically fatal, it does seem appropriate to propagate the shutdown command. But we should do so with a new AssignorError instead of piggybacking on INCOMPLETE_SOURCE_TOPIC_METADATA, which would be pretty confusing for users who do in fact have all their source topics.
   
   Changes in this PR:
   - Add new AssignorError.ASSIGNMENT_ERROR for generic assignment errors
   - Missing source topics --> throw/catch MissingSourceTopicException --> INCOMPLETE_SOURCE_TOPIC_METADATA 
   - Internal assignment errors --> throw/catch TaskAssignmentException --> ASSIGNMENT_ERROR
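A minimal sketch of the round trip described in the list above: the leader catches each exception type and encodes a distinct code, and each member rethrows the matching exception. The nested exception classes are stand-ins for the real Kafka Streams types, and the numeric values are assumptions (the thread only establishes that 0 means no error, 2 belonged to VERSION_PROBING, and the new code is 3); this is an illustration, not the actual StreamsPartitionAssignor code.

```java
final class AssignmentErrorRoundTrip {

    // Hypothetical stand-ins for the Kafka Streams exception types named in the PR.
    static final class MissingSourceTopicException extends RuntimeException {
        MissingSourceTopicException(final String message) { super(message); }
    }

    static final class TaskAssignmentException extends RuntimeException {
        TaskAssignmentException(final String message) { super(message); }
    }

    // Assumed code values: 0 = no error, 1 = missing source topics, 3 = generic assignment error.
    static final int NONE = 0;
    static final int INCOMPLETE_SOURCE_TOPIC_METADATA = 1;
    static final int ASSIGNMENT_ERROR = 3;

    // Leader side: run the assignment logic and turn each failure type into its own code.
    static int encode(final Runnable assignmentLogic) {
        try {
            assignmentLogic.run();
            return NONE;
        } catch (final MissingSourceTopicException e) {
            return INCOMPLETE_SOURCE_TOPIC_METADATA;
        } catch (final TaskAssignmentException e) {
            return ASSIGNMENT_ERROR;
        }
    }

    // Member side: rethrow the exception that matches the received code; NONE does nothing.
    static void decode(final int code) {
        if (code == INCOMPLETE_SOURCE_TOPIC_METADATA) {
            throw new MissingSourceTopicException("One or more source topics were missing during rebalance");
        } else if (code == ASSIGNMENT_ERROR) {
            throw new TaskAssignmentException("Hit an unexpected exception during task assignment phase of rebalance");
        }
    }
}
```

Keeping the two catch clauses separate is what lets a user with all of their source topics see an assignment-error message rather than a missing-topics one.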
   
   Should be cherry-picked to 2.7
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on a change in pull request #9446: MINOR: distinguish between missing source topics and internal assignment errors

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #9446:
URL: https://github.com/apache/kafka/pull/9446#discussion_r508866077



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java
##########
@@ -52,8 +53,16 @@ public void onPartitionsAssigned(final Collection<TopicPartition> partitions) {
         // NB: all task management is already handled by:
         // org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.onAssignment
         if (assignmentErrorCode.get() == AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA.code()) {
-            log.error("Received error code {}", assignmentErrorCode.get());
+            log.error("Received error code {}", AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA);
             throw new MissingSourceTopicException("One or more source topics were missing during rebalance");
+        } else if (assignmentErrorCode.get() == AssignorError.VERSION_PROBING.code()) {
+            log.info("Received error code {}", AssignorError.VERSION_PROBING);

Review comment:
       Maybe we shouldn't say "error" when it's not an error. I'm just imagining the mailing list questions that will start pouring in...
   
   ```suggestion
               log.info("Received version probing code {}", AssignorError.VERSION_PROBING);
   ```







[GitHub] [kafka] ableegoldman commented on a change in pull request #9446: MINOR: distinguish between missing source topics and internal assignment errors

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #9446:
URL: https://github.com/apache/kafka/pull/9446#discussion_r506622465



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java
##########
@@ -52,8 +53,11 @@ public void onPartitionsAssigned(final Collection<TopicPartition> partitions) {
         // NB: all task management is already handled by:
         // org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.onAssignment
         if (assignmentErrorCode.get() == AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA.code()) {
-            log.error("Received error code {}", assignmentErrorCode.get());
+            log.error("Received error code {}", AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA);
             throw new MissingSourceTopicException("One or more source topics were missing during rebalance");
+        } else if (assignmentErrorCode.get() == AssignorError.ASSIGNMENT_ERROR.code()) {
+            log.error("Received error code {}", AssignorError.ASSIGNMENT_ERROR);
+            throw new TaskAssignmentException("Hit an unexpected exception during task assignment phase of rebalance");
         }

Review comment:
       Ooh, but we did use to have a VERSION_PROBING error code that had value `2`. So we should skip that and go right to `3`. Thanks for reminding me!
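Why reusing `2` would be risky can be shown with a minimal sketch of an error-code enum with a reverse lookup: any member that still maps `2` to VERSION_PROBING would silently misread a new error that recycled that value, so the new code takes the next free slot. The enum name `AssignorErrorSketch`, the value `1` for INCOMPLETE_SOURCE_TOPIC_METADATA, and the `fromCode` helper are assumptions for illustration, not the real `AssignorError` API.

```java
import java.util.Optional;

enum AssignorErrorSketch {
    NONE(0),
    INCOMPLETE_SOURCE_TOPIC_METADATA(1),
    // Previously used for version probing; kept so the value is never reassigned.
    VERSION_PROBING(2),
    // The new generic code takes the next unused value instead of recycling 2.
    ASSIGNMENT_ERROR(3);

    private final int code;

    AssignorErrorSketch(final int code) {
        this.code = code;
    }

    int code() {
        return code;
    }

    // Reverse lookup; an empty result means this version does not know the code.
    static Optional<AssignorErrorSketch> fromCode(final int code) {
        for (final AssignorErrorSketch error : values()) {
            if (error.code == code) {
                return Optional.of(error);
            }
        }
        return Optional.empty();
    }
}
```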







[GitHub] [kafka] ableegoldman commented on a change in pull request #9446: MINOR: distinguish between missing source topics and internal assignment errors

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #9446:
URL: https://github.com/apache/kafka/pull/9446#discussion_r508185204



##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
##########
@@ -1899,14 +1973,18 @@ public void shouldRequestEndOffsetsForPreexistingChangelogs() {
     @Test
     public void shouldRequestCommittedOffsetsForPreexistingSourceChangelogs() {
         final Set<TopicPartition> changelogs = mkSet(
-            new TopicPartition(APPLICATION_ID + "-store-changelog", 0),
-            new TopicPartition(APPLICATION_ID + "-store-changelog", 1),
-            new TopicPartition(APPLICATION_ID + "-store-changelog", 2)
+            new TopicPartition("topic1", 0),
+            new TopicPartition("topic1", 1),
+            new TopicPartition("topic1", 2)
         );
 
         final StreamsBuilder streamsBuilder = new StreamsBuilder();

Review comment:
       I happened to notice that this test was not actually using this `streamsBuilder`, so I tried to fix that and it broke. After some debugging I realized this test was broken in so many different ways that it's a miracle it was ever passing at all. Luckily all the bugs were in the test and not in the actual code 🙏 







[GitHub] [kafka] vvcephei commented on a change in pull request #9446: MINOR: distinguish between missing source topics and internal assignment errors

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #9446:
URL: https://github.com/apache/kafka/pull/9446#discussion_r508657808



##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
##########
@@ -1899,14 +1973,18 @@ public void shouldRequestEndOffsetsForPreexistingChangelogs() {
     @Test
     public void shouldRequestCommittedOffsetsForPreexistingSourceChangelogs() {
         final Set<TopicPartition> changelogs = mkSet(
-            new TopicPartition(APPLICATION_ID + "-store-changelog", 0),
-            new TopicPartition(APPLICATION_ID + "-store-changelog", 1),
-            new TopicPartition(APPLICATION_ID + "-store-changelog", 2)
+            new TopicPartition("topic1", 0),
+            new TopicPartition("topic1", 1),
+            new TopicPartition("topic1", 2)
         );
 
         final StreamsBuilder streamsBuilder = new StreamsBuilder();

Review comment:
       Goodness... Thanks for fixing this.







[GitHub] [kafka] ableegoldman commented on a change in pull request #9446: MINOR: distinguish between missing source topics and internal assignment errors

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #9446:
URL: https://github.com/apache/kafka/pull/9446#discussion_r506618949



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java
##########
@@ -52,8 +53,11 @@ public void onPartitionsAssigned(final Collection<TopicPartition> partitions) {
         // NB: all task management is already handled by:
         // org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.onAssignment
         if (assignmentErrorCode.get() == AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA.code()) {
-            log.error("Received error code {}", assignmentErrorCode.get());
+            log.error("Received error code {}", AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA);
             throw new MissingSourceTopicException("One or more source topics were missing during rebalance");
+        } else if (assignmentErrorCode.get() == AssignorError.ASSIGNMENT_ERROR.code()) {
+            log.error("Received error code {}", AssignorError.ASSIGNMENT_ERROR);
+            throw new TaskAssignmentException("Hit an unexpected exception during task assignment phase of rebalance");
         }

Review comment:
       +1 on future-proofing this







[GitHub] [kafka] cadonna commented on a change in pull request #9446: MINOR: distinguish between missing source topics and internal assignment errors

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #9446:
URL: https://github.com/apache/kafka/pull/9446#discussion_r507730030



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -409,9 +396,18 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr
                 minSupportedMetadataVersion,
                 versionProbing,
                 probingRebalanceNeeded
-        );
+            );
 
-        return new GroupAssignment(assignment);
+            return new GroupAssignment(assignment);
+        } catch (final MissingSourceTopicException e) {
+            return new GroupAssignment(
+                errorAssignment(clientMetadataMap, AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA.code())
+            );
+        } catch (final TaskAssignmentException e) {
+            return new GroupAssignment(
+                errorAssignment(clientMetadataMap, AssignorError.ASSIGNMENT_ERROR.code())
+            );

Review comment:
       I could not find a unit test that verifies this code.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -409,9 +396,18 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr
                 minSupportedMetadataVersion,
                 versionProbing,
                 probingRebalanceNeeded
-        );
+            );
 
-        return new GroupAssignment(assignment);
+            return new GroupAssignment(assignment);
+        } catch (final MissingSourceTopicException e) {
+            return new GroupAssignment(
+                errorAssignment(clientMetadataMap, AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA.code())
+            );

Review comment:
       I could not find a unit test that verifies this code.







[GitHub] [kafka] ableegoldman commented on a change in pull request #9446: MINOR: distinguish between missing source topics and internal assignment errors

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #9446:
URL: https://github.com/apache/kafka/pull/9446#discussion_r508179485



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -409,9 +396,18 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr
                 minSupportedMetadataVersion,
                 versionProbing,
                 probingRebalanceNeeded
-        );
+            );
 
-        return new GroupAssignment(assignment);
+            return new GroupAssignment(assignment);
+        } catch (final MissingSourceTopicException e) {
+            return new GroupAssignment(
+                errorAssignment(clientMetadataMap, AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA.code())
+            );

Review comment:
       Ah. I had assumed there was one because this functionality is so old, but nope. Thanks for actually checking (added one)







[GitHub] [kafka] ableegoldman commented on a change in pull request #9446: MINOR: distinguish between missing source topics and internal assignment errors

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #9446:
URL: https://github.com/apache/kafka/pull/9446#discussion_r505901801



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -704,8 +700,9 @@ private boolean assignTasksToClients(final Cluster fullMetadata,
                                          final Map<UUID, ClientMetadata> clientMetadataMap,
                                          final Map<TaskId, Set<TopicPartition>> partitionsForTask,
                                          final Set<TaskId> statefulTasks) {
-        if (!statefulTasks.isEmpty())
-            throw new IllegalArgumentException("The stateful tasks should not be populated before assigning tasks to clients");
+        if (!statefulTasks.isEmpty()) {
+            throw new TaskAssignmentException("The stateful tasks should not be populated before assigning tasks to clients");

Review comment:
       If we ever hit this, it means there is a serious bug in the assignment algorithm. In that case we should go ahead and shut everyone down as soon as we notice, since otherwise it would be a long, slow death, one thread at a time.
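The effect of switching the exception type can be illustrated with a minimal sketch: if the leader's outer handler only catches TaskAssignmentException, an IllegalArgumentException escapes to the caller (in the real assignor, taking down just the leader's thread), while a TaskAssignmentException becomes a code sent to every member. The class, the `afterThisPr` flag, and the code values are hypothetical; only the exception message comes from the diff.

```java
import java.util.Set;

final class InvariantCheckSketch {

    // Stand-in for the Kafka Streams TaskAssignmentException.
    static final class TaskAssignmentException extends RuntimeException {
        TaskAssignmentException(final String message) { super(message); }
    }

    static final int NONE = 0;
    static final int ASSIGNMENT_ERROR = 3; // assumed value

    // Precondition from the diff: stateful tasks must be empty before assignment starts.
    // afterThisPr selects the new exception type (true) or the old one (false).
    static void assignTasksToClients(final Set<String> statefulTasks, final boolean afterThisPr) {
        if (!statefulTasks.isEmpty()) {
            final String message = "The stateful tasks should not be populated before assigning tasks to clients";
            if (afterThisPr) {
                throw new TaskAssignmentException(message);
            }
            throw new IllegalArgumentException(message);
        }
    }

    // Outer handler that only recognizes TaskAssignmentException; anything else propagates.
    static int assign(final Set<String> statefulTasks, final boolean afterThisPr) {
        try {
            assignTasksToClients(statefulTasks, afterThisPr);
            return NONE;
        } catch (final TaskAssignmentException e) {
            return ASSIGNMENT_ERROR; // would be encoded into every member's assignment
        }
    }
}
```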







[GitHub] [kafka] ableegoldman commented on a change in pull request #9446: MINOR: distinguish between missing source topics and internal assignment errors

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #9446:
URL: https://github.com/apache/kafka/pull/9446#discussion_r506618643



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -409,9 +396,18 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr
                 minSupportedMetadataVersion,
                 versionProbing,
                 probingRebalanceNeeded
-        );
+            );
 
-        return new GroupAssignment(assignment);
+            return new GroupAssignment(assignment);
+        } catch (final MissingSourceTopicException e) {
+            return new GroupAssignment(
+                errorAssignment(clientMetadataMap, AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA.code())
+            );
+        } catch (final TaskAssignmentException e) {
+            return new GroupAssignment(
+                errorAssignment(clientMetadataMap, AssignorError.ASSIGNMENT_ERROR.code())

Review comment:
       In my experience, users don't check the logs of every single instance, especially if the first set of logs they check states very clearly what the problem is -- even if it makes no sense.







[GitHub] [kafka] vvcephei commented on a change in pull request #9446: MINOR: distinguish between missing source topics and internal assignment errors

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #9446:
URL: https://github.com/apache/kafka/pull/9446#discussion_r508030073



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -409,9 +396,18 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr
                 minSupportedMetadataVersion,
                 versionProbing,
                 probingRebalanceNeeded
-        );
+            );
 
-        return new GroupAssignment(assignment);
+            return new GroupAssignment(assignment);
+        } catch (final MissingSourceTopicException e) {
+            return new GroupAssignment(
+                errorAssignment(clientMetadataMap, AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA.code())
+            );
+        } catch (final TaskAssignmentException e) {
+            return new GroupAssignment(
+                errorAssignment(clientMetadataMap, AssignorError.ASSIGNMENT_ERROR.code())

Review comment:
       Perhaps I did misunderstand the code after all.
   
   My read was that the old clients would actually _not_ get an exception if we return a new code. Instead, they would continue running in an undefined state and maybe just cryptically do nothing, or maybe throw an even more confusing exception, or maybe do something worse.
   
   That was my motivation for making sure they would at least throw an exception (the same exception they've always thrown for this condition, right?)







[GitHub] [kafka] ableegoldman commented on pull request #9446: MINOR: distinguish between missing source topics and internal assignment errors

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on pull request #9446:
URL: https://github.com/apache/kafka/pull/9446#issuecomment-713709721


   Cherry-picked to 2.7 (had to fix up a test in StreamsPartitionAssignorTest that was relying on the new `ReferenceContainer` which is only in trunk)





[GitHub] [kafka] ableegoldman commented on a change in pull request #9446: MINOR: distinguish between missing source topics and internal assignment errors

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #9446:
URL: https://github.com/apache/kafka/pull/9446#discussion_r505900644



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -330,75 +331,61 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr
             clientMetadata.addPreviousTasksAndOffsetSums(consumerId, info.taskOffsetSums());
         }
 
-        final boolean versionProbing =
-            checkMetadataVersions(minReceivedMetadataVersion, minSupportedMetadataVersion, futureMetadataVersion);
+        try {

Review comment:
       The diff is kind of annoying, but basically I just moved all the try-catch blocks into a single outer try that encapsulates all of the assignment logic. If we throw a TaskAssignmentException at any point, it'll bail and just encode the `ASSIGNMENT_ERROR` code (or `INCOMPLETE_SOURCE_TOPIC_METADATA` if a MissingSourceTopicException is thrown).







[GitHub] [kafka] ableegoldman commented on a change in pull request #9446: MINOR: distinguish between missing source topics and internal assignment errors

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #9446:
URL: https://github.com/apache/kafka/pull/9446#discussion_r505901990



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -1235,7 +1232,7 @@ private boolean addClientAssignments(final Set<TaskId> statefulTasks,
                         consumersToFill.offer(consumer);
                     }
                 } else {
-                    throw new IllegalStateException("Ran out of unassigned stateful tasks but some members were not at capacity");
+                    throw new TaskAssignmentException("Ran out of unassigned stateful tasks but some members were not at capacity");

Review comment:
       Likewise, hitting this means there is a bug in the assignment algorithm; it should never happen.







[GitHub] [kafka] vvcephei commented on a change in pull request #9446: MINOR: distinguish between missing source topics and internal assignment errors

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #9446:
URL: https://github.com/apache/kafka/pull/9446#discussion_r508802283



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -409,9 +396,18 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr
                 minSupportedMetadataVersion,
                 versionProbing,
                 probingRebalanceNeeded
-        );
+            );
 
-        return new GroupAssignment(assignment);
+            return new GroupAssignment(assignment);
+        } catch (final MissingSourceTopicException e) {
+            return new GroupAssignment(
+                errorAssignment(clientMetadataMap, AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA.code())
+            );
+        } catch (final TaskAssignmentException e) {
+            return new GroupAssignment(
+                errorAssignment(clientMetadataMap, AssignorError.ASSIGNMENT_ERROR.code())

Review comment:
       Ok, what I didn't realize before is that the "prior" code in this diff hasn't been released yet, so we are good to just do this change in place.







[GitHub] [kafka] ableegoldman commented on pull request #9446: MINOR: distinguish between missing source topics and internal assignment errors

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on pull request #9446:
URL: https://github.com/apache/kafka/pull/9446#issuecomment-713685350


   Merged to trunk





[GitHub] [kafka] vvcephei commented on a change in pull request #9446: MINOR: distinguish between missing source topics and internal assignment errors

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #9446:
URL: https://github.com/apache/kafka/pull/9446#discussion_r506567159



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java
##########
@@ -52,8 +53,11 @@ public void onPartitionsAssigned(final Collection<TopicPartition> partitions) {
         // NB: all task management is already handled by:
         // org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.onAssignment
         if (assignmentErrorCode.get() == AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA.code()) {
-            log.error("Received error code {}", assignmentErrorCode.get());
+            log.error("Received error code {}", AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA);
             throw new MissingSourceTopicException("One or more source topics were missing during rebalance");
+        } else if (assignmentErrorCode.get() == AssignorError.ASSIGNMENT_ERROR.code()) {
+            log.error("Received error code {}", AssignorError.ASSIGNMENT_ERROR);
+            throw new TaskAssignmentException("Hit an unexpected exception during task assignment phase of rebalance");
         }

Review comment:
       Hey @ableegoldman, I just looked into this, and it appears that this is the only place where the error code would be read and thrown as an exception to kill the thread, right?
   
   If so, then it looks like older clients are looking specifically for the error code to be `== AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA.code()`, and they'd interpret any other code as "looks good" and proceed with PARTITIONS_ASSIGNED.
   
   Maybe there's nothing we can do about it now, but perhaps we should add a block to future-proof this code: `else if (assignmentErrorCode.get() != 0) { throw new TaskAssignmentException("Unknown error code: " + assignmentErrorCode.get()); }`?
   
   Or is this already handled in some other way I'm not seeing?
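The concern can be made concrete with a minimal sketch comparing the two member-side behaviors: an older member that only checks for INCOMPLETE_SOURCE_TOPIC_METADATA treats any other code as success, while the proposed future-proofed check rejects any nonzero code it does not recognize. Treating VERSION_PROBING as informational follows the log-level discussion elsewhere in this thread. The class, the method names, the boolean "proceed" return value, and the code values are all assumptions for illustration.

```java
final class MemberErrorHandlingSketch {

    // Stand-ins for the Kafka Streams exception types.
    static final class TaskAssignmentException extends RuntimeException {
        TaskAssignmentException(final String message) { super(message); }
    }

    static final class MissingSourceTopicException extends RuntimeException {
        MissingSourceTopicException(final String message) { super(message); }
    }

    static final int NONE = 0;
    static final int INCOMPLETE_SOURCE_TOPIC_METADATA = 1;
    static final int VERSION_PROBING = 2;
    static final int ASSIGNMENT_ERROR = 3;

    // Older member: only one code is fatal, every other code "looks good".
    static boolean olderMemberProceeds(final int code) {
        if (code == INCOMPLETE_SOURCE_TOPIC_METADATA) {
            throw new MissingSourceTopicException("One or more source topics were missing during rebalance");
        }
        return true; // would move on to PARTITIONS_ASSIGNED
    }

    // Future-proofed member: known codes are handled explicitly, unknown nonzero codes fail fast.
    static boolean futureProofMemberProceeds(final int code) {
        if (code == INCOMPLETE_SOURCE_TOPIC_METADATA) {
            throw new MissingSourceTopicException("One or more source topics were missing during rebalance");
        } else if (code == ASSIGNMENT_ERROR) {
            throw new TaskAssignmentException("Hit an unexpected exception during task assignment phase of rebalance");
        } else if (code == VERSION_PROBING) {
            return true; // not an error: log at info level and continue
        } else if (code != NONE) {
            throw new TaskAssignmentException("Unknown error code: " + code);
        }
        return true;
    }
}
```

With these definitions, `olderMemberProceeds(3)` returns true, which is the silent-continuation problem raised above, while `futureProofMemberProceeds(4)` throws.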

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -409,9 +396,18 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr
                 minSupportedMetadataVersion,
                 versionProbing,
                 probingRebalanceNeeded
-        );
+            );
 
-        return new GroupAssignment(assignment);
+            return new GroupAssignment(assignment);
+        } catch (final MissingSourceTopicException e) {
+            return new GroupAssignment(
+                errorAssignment(clientMetadataMap, AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA.code())
+            );
+        } catch (final TaskAssignmentException e) {
+            return new GroupAssignment(
+                errorAssignment(clientMetadataMap, AssignorError.ASSIGNMENT_ERROR.code())

Review comment:
       Following on my other comment, should we gate this on the usedMetadataVersion so that we'd return an `INCOMPLETE_SOURCE_TOPIC_METADATA` for versions 7 and below, to ensure they'll be properly handled as exceptions on the member side?
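A minimal sketch of the proposed gate, for illustration only: the leader falls back to INCOMPLETE_SOURCE_TOPIC_METADATA when the lowest metadata version in use is 7 or below. The cutoff comes from the comment; the class name, the constant names, the group-wide (rather than per-member) decision, and the code values are assumptions. Later in the thread the gate is judged unnecessary because the earlier behavior had not yet been released.

```java
final class ErrorCodeGateSketch {

    static final int INCOMPLETE_SOURCE_TOPIC_METADATA = 1;
    static final int ASSIGNMENT_ERROR = 3;

    // Highest metadata version whose members, per the review comment, only react to
    // INCOMPLETE_SOURCE_TOPIC_METADATA (an assumption taken from the comment).
    static final int LAST_VERSION_WITHOUT_ASSIGNMENT_ERROR = 7;

    // Pick the code to send for an internal assignment failure, based on the lowest
    // metadata version used by any member of the group.
    static int codeForAssignmentFailure(final int minUsedMetadataVersion) {
        if (minUsedMetadataVersion <= LAST_VERSION_WITHOUT_ASSIGNMENT_ERROR) {
            // Older members would ignore ASSIGNMENT_ERROR, so send the code they do act on.
            return INCOMPLETE_SOURCE_TOPIC_METADATA;
        }
        return ASSIGNMENT_ERROR;
    }
}
```

The trade-off, argued in the following comment, is that older members would then shut down with a misleading missing-source-topics message.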







[GitHub] [kafka] ableegoldman commented on a change in pull request #9446: MINOR: distinguish between missing source topics and internal assignment errors

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #9446:
URL: https://github.com/apache/kafka/pull/9446#discussion_r506616155



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -409,9 +396,18 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr
                 minSupportedMetadataVersion,
                 versionProbing,
                 probingRebalanceNeeded
-        );
+            );
 
-        return new GroupAssignment(assignment);
+            return new GroupAssignment(assignment);
+        } catch (final MissingSourceTopicException e) {
+            return new GroupAssignment(
+                errorAssignment(clientMetadataMap, AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA.code())
+            );
+        } catch (final TaskAssignmentException e) {
+            return new GroupAssignment(
+                errorAssignment(clientMetadataMap, AssignorError.ASSIGNMENT_ERROR.code())

Review comment:
       I'm honestly not sure that INCOMPLETE_SOURCE_TOPIC_METADATA is better than just throwing nothing for older clients. Is it better to shut everyone down immediately if it not only completely obfuscates the reason for shutting down, but actually points to a completely different (and wrong) root cause?
   
   Obviously a slow death by one thread at a time is not a good user experience, but this is slightly different. For one thing, you'd still see all the upgraded clients shut down, and only the older ones would remain. Presumably you're in the middle of a rolling bounce, so all of those surviving older clients will eventually be upgraded to understand this error code and shut down. Note that the previous behavior was to just kill a single thread at a time upon hitting this exception, so it's not a regression. Throwing a misleading exception actually seems more like a regression to me. 
   
   WDYT? How important does it seem to ensure that the entire group shuts down immediately during a rolling bounce? It does seem preferable to cut the rolling bounce early if it's doomed from the start. But I personally feel like logging a misleading error is a worse user experience. 







[GitHub] [kafka] ableegoldman merged pull request #9446: MINOR: distinguish between missing source topics and internal assignment errors

Posted by GitBox <gi...@apache.org>.
ableegoldman merged pull request #9446:
URL: https://github.com/apache/kafka/pull/9446


   





[GitHub] [kafka] ableegoldman commented on a change in pull request #9446: MINOR: distinguish between missing source topics and internal assignment errors

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #9446:
URL: https://github.com/apache/kafka/pull/9446#discussion_r508182677



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -409,9 +396,18 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr
                 minSupportedMetadataVersion,
                 versionProbing,
                 probingRebalanceNeeded
-        );
+            );
 
-        return new GroupAssignment(assignment);
+            return new GroupAssignment(assignment);
+        } catch (final MissingSourceTopicException e) {
+            return new GroupAssignment(
+                errorAssignment(clientMetadataMap, AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA.code())
+            );
+        } catch (final TaskAssignmentException e) {
+            return new GroupAssignment(
+                errorAssignment(clientMetadataMap, AssignorError.ASSIGNMENT_ERROR.code())
+            );

Review comment:
       This is covered by `shouldEncodeAssignmentErrorIfPreVersionProbingSubscriptionAndFutureSubscriptionIsMixed` which hits the TaskAssignmentException in `checkMetadataVersions()`.
   
   I can try to add another test for one of the other instances where TaskAssignmentException might be thrown, but I would have to go really far out of my way to artificially corrupt the data structures of the assignor in order to actually hit it (e.g. by mocking the `Cluster` so that it returns one set of topics the first time it's called, and then reports another set of topics the next time). I'm honestly not sure if it adds any value at that point -- WDYT?
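The catch-and-map pattern in the hunk above can be isolated into a small, self-contained sketch. It is a simplification under stated assumptions: the exception classes, the numeric codes, and the idea of reducing each client's assignment to a single error code are hypothetical stand-ins, not the real `StreamsPartitionAssignor` or `AssignorError` types.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical stand-ins for the Kafka Streams exceptions in the diff above.
class SketchMissingSourceTopicException extends RuntimeException {
    SketchMissingSourceTopicException(final String message) {
        super(message);
    }
}

class SketchTaskAssignmentException extends RuntimeException {
    SketchTaskAssignmentException(final String message) {
        super(message);
    }
}

final class ErrorMappingSketch {
    // Illustrative codes only; not the real AssignorError values.
    static final int INCOMPLETE_SOURCE_TOPIC_METADATA = 1;
    static final int ASSIGNMENT_ERROR = 3;

    private ErrorMappingSketch() {
    }

    // Gives every client the same error code so the whole group learns about the failure.
    static Map<String, Integer> errorAssignment(final List<String> clients, final int errorCode) {
        final Map<String, Integer> result = new LinkedHashMap<>();
        for (final String client : clients) {
            result.put(client, errorCode);
        }
        return result;
    }

    // Runs the (possibly failing) assignment and maps each exception type to its own code.
    static Map<String, Integer> assign(final List<String> clients,
                                       final Supplier<Map<String, Integer>> assignment) {
        try {
            return assignment.get();
        } catch (final SketchMissingSourceTopicException e) {
            // The user's source topics really are missing.
            return errorAssignment(clients, INCOMPLETE_SOURCE_TOPIC_METADATA);
        } catch (final SketchTaskAssignmentException e) {
            // Internal or permanent error, e.g. incompatible mixed subscription versions.
            return errorAssignment(clients, ASSIGNMENT_ERROR);
        }
    }
}
```

Feeding `assign` a supplier that throws `SketchTaskAssignmentException` (roughly what the mixed-version test triggers via `checkMetadataVersions()`) yields `ASSIGNMENT_ERROR` for every client, while a `SketchMissingSourceTopicException` yields `INCOMPLETE_SOURCE_TOPIC_METADATA`.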







[GitHub] [kafka] ableegoldman commented on pull request #9446: MINOR: distinguish between missing source topics and internal assignment errors

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on pull request #9446:
URL: https://github.com/apache/kafka/pull/9446#issuecomment-709625902


   heads up @mjsax @guozhangwang 





[GitHub] [kafka] ableegoldman commented on a change in pull request #9446: MINOR: distinguish between missing source topics and internal assignment errors

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #9446:
URL: https://github.com/apache/kafka/pull/9446#discussion_r505901265



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -434,7 +430,7 @@ private boolean checkMetadataVersions(final int minReceivedMetadataVersion,
                 minSupportedMetadataVersion);
 
         } else {
-            throw new IllegalStateException(
+            throw new TaskAssignmentException(

Review comment:
       Just tried to standardize all permanent errors to throw TaskAssignmentException instead. E.g. if there are incompatible mixed versions, we should shut everyone down until the operator can sort it out.
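The standardization described in this comment can be sketched as a check that distinguishes a normal rebalance, a version probing rebalance, and an unresolvable mix of old and future subscriptions, throwing a `TaskAssignmentException` only in the last case. This is a hedged, simplified sketch: the signature is reduced to two parameters, the exception is a local stand-in, and `UNKNOWN` / `EARLIEST_PROBEABLE_VERSION` are assumed values for illustration, so the real `checkMetadataVersions()` may differ.

```java
// Hypothetical, simplified version of the mixed-version check; the constants below are
// illustrative assumptions, not necessarily the values used by Kafka Streams.
final class VersionCheckSketch {
    static final int UNKNOWN = -1;
    static final int EARLIEST_PROBEABLE_VERSION = 3;

    // Local stand-in for the Kafka Streams TaskAssignmentException.
    static final class TaskAssignmentException extends RuntimeException {
        TaskAssignmentException(final String message) {
            super(message);
        }
    }

    private VersionCheckSketch() {
    }

    // Returns true for a version probing rebalance; throws for a permanent incompatibility.
    static boolean checkMetadataVersions(final int minReceivedMetadataVersion,
                                         final int futureMetadataVersion) {
        if (futureMetadataVersion == UNKNOWN) {
            // No member sent a subscription newer than the leader understands.
            return false;
        } else if (minReceivedMetadataVersion >= EARLIEST_PROBEABLE_VERSION) {
            // Every member can take part in version probing.
            return true;
        } else {
            // A pre-version-probing member mixed with a future member cannot be resolved by
            // retrying, so signal a permanent error instead of an IllegalStateException.
            throw new TaskAssignmentException(
                "Received a future subscription (version " + futureMetadataVersion
                    + ") and an incompatible old subscription (version "
                    + minReceivedMetadataVersion + ") at the same time");
        }
    }
}
```

Because the mixed case now surfaces as a `TaskAssignmentException`, the catch block in `assign()` can turn it into an error code for every member rather than letting a single thread die on an `IllegalStateException`.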



