Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/05/07 03:42:31 UTC

[GitHub] [kafka] showuon opened a new pull request #10645: KAFKA-12464: follow up PR to refactor codes and add logs

showuon opened a new pull request #10645:
URL: https://github.com/apache/kafka/pull/10645


   This is the follow-up PR to address the remaining comments in https://github.com/apache/kafka/pull/10509.
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman commented on pull request #10645: KAFKA-12464: follow up PR to refactor codes and add logs

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on pull request #10645:
URL: https://github.com/apache/kafka/pull/10645#issuecomment-835608948


   Just some unrelated test failures: `kafka.server.DynamicBrokerReconfigurationTest.testKeyStoreAlter()` and the usual suspects in RaftClusterTest




[GitHub] [kafka] showuon commented on a change in pull request #10645: KAFKA-12464: follow up PR to refactor codes and add logs

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #10645:
URL: https://github.com/apache/kafka/pull/10645#discussion_r627903743



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########
@@ -178,17 +178,17 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
 
         int minQuota = (int) Math.floor(((double) totalPartitionsCount) / numberOfConsumers);
         int maxQuota = (int) Math.ceil(((double) totalPartitionsCount) / numberOfConsumers);
-        // the expected number of members with maxQuota assignment
-        int expectedNumMembersHavingMorePartitions = totalPartitionsCount % numberOfConsumers;
-        // the number of members with exactly maxQuota partitions assigned
-        int numMembersHavingMorePartitions = 0;
+        // the expected number of members with over minQuota assignment
+        int expectedNumMembersAssignedOverMinQuota = totalPartitionsCount % numberOfConsumers;
+        // the number of members with over minQuota partitions assigned
+        int numMembersAssignedOverMinQuota = 0;

Review comment:
       > Just a nit -- and to clarify up front, if you agree with this let's still hold off on doing it here so this PR can finally be merged, as I figure any nits can be addressed in your general assign PR:
   
   > It's still a bit unclear what this value will be used for when you first see it; maybe we can work the word minQuota into the name? E.g. expectedNumMembersWithMoreThanMinQuotaPartitions, or, for a slightly shorter example, numConsumersAssignedOverMinQuota, or something in between or similar to those
   
   > FYI I'm also ok with it as-is if you prefer the current name -- just wanted to throw out some other suggestions. I'll trust you to pick whatever name feels right
   
   Makes sense! I chose `expectedNumMembersAssignedOverMinQuota` and `numMembersAssignedOverMinQuota`. :)
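For reference, the quota arithmetic in this hunk can be tried out on its own. The following is a minimal standalone sketch, not the assignor's code: `QuotaSketch` and `quotas` are hypothetical names, and it uses integer division, which for non-negative counts yields the same values as the `Math.floor`/`Math.ceil` expressions in the diff.

```java
public final class QuotaSketch {
    // Hypothetical helper: returns {minQuota, maxQuota, expectedNumMembersAssignedOverMinQuota}.
    static int[] quotas(int totalPartitionsCount, int numberOfConsumers) {
        if (numberOfConsumers <= 0 || totalPartitionsCount < 0) {
            throw new IllegalArgumentException(
                "need a positive consumer count and a non-negative partition count");
        }
        // Same result as Math.floor(total / n) for non-negative ints.
        int minQuota = totalPartitionsCount / numberOfConsumers;
        // Same result as Math.ceil(total / n) for non-negative ints.
        int maxQuota = (totalPartitionsCount + numberOfConsumers - 1) / numberOfConsumers;
        // Each leftover partition goes to a different member, so exactly this many
        // members end up with more than minQuota (that is, maxQuota) partitions.
        int expectedNumMembersAssignedOverMinQuota = totalPartitionsCount % numberOfConsumers;
        return new int[] {minQuota, maxQuota, expectedNumMembersAssignedOverMinQuota};
    }

    public static void main(String[] args) {
        int[] q = quotas(5, 2);
        // prints "minQuota=2, maxQuota=3, expectedNumMembersAssignedOverMinQuota=1"
        System.out.println("minQuota=" + q[0] + ", maxQuota=" + q[1]
            + ", expectedNumMembersAssignedOverMinQuota=" + q[2]);
    }
}
```

With 6 partitions over 3 consumers the same helper gives minQuota == maxQuota == 2 and an expected count of 0, which is the case the "over minQuota" naming describes more accurately than "max capacity members".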






[GitHub] [kafka] showuon commented on a change in pull request #10645: KAFKA-12464: follow up PR to refactor codes and add logs

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #10645:
URL: https://github.com/apache/kafka/pull/10645#discussion_r627924911



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########
@@ -255,27 +260,35 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
                 partitionsTransferringOwnership.put(unassignedPartition, consumer);
 
             int currentAssignedCount = consumerAssignment.size();
-            int expectedAssignedCount = numMembersHavingMorePartitions < expectedNumMembersHavingMorePartitions ? maxQuota : minQuota;
+            int expectedAssignedCount = numMembersAssignedOverMinQuota < expectedNumMembersAssignedOverMinQuota ? maxQuota : minQuota;
             if (currentAssignedCount == expectedAssignedCount) {
                 if (currentAssignedCount == maxQuota) {
-                    numMembersHavingMorePartitions++;
+                    numMembersAssignedOverMinQuota++;
                 }
                 unfilledConsumerIter.remove();
             }
         }
 
         if (!unfilledMembers.isEmpty()) {
-            // we expected all the remaining unfilled members have minQuota partitions and we're already at the allowed number
-            // of max capacity members. Otherwise, there must be error here.
-            if (numMembersHavingMorePartitions != expectedNumMembersHavingMorePartitions) {
-                throw new IllegalStateException(String.format("We haven't reached the allowed number of max capacity members, " +
-                    "but no more partitions to be assigned to unfilled consumers: %s", unfilledMembers));
+            // we expected all the remaining unfilled members have minQuota partitions and we're already at the expected number
+            // of members with more than the minQuota partitions. Otherwise, there must be error here.
+            if (numMembersAssignedOverMinQuota != expectedNumMembersAssignedOverMinQuota) {
+                log.error("Current number of members with more than the minQuota partitions: {}, is less than the expected number " +
+                    "of members with more than the minQuota partitions: {}, and no more partitions to be assigned to the remaining unfilled consumers: {}",
+                    numMembersAssignedOverMinQuota, expectedNumMembersAssignedOverMinQuota, unfilledMembers);
+                throw new IllegalStateException("We haven't reached the expected number of members with " +

Review comment:
       > nit: same here, can you log an error with the remaining unfilledMembers? I know you already do that in the exception message, but imo it would be better to print in a log message instead of an exception, as it may be long
   
   I included most of the info in the error log and put only a simple error message in the exception.
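The pattern adopted here (full diagnostic detail in an error log, a short message in the exception) can be sketched outside Kafka as follows. This is a hedged illustration only: the assignor logs through SLF4J (`log.error(...)` with `{}` placeholders), whereas this sketch swaps in `java.util.logging` (with `{0}`-style placeholders) so it runs without dependencies, and `LogThenThrowSketch`/`checkOverMinQuotaCount` are hypothetical names.

```java
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

public final class LogThenThrowSketch {
    private static final Logger LOG = Logger.getLogger(LogThenThrowSketch.class.getName());

    // Hypothetical check mirroring the diff: if members are still unfilled but the number of
    // members above minQuota differs from the expected value, the assignment is inconsistent.
    static void checkOverMinQuotaCount(int numMembersAssignedOverMinQuota,
                                       int expectedNumMembersAssignedOverMinQuota,
                                       List<String> unfilledMembers) {
        if (!unfilledMembers.isEmpty()
                && numMembersAssignedOverMinQuota != expectedNumMembersAssignedOverMinQuota) {
            // The potentially long member list goes into the log, which is written right away...
            LOG.log(Level.SEVERE,
                "Current number of members with more than the minQuota partitions: {0}, is less than "
                    + "the expected number: {1}, and no more partitions to be assigned to the "
                    + "remaining unfilled consumers: {2}",
                new Object[] {numMembersAssignedOverMinQuota,
                    expectedNumMembersAssignedOverMinQuota, unfilledMembers});
            // ...while the exception only carries a short summary.
            throw new IllegalStateException("We haven't reached the expected number of members with "
                + "more than the minQuota partitions, but no more partitions to be assigned");
        }
    }

    public static void main(String[] args) {
        checkOverMinQuotaCount(1, 1, List.of("c2")); // consistent: nothing logged, no exception
        try {
            checkOverMinQuotaCount(0, 1, List.of("c1", "c2"));
        } catch (IllegalStateException e) {
            System.out.println("caught: " + e.getMessage());
        }
    }
}
```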






[GitHub] [kafka] ableegoldman merged pull request #10645: KAFKA-12464: follow up PR to refactor codes and add logs

Posted by GitBox <gi...@apache.org>.
ableegoldman merged pull request #10645:
URL: https://github.com/apache/kafka/pull/10645


   




[GitHub] [kafka] showuon commented on a change in pull request #10645: KAFKA-12464: follow up PR to refactor codes and add logs

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #10645:
URL: https://github.com/apache/kafka/pull/10645#discussion_r627924496



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########
@@ -242,6 +244,9 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
                 if (unfilledMembers.isEmpty()) {
                     // Should not enter here since we have calculated the exact number to assign to each consumer
                     // There might be issues in the assigning algorithm, or maybe assigning the same partition to two owners.
+                    int currentPartitionIndex = unassignedPartitions.indexOf(unassignedPartition);
+                    log.error("No more unfilled consumers to be assigned. The remaining unassigned partitions are: {}",
+                        unassignedPartitions.subList(currentPartitionIndex, unassignedPartitions.size()));

Review comment:
       > nit (for next PR): can you log an error before throwing the exception and include the set of unassigned partitions? Either just print out the unassignedPartitions along with the current partition being processed so you can figure out which partitions are remaining after that, or else by actually computing the remaining partitions that have yet to be assigned. Since it's an error case, I think it's ok to spend a little extra time computing that for better debuggability
   
   Makes sense. Added!
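The remaining-partitions computation in the diff finds the current partition's index and slices the list from there. A standalone sketch, assuming partitions are plain strings (the real code uses `TopicPartition`) and that each partition appears only once in the list, since `indexOf` returns the first match; `RemainingPartitionsSketch` and `remainingFrom` are hypothetical names.

```java
import java.util.List;

public final class RemainingPartitionsSketch {
    // Hypothetical helper: returns the partitions from `current` (inclusive) to the end of
    // the list, i.e. the ones still without a consumer when the unfilled members ran out.
    static List<String> remainingFrom(List<String> unassignedPartitions, String current) {
        // Linear scan; acceptable because this only runs on the error path.
        int currentPartitionIndex = unassignedPartitions.indexOf(current);
        if (currentPartitionIndex < 0) {
            throw new IllegalArgumentException("partition not in list: " + current);
        }
        // subList returns a view backed by the original list, not a copy.
        return unassignedPartitions.subList(currentPartitionIndex, unassignedPartitions.size());
    }

    public static void main(String[] args) {
        List<String> unassigned = List.of("t1p0", "t1p1", "t2p0", "t2p1");
        System.out.println(remainingFrom(unassigned, "t2p0")); // prints [t2p0, t2p1]
    }
}
```

Paying for an O(n) `indexOf` is in line with the reviewer's point that an error case can afford a little extra work for better debuggability.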






[GitHub] [kafka] showuon commented on a change in pull request #10645: KAFKA-12464: follow up PR to refactor codes and add logs

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #10645:
URL: https://github.com/apache/kafka/pull/10645#discussion_r627926370



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########
@@ -255,27 +260,35 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
                 partitionsTransferringOwnership.put(unassignedPartition, consumer);
 
             int currentAssignedCount = consumerAssignment.size();
-            int expectedAssignedCount = numMembersHavingMorePartitions < expectedNumMembersHavingMorePartitions ? maxQuota : minQuota;
+            int expectedAssignedCount = numMembersAssignedOverMinQuota < expectedNumMembersAssignedOverMinQuota ? maxQuota : minQuota;
             if (currentAssignedCount == expectedAssignedCount) {
                 if (currentAssignedCount == maxQuota) {
-                    numMembersHavingMorePartitions++;
+                    numMembersAssignedOverMinQuota++;
                 }
                 unfilledConsumerIter.remove();
             }
         }
 
         if (!unfilledMembers.isEmpty()) {
-            // we expected all the remaining unfilled members have minQuota partitions and we're already at the allowed number
-            // of max capacity members. Otherwise, there must be error here.
-            if (numMembersHavingMorePartitions != expectedNumMembersHavingMorePartitions) {
-                throw new IllegalStateException(String.format("We haven't reached the allowed number of max capacity members, " +
-                    "but no more partitions to be assigned to unfilled consumers: %s", unfilledMembers));
+            // we expected all the remaining unfilled members have minQuota partitions and we're already at the expected number
+            // of members with more than the minQuota partitions. Otherwise, there must be error here.
+            if (numMembersAssignedOverMinQuota != expectedNumMembersAssignedOverMinQuota) {
+                log.error("Current number of members with more than the minQuota partitions: {}, is less than the expected number " +
+                    "of members with more than the minQuota partitions: {}, and no more partitions to be assigned to the remaining unfilled consumers: {}",
+                    numMembersAssignedOverMinQuota, expectedNumMembersAssignedOverMinQuota, unfilledMembers);
+                throw new IllegalStateException("We haven't reached the expected number of members with " +
+                    "more than the minQuota partitions, but no more partitions to be assigned");
             } else {
                 for (String unfilledMember : unfilledMembers) {
                     int assignedPartitionsCount = assignment.get(unfilledMember).size();
                     if (assignedPartitionsCount != minQuota) {
-                        throw new IllegalStateException(String.format("Consumer: [%s] should have %d partitions, but got %d partitions, " +
-                            "and no more partitions to be assigned", unfilledMember, minQuota, assignedPartitionsCount));
+                        log.error("Consumer: [{}] should have {} partitions, but got {} partitions, and no more partitions " +
+                            "to be assigned. The remaining unfilled consumers are: {}", unfilledMember, minQuota, assignedPartitionsCount, unfilledMembers);
+                        throw new IllegalStateException(String.format("Consumer: [%s] doesn't reach minQuota partitions, " +
+                            "and no more partitions to be assigned", unfilledMember));
+                    } else {
+                        log.trace("skip over this unfilled member: [{}] because we've reached the expected number of " +
+                            "members with more than the minQuota partitions, and this member already have minQuota partitions", unfilledMember);

Review comment:
       > nit: can we add an else case that just logs that we skipped over this member because we reached max capacity and it was still at min? Not sure if debug or trace is more appropriate, might be worth just running the tests with this log in place to see how often it gets printed
   
   I put it at `trace` level. After running all the tests, 6 out of 27 tests print this log.
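The final per-member check, including the new `else` branch that logs skipped members, can be sketched as follows. As before, this is a hedged, dependency-free illustration: `java.util.logging` stands in for the SLF4J logger used in Kafka, `Level.FINEST` stands in for `trace` (the SLF4J-to-JUL binding maps trace to FINEST), and `UnfilledMemberCheckSketch`/`checkUnfilledMembers` are hypothetical names.

```java
import java.util.List;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;

public final class UnfilledMemberCheckSketch {
    private static final Logger LOG = Logger.getLogger(UnfilledMemberCheckSketch.class.getName());

    // Hypothetical final pass, run once the over-minQuota count is already as expected:
    // every member left in unfilledMembers must hold exactly minQuota partitions.
    // Returns how many members were skipped because they were already at minQuota.
    static int checkUnfilledMembers(List<String> unfilledMembers,
                                    Map<String, List<String>> assignment,
                                    int minQuota) {
        int skipped = 0;
        for (String unfilledMember : unfilledMembers) {
            int assignedPartitionsCount = assignment.get(unfilledMember).size();
            if (assignedPartitionsCount != minQuota) {
                // Full context, including every remaining unfilled member, goes to the error log.
                LOG.log(Level.SEVERE, "Consumer: [{0}] should have {1} partitions, but got {2} "
                        + "partitions, and no more partitions to be assigned. The remaining "
                        + "unfilled consumers are: {3}",
                    new Object[] {unfilledMember, minQuota, assignedPartitionsCount, unfilledMembers});
                throw new IllegalStateException(String.format(
                    "Consumer: [%s] doesn't reach minQuota partitions, and no more partitions to be assigned",
                    unfilledMember));
            } else {
                // Low-level log: this member legitimately stays at minQuota.
                LOG.log(Level.FINEST,
                    "skip over this unfilled member: [{0}] because it already has minQuota partitions",
                    unfilledMember);
                skipped++;
            }
        }
        return skipped;
    }

    public static void main(String[] args) {
        Map<String, List<String>> assignment = Map.of(
            "c1", List.of("t1p0", "t2p1", "t2p0"),
            "c2", List.of("t1p1", "t2p2"));
        System.out.println(checkUnfilledMembers(List.of("c2"), assignment, 2)); // prints 1
    }
}
```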






[GitHub] [kafka] showuon commented on a change in pull request #10645: KAFKA-12464: follow up PR to refactor codes and add logs

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #10645:
URL: https://github.com/apache/kafka/pull/10645#discussion_r627925066



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########
@@ -255,27 +260,35 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
                 partitionsTransferringOwnership.put(unassignedPartition, consumer);
 
             int currentAssignedCount = consumerAssignment.size();
-            int expectedAssignedCount = numMembersHavingMorePartitions < expectedNumMembersHavingMorePartitions ? maxQuota : minQuota;
+            int expectedAssignedCount = numMembersAssignedOverMinQuota < expectedNumMembersAssignedOverMinQuota ? maxQuota : minQuota;
             if (currentAssignedCount == expectedAssignedCount) {
                 if (currentAssignedCount == maxQuota) {
-                    numMembersHavingMorePartitions++;
+                    numMembersAssignedOverMinQuota++;
                 }
                 unfilledConsumerIter.remove();
             }
         }
 
         if (!unfilledMembers.isEmpty()) {
-            // we expected all the remaining unfilled members have minQuota partitions and we're already at the allowed number
-            // of max capacity members. Otherwise, there must be error here.
-            if (numMembersHavingMorePartitions != expectedNumMembersHavingMorePartitions) {
-                throw new IllegalStateException(String.format("We haven't reached the allowed number of max capacity members, " +
-                    "but no more partitions to be assigned to unfilled consumers: %s", unfilledMembers));
+            // we expected all the remaining unfilled members have minQuota partitions and we're already at the expected number
+            // of members with more than the minQuota partitions. Otherwise, there must be error here.
+            if (numMembersAssignedOverMinQuota != expectedNumMembersAssignedOverMinQuota) {
+                log.error("Current number of members with more than the minQuota partitions: {}, is less than the expected number " +
+                    "of members with more than the minQuota partitions: {}, and no more partitions to be assigned to the remaining unfilled consumers: {}",
+                    numMembersAssignedOverMinQuota, expectedNumMembersAssignedOverMinQuota, unfilledMembers);
+                throw new IllegalStateException("We haven't reached the expected number of members with " +
+                    "more than the minQuota partitions, but no more partitions to be assigned");
             } else {
                 for (String unfilledMember : unfilledMembers) {
                     int assignedPartitionsCount = assignment.get(unfilledMember).size();
                     if (assignedPartitionsCount != minQuota) {
-                        throw new IllegalStateException(String.format("Consumer: [%s] should have %d partitions, but got %d partitions, " +
-                            "and no more partitions to be assigned", unfilledMember, minQuota, assignedPartitionsCount));
+                        log.error("Consumer: [{}] should have {} partitions, but got {} partitions, and no more partitions " +
+                            "to be assigned. The remaining unfilled consumers are: {}", unfilledMember, minQuota, assignedPartitionsCount, unfilledMembers);
+                        throw new IllegalStateException(String.format("Consumer: [%s] doesn't reach minQuota partitions, " +

Review comment:
       > nit: the exception here looks good, but once again let's also log an error (it just makes it easier to debug when you have something concrete in the place you encountered the error, whereas exceptions are not always printed right away). Should probably just log any info that could be useful, such as all remaining unfilledMembers
   
   I included most of the info in the error log and put only a simple error message in the exception.






[GitHub] [kafka] showuon commented on pull request #10645: KAFKA-12464: follow up PR to refactor codes and add logs

Posted by GitBox <gi...@apache.org>.
showuon commented on pull request #10645:
URL: https://github.com/apache/kafka/pull/10645#issuecomment-834067928


   @ableegoldman, I've addressed all your comments from #10509. Please take a look. Thank you.




[GitHub] [kafka] showuon commented on a change in pull request #10645: KAFKA-12464: follow up PR to refactor codes and add logs

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #10645:
URL: https://github.com/apache/kafka/pull/10645#discussion_r627924147



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##########
@@ -218,8 +218,10 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
                 consumerAssignment.addAll(minQuotaPartitions);
                 assignedPartitions.addAll(minQuotaPartitions);
                 allRevokedPartitions.addAll(ownedPartitions.subList(minQuota, ownedPartitions.size()));
-                // this consumer is potential maxQuota candidate since we're still under the number of expected max capacity members
-                if (numMembersHavingMorePartitions < expectedNumMembersHavingMorePartitions) {
+                // this consumer is potential maxQuota candidate since we're still under the number of expected members
+                // with more than the minQuota partitions. Note, if the number of expected members with more than
+                // the minQuota partitions is 0, it means minQuota == maxQuota, so they won't be put into unfilledMembers
+                if (numMembersAssignedOverMinQuota < expectedNumMembersAssignedOverMinQuota) {

Review comment:
       > nit (again, please address this in the other PR so I can merge this one): as Guozhang pointed out in another comment, in the case minQuota == maxQuota, this comment is a bit misleading as the number of expected max capacity members is technically all of them, but the variable expectedNumMembersHavingMorePartitions refers to the number of members who have more than the minQuota number of partitions, which in that case would actually be zero.
   
   Agreed! I adopted your suggested change, except for `this consumer may be assigned one more partition` (explained below), and added this line:
   ```
   Note, if the number of expected members with more than the minQuota partitions is 0, it means minQuota == maxQuota, so they won't be put into unfilledMembers
   ```
   This should make it clearer.
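To make the comment concrete, the first-phase decision for a single consumer can be sketched as below. This is a simplified, hypothetical rendering: only the last branch appears in this hunk, the first two branches are our reading of the surrounding method and should be treated as an assumption, partitions are plain strings, and all previously owned partitions are assumed to still exist. `PhaseOneSketch`, `Decision` and `decide` are invented names. It shows why nothing reaches `unfilledMembers` through the last branch when minQuota == maxQuota: the expected over-minQuota count is 0, so the `<` check is always false.

```java
import java.util.List;

public final class PhaseOneSketch {
    // Hypothetical result: the owned partitions a consumer keeps, whether it goes into
    // unfilledMembers, and whether it counts as a member assigned over minQuota.
    static final class Decision {
        final List<String> kept;
        final boolean addToUnfilled;
        final boolean countsAsOverMinQuota;

        Decision(List<String> kept, boolean addToUnfilled, boolean countsAsOverMinQuota) {
            this.kept = kept;
            this.addToUnfilled = addToUnfilled;
            this.countsAsOverMinQuota = countsAsOverMinQuota;
        }
    }

    static Decision decide(List<String> ownedPartitions, int minQuota, int maxQuota,
                           int numMembersAssignedOverMinQuota,
                           int expectedNumMembersAssignedOverMinQuota) {
        if (ownedPartitions.size() < minQuota) {
            // Below minQuota: keep everything and wait for more partitions.
            return new Decision(ownedPartitions, true, false);
        } else if (ownedPartitions.size() >= maxQuota
                && numMembersAssignedOverMinQuota < expectedNumMembersAssignedOverMinQuota) {
            // Room for another over-minQuota member: keep maxQuota, the rest would be revoked.
            return new Decision(ownedPartitions.subList(0, maxQuota), false, true);
        } else {
            // Keep minQuota; only a potential maxQuota candidate while the over-minQuota
            // count is still below the expected value (never true when expected == 0).
            boolean candidate = numMembersAssignedOverMinQuota < expectedNumMembersAssignedOverMinQuota;
            return new Decision(ownedPartitions.subList(0, minQuota), candidate, false);
        }
    }

    public static void main(String[] args) {
        // 6 partitions over 3 consumers: minQuota == maxQuota == 2, expected == 0.
        Decision even = decide(List.of("a", "b", "c"), 2, 2, 0, 0);
        System.out.println(even.kept + " " + even.addToUnfilled); // prints [a, b] false
        // 5 partitions over 2 consumers: minQuota 2, maxQuota 3, expected 1.
        Decision uneven = decide(List.of("t1p0", "t2p1"), 2, 3, 0, 1);
        System.out.println(uneven.kept + " " + uneven.addToUnfilled); // prints [t1p0, t2p1] true
    }
}
```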
   
   > Just a thought: technically it's not even a "potential maxQuota" member, since as you pointed out in another comment "the unassignedPartitions size will always >= unfilledMembers size" -- therefore anything in unfilledMembers will in fact need to receive at least one partition. Does that sound right to you? (this is just a followup question to make sure we're on the same page, no need to do anything for this one)
   
   Not exactly. Since we changed the code to add potential maxQuota members into unfilledMembers, the unassignedPartitions size will **not** always be >= the unfilledMembers size. There will be cases where some unfilledMembers don't get any additional partition. Here's an example (also covered by the newly added tests):
   2 topics: t1, t2
   total partitions: t1p0, t1p1, t2p0, t2p1, t2p2  ==> 5 partitions
   current assignment for c1, c2 (partition t2p0 was assigned to c3, but c3 dropped out of the group):
   c1: t1p0, t2p1
   c2: t1p1, t2p2
   
   In this situation, maxQuota is 3, minQuota is 2, and expectedNumMembersAssignedOverMinQuota is 1. So, after the 1st phase (reassigning previously owned partitions), `numMembersAssignedOverMinQuota` is still 0, `unfilledMembers` will be [c1, c2], and unassignedPartitions will be [t2p0]. After the 2nd phase, only c1 gets 1 more partition assigned.
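The second phase of this example can be replayed with a small simulation that starts from the state described above (c1 and c2 each holding 2 partitions, unfilledMembers [c1, c2], unassigned [t2p0]). It is a simplified sketch modeled on the loop shown in the diffs of this PR, not the assignor's code: partitions are plain strings, and `PhaseTwoSketch`/`assignRemaining` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

public final class PhaseTwoSketch {
    // Hypothetical simplification of the second phase: hand leftover partitions to
    // unfilledMembers round-robin, removing members once they reach their expected count.
    // Mutates `assignment` and `unfilledMembers`; returns the updated over-minQuota count.
    static int assignRemaining(List<String> unassignedPartitions,
                               List<String> unfilledMembers,
                               Map<String, List<String>> assignment,
                               int minQuota, int maxQuota,
                               int numMembersAssignedOverMinQuota,
                               int expectedNumMembersAssignedOverMinQuota) {
        Iterator<String> unfilledConsumerIter = unfilledMembers.iterator();
        for (String unassignedPartition : unassignedPartitions) {
            if (!unfilledConsumerIter.hasNext()) {
                if (unfilledMembers.isEmpty()) {
                    throw new IllegalStateException("No more unfilled consumers to be assigned");
                }
                unfilledConsumerIter = unfilledMembers.iterator(); // start the next round
            }
            String consumer = unfilledConsumerIter.next();
            List<String> consumerAssignment = assignment.get(consumer);
            consumerAssignment.add(unassignedPartition);
            int expectedAssignedCount =
                numMembersAssignedOverMinQuota < expectedNumMembersAssignedOverMinQuota ? maxQuota : minQuota;
            if (consumerAssignment.size() == expectedAssignedCount) {
                if (consumerAssignment.size() == maxQuota) {
                    numMembersAssignedOverMinQuota++;
                }
                unfilledConsumerIter.remove(); // this member is full now
            }
        }
        return numMembersAssignedOverMinQuota;
    }

    public static void main(String[] args) {
        // State after phase 1 in the example: minQuota 2, maxQuota 3, expected 1.
        Map<String, List<String>> assignment = new LinkedHashMap<>();
        assignment.put("c1", new ArrayList<>(List.of("t1p0", "t2p1")));
        assignment.put("c2", new ArrayList<>(List.of("t1p1", "t2p2")));
        List<String> unfilledMembers = new LinkedList<>(List.of("c1", "c2"));
        int overMin = assignRemaining(List.of("t2p0"), unfilledMembers, assignment, 2, 3, 0, 1);
        System.out.println(assignment);      // prints {c1=[t1p0, t2p1, t2p0], c2=[t1p1, t2p2]}
        System.out.println(unfilledMembers); // prints [c2]
        System.out.println(overMin);         // prints 1
    }
}
```

c2 is left in unfilledMembers with exactly minQuota partitions while the over-minQuota count has reached its expected value of 1, which is the situation the final check accepts (and the one the new `trace` log reports).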



