You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/12/09 06:10:12 UTC

[GitHub] [kafka] mjsax opened a new pull request #11584: MINOR: improve logging

mjsax opened a new pull request #11584:
URL: https://github.com/apache/kafka/pull/11584


   Simply reading log by sorting partitions.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on a change in pull request #11584: MINOR: improve logging

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #11584:
URL: https://github.com/apache/kafka/pull/11584#discussion_r791429259



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##########
@@ -1516,4 +1527,20 @@ RebalanceProtocol getProtocol() {
     boolean poll(Timer timer) {
         return poll(timer, true);
     }
+
+
+
+    final static class TopicPartitionComparator implements Comparator<TopicPartition>, Serializable {

Review comment:
       Need to implement `Serializable` to make spotbug happy 🤷 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on a change in pull request #11584: MINOR: improve logging

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #11584:
URL: https://github.com/apache/kafka/pull/11584#discussion_r791955444



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##########
@@ -1516,4 +1527,20 @@ RebalanceProtocol getProtocol() {
     boolean poll(Timer timer) {
         return poll(timer, true);
     }
+
+
+
+    final static class TopicPartitionComparator implements Comparator<TopicPartition>, Serializable {
+        @Override
+        public int compare(TopicPartition topicPartition1, TopicPartition topicPartition2) {
+            String topic1 = topicPartition1.topic();
+            String topic2 = topicPartition2.topic();
+
+            if (topic1.equals(topic2)) {
+                return topicPartition1.partition() - topicPartition2.partition();
+            } else {
+                return topicPartition1.topic().compareTo(topicPartition2.topic());

Review comment:
       Good catch!




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman commented on a change in pull request #11584: MINOR: improve logging

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #11584:
URL: https://github.com/apache/kafka/pull/11584#discussion_r800368829



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##########
@@ -1516,4 +1527,20 @@ RebalanceProtocol getProtocol() {
     boolean poll(Timer timer) {
         return poll(timer, true);
     }
+
+

Review comment:
       nit: extra lines

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##########
@@ -1516,4 +1527,20 @@ RebalanceProtocol getProtocol() {
     boolean poll(Timer timer) {
         return poll(timer, true);
     }
+
+
+
+    final static class TopicPartitionComparator implements Comparator<TopicPartition>, Serializable {

Review comment:
       nit: seems a bit weird to have this in the `ConsumerCoordinator`, would make more sense in a utility class or something related to TopicPartitions -- obviously the actual TopicPartition class would be ideal but unfortunately that would require a KIP, not sure what the second best choice would be but maybe `Topic` or `Utils` or even `CollectionUtils` (under the clients/common package)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] showuon commented on a change in pull request #11584: MINOR: improve logging

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #11584:
URL: https://github.com/apache/kafka/pull/11584#discussion_r791569474



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##########
@@ -1516,4 +1527,20 @@ RebalanceProtocol getProtocol() {
     boolean poll(Timer timer) {
         return poll(timer, true);
     }
+
+
+
+    final static class TopicPartitionComparator implements Comparator<TopicPartition>, Serializable {
+        @Override
+        public int compare(TopicPartition topicPartition1, TopicPartition topicPartition2) {
+            String topic1 = topicPartition1.topic();
+            String topic2 = topicPartition2.topic();
+
+            if (topic1.equals(topic2)) {
+                return topicPartition1.partition() - topicPartition2.partition();
+            } else {
+                return topicPartition1.topic().compareTo(topicPartition2.topic());

Review comment:
       nit: `topic1.compareTo(topic2)`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on pull request #11584: MINOR: improve logging

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #11584:
URL: https://github.com/apache/kafka/pull/11584#issuecomment-1020784877


   Finally updated this PR.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on pull request #11584: MINOR: improve logging

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #11584:
URL: https://github.com/apache/kafka/pull/11584#issuecomment-1020784877


   Finally updated this PR.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] showuon commented on a change in pull request #11584: MINOR: improve logging

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #11584:
URL: https://github.com/apache/kafka/pull/11584#discussion_r800428095



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##########
@@ -1516,4 +1527,20 @@ RebalanceProtocol getProtocol() {
     boolean poll(Timer timer) {
         return poll(timer, true);
     }
+
+
+
+    final static class TopicPartitionComparator implements Comparator<TopicPartition>, Serializable {

Review comment:
       Good point @ableegoldman ! Actually, there's also another partition comparator in [AbstractStickyAssignor.java](https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java#L1021) . We could make them into util class or `TopicPartitions` class if needed. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on a change in pull request #11584: MINOR: improve logging

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #11584:
URL: https://github.com/apache/kafka/pull/11584#discussion_r791429259



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##########
@@ -1516,4 +1527,20 @@ RebalanceProtocol getProtocol() {
     boolean poll(Timer timer) {
         return poll(timer, true);
     }
+
+
+
+    final static class TopicPartitionComparator implements Comparator<TopicPartition>, Serializable {

Review comment:
       Need to implement `Serializable` to make spotbug happy 🤷 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax merged pull request #11584: MINOR: improve logging

Posted by GitBox <gi...@apache.org>.
mjsax merged pull request #11584:
URL: https://github.com/apache/kafka/pull/11584


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] showuon commented on a change in pull request #11584: MINOR: improve logging

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #11584:
URL: https://github.com/apache/kafka/pull/11584#discussion_r791569474



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##########
@@ -1516,4 +1527,20 @@ RebalanceProtocol getProtocol() {
     boolean poll(Timer timer) {
         return poll(timer, true);
     }
+
+
+
+    final static class TopicPartitionComparator implements Comparator<TopicPartition>, Serializable {
+        @Override
+        public int compare(TopicPartition topicPartition1, TopicPartition topicPartition2) {
+            String topic1 = topicPartition1.topic();
+            String topic2 = topicPartition2.topic();
+
+            if (topic1.equals(topic2)) {
+                return topicPartition1.partition() - topicPartition2.partition();
+            } else {
+                return topicPartition1.topic().compareTo(topicPartition2.topic());

Review comment:
       nit: `topic1.compareTo(topic2)`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] showuon commented on a change in pull request #11584: MINOR: improve logging

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #11584:
URL: https://github.com/apache/kafka/pull/11584#discussion_r765638758



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##########
@@ -288,7 +288,7 @@ private Exception invokeOnAssignment(final ConsumerPartitionAssignor assignor, f
     }
 
     private Exception invokePartitionsAssigned(final Set<TopicPartition> assignedPartitions) {
-        log.info("Adding newly assigned partitions: {}", Utils.join(assignedPartitions, ", "));
+        log.info("Adding newly assigned partitions: {}", Utils.join(assignedPartitions.stream().sorted().toArray(), ", "));

Review comment:
       Good point!

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##########
@@ -397,10 +397,10 @@ protected void onJoinComplete(int generation,
                     "\tCurrent owned partitions:                  {}\n" +
                     "\tAdded partitions (assigned - owned):       {}\n" +
                     "\tRevoked partitions (owned - assigned):     {}\n",
-                assignedPartitions,
-                ownedPartitions,
-                addedPartitions,
-                revokedPartitions
+                Utils.join(assignedPartitions.stream().sorted().toArray(), ", "),
+                Utils.join(ownedPartitions.stream().sorted().toArray(), ", "),
+                Utils.join(addedPartitions.stream().sorted().toArray(), ", "),
+                Utils.join(revokedPartitions.stream().sorted().toArray(), ", ")

Review comment:
       Actually, the `assignedPartitions`, `addedPartitions`, and `revokedPartitions`, and the following `invokePartitionsRevoked`, `invokePartitionsAssigned` are coming from the `assignment.partitions()`(or the `assignedPartitions`). We can sort it first, and the rest of the partitions will be sorted.
   
   Just the `ownedPartitions` might still need to sort itself.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on pull request #11584: MINOR: improve logging

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #11584:
URL: https://github.com/apache/kafka/pull/11584#issuecomment-1036514041


   Thanks -- it's also ok to first just file Jira's for flaky tests -- I often don't know the code base (core/connect)... Of course, trying to fix them right away is even better.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on a change in pull request #11584: MINOR: improve logging

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #11584:
URL: https://github.com/apache/kafka/pull/11584#discussion_r803222877



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##########
@@ -1516,4 +1527,20 @@ RebalanceProtocol getProtocol() {
     boolean poll(Timer timer) {
         return poll(timer, true);
     }
+
+
+
+    final static class TopicPartitionComparator implements Comparator<TopicPartition>, Serializable {

Review comment:
       Ack. Just pushed an update with the refactoring, adding a `Utils` class.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman commented on a change in pull request #11584: MINOR: improve logging

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #11584:
URL: https://github.com/apache/kafka/pull/11584#discussion_r765559208



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##########
@@ -288,7 +288,7 @@ private Exception invokeOnAssignment(final ConsumerPartitionAssignor assignor, f
     }
 
     private Exception invokePartitionsAssigned(final Set<TopicPartition> assignedPartitions) {
-        log.info("Adding newly assigned partitions: {}", Utils.join(assignedPartitions, ", "));
+        log.info("Adding newly assigned partitions: {}", Utils.join(assignedPartitions.stream().sorted().toArray(), ", "));

Review comment:
       FYI `TopicPartition` doesn't implement `Comparable` so you can't use `.stream().sorted()`

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##########
@@ -215,7 +215,7 @@ public String protocolType() {
 
     @Override
     protected JoinGroupRequestData.JoinGroupRequestProtocolCollection metadata() {
-        log.debug("Joining group with current subscription: {}", subscriptions.subscription());
+        log.debug("Joining group with current subscription: {}", Utils.join(subscriptions.subscription().stream().sorted().toArray(), ", "));

Review comment:
       Could we just keep the subscription sorted to begin with? (ie store in a sorted data structure)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org