Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/02/10 20:42:13 UTC

[GitHub] [kafka] mjsax opened a new pull request #10102: KAFKA-12272: Fix commit-interval metrics

mjsax opened a new pull request #10102:
URL: https://github.com/apache/kafka/pull/10102


   Call for review @ableegoldman
   
   Fixes a regression bug. Should be cherry-picked back to the 2.6 branch.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ableegoldman commented on a change in pull request #10102: KAFKA-12272: Fix commit-interval metrics

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #10102:
URL: https://github.com/apache/kafka/pull/10102#discussion_r574187073



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -776,9 +776,10 @@ void runOnce() {
 
                 log.debug("{} punctuators ran.", punctuated);
 
+                final long beforeCommitTs = now;
                 final int committed = maybeCommit();
                 totalCommittedSinceLastSummary += committed;
-                final long commitLatency = advanceNowAndComputeLatency();
+                final long commitLatency = Math.max(now - beforeCommitTs, 0);

Review comment:
       > Yes, that is a little odd -- as I said, happy to change `advanceNowAndComputeLatency` to `now = time.milliseconds()` within `maybeCommit()`.
   
   Sounds good to me, let's do it.






[GitHub] [kafka] mjsax commented on pull request #10102: KAFKA-12272: Fix commit-interval metrics

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #10102:
URL: https://github.com/apache/kafka/pull/10102#issuecomment-777145424


   Thanks @ableegoldman -- updated the PR. If there are no further concerns raised, I'll merge after Jenkins passes and cherry-pick to the `2.8` branch.




[GitHub] [kafka] mjsax commented on a change in pull request #10102: KAFKA-12272: Fix commit-interval metrics

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #10102:
URL: https://github.com/apache/kafka/pull/10102#discussion_r574183054



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -776,9 +776,10 @@ void runOnce() {
 
                 log.debug("{} punctuators ran.", punctuated);
 
+                final long beforeCommitTs = now;
                 final int committed = maybeCommit();
                 totalCommittedSinceLastSummary += committed;
-                final long commitLatency = advanceNowAndComputeLatency();
+                final long commitLatency = Math.max(now - beforeCommitTs, 0);

Review comment:
       > so there shouldn't be a noticeable difference between advancing `now` at the end of `maybeCommit` vs advancing it immediately after `maybeCommit` returns, right?
   
   Well, not from a runtime point of view.
   
   > (and similarly move the `lastCommitMs = now` to after `maybeCommit` returns)
   
   Exactly: we need to first advance `now` and then update `lastCommitMs` -- but I would rather not update `lastCommitMs` outside of `maybeCommit`, because it breaks encapsulation?
   
   > I'm also ok with just advancing `now` inside `maybeCommit`. The main thing that felt off was just that we compute the latency for no reason inside `maybeCommit`, and ignore the result
   
   Yes, that is a little odd -- as I said, happy to change `advanceNowAndComputeLatency` to `now = time.milliseconds()` within `maybeCommit()`.






[GitHub] [kafka] mjsax commented on pull request #10102: KAFKA-12272: Fix commit-interval metrics

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #10102:
URL: https://github.com/apache/kafka/pull/10102#issuecomment-777898035


   Merged to `trunk` and cherry-picked to `2.8` branch.




[GitHub] [kafka] ableegoldman commented on a change in pull request #10102: KAFKA-12272: Fix commit-interval metrics

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #10102:
URL: https://github.com/apache/kafka/pull/10102#discussion_r574090779



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -776,9 +776,10 @@ void runOnce() {
 
                 log.debug("{} punctuators ran.", punctuated);
 
+                final long beforeCommitTs = now;
                 final int committed = maybeCommit();
                 totalCommittedSinceLastSummary += committed;
-                final long commitLatency = advanceNowAndComputeLatency();
+                final long commitLatency = Math.max(now - beforeCommitTs, 0);

Review comment:
       (and similarly move the `lastCommitMs = now` to after `maybeCommit` returns)






[GitHub] [kafka] mjsax commented on a change in pull request #10102: KAFKA-12272: Fix commit-interval metrics

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #10102:
URL: https://github.com/apache/kafka/pull/10102#discussion_r574085771



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -776,9 +776,10 @@ void runOnce() {
 
                 log.debug("{} punctuators ran.", punctuated);
 
+                final long beforeCommitTs = now;
                 final int committed = maybeCommit();
                 totalCommittedSinceLastSummary += committed;
-                final long commitLatency = advanceNowAndComputeLatency();
+                final long commitLatency = Math.max(now - beforeCommitTs, 0);

Review comment:
       Well, we need to advance time _within_ `maybeCommit()` to make sure we compute `lastCommitMs` correctly (which was exactly the fix that introduced the commit-latency metric regression).
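A minimal Java sketch of the timing logic under discussion, assuming a heavily simplified loop. The class `CommitTimingSketch`, the `LongSupplier` clock (standing in for Kafka's `Time#milliseconds()`), the `commitAction` callback, the `tick()` helper, and the simplified commit condition are all hypothetical and are not the actual `StreamThread` code. It shows `now` being advanced inside `maybeCommit()` after the commit work, so that `lastCommitMs` records when the commit finished, while the caller derives the commit latency from a timestamp captured before the call, as in the diff.

```java
import java.util.function.LongSupplier;

// Hypothetical, heavily simplified stand-in for the StreamThread commit path;
// only the timing logic discussed in the review is modelled.
class CommitTimingSketch {
    private final LongSupplier time;       // stands in for Kafka's Time#milliseconds()
    private final long commitIntervalMs;
    private final Runnable commitAction;   // hypothetical: simulates the commit work

    long now;
    long lastCommitMs;

    CommitTimingSketch(final LongSupplier time, final long commitIntervalMs, final Runnable commitAction) {
        this.time = time;
        this.commitIntervalMs = commitIntervalMs;
        this.commitAction = commitAction;
        this.now = time.getAsLong();
        this.lastCommitMs = now;
    }

    // Stands in for the other loop phases (poll, process, punctuate) that advance `now`.
    void tick() {
        now = time.getAsLong();
    }

    // Returns the number of commits performed (1 or 0 in this sketch).
    int maybeCommit() {
        if (now - lastCommitMs <= commitIntervalMs) {
            return 0;
        }
        commitAction.run();
        // Read the clock after the commit work, so lastCommitMs records when
        // the commit finished rather than the stale pre-commit timestamp.
        now = time.getAsLong();
        lastCommitMs = now;
        return 1;
    }

    // Caller side, mirroring the diff: remember `now`, commit, then derive
    // the latency from the value that maybeCommit() advanced.
    long commitAndMeasureLatency() {
        final long beforeCommitMs = now;
        maybeCommit();
        return Math.max(now - beforeCommitMs, 0);
    }
}
```

With a clock that advances 50 ms during the commit, `commitAndMeasureLatency()` returns 50 and `lastCommitMs` equals the post-commit time. If `lastCommitMs` were set from the pre-commit `now` instead, it would lag by the commit duration.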






[GitHub] [kafka] mjsax merged pull request #10102: KAFKA-12272: Fix commit-interval metrics

Posted by GitBox <gi...@apache.org>.
mjsax merged pull request #10102:
URL: https://github.com/apache/kafka/pull/10102


   




[GitHub] [kafka] mjsax commented on a change in pull request #10102: KAFKA-12272: Fix commit-interval metrics

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #10102:
URL: https://github.com/apache/kafka/pull/10102#discussion_r574086409



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -776,9 +776,10 @@ void runOnce() {
 
                 log.debug("{} punctuators ran.", punctuated);
 
+                final long beforeCommitTs = now;
                 final int committed = maybeCommit();
                 totalCommittedSinceLastSummary += committed;
-                final long commitLatency = advanceNowAndComputeLatency();
+                final long commitLatency = Math.max(now - beforeCommitTs, 0);

Review comment:
       We could replace `advanceNowAndComputeLatency` with `now = time.milliseconds()` in `maybeCommit()` though.






[GitHub] [kafka] mjsax commented on a change in pull request #10102: KAFKA-12272: Fix commit-interval metrics

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #10102:
URL: https://github.com/apache/kafka/pull/10102#discussion_r574079328



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -776,9 +776,10 @@ void runOnce() {
 
                 log.debug("{} punctuators ran.", punctuated);
 
+                final long beforeCommitTs = now;

Review comment:
       Happy to address.






[GitHub] [kafka] mjsax commented on a change in pull request #10102: KAFKA-12272: Fix commit-interval metrics

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #10102:
URL: https://github.com/apache/kafka/pull/10102#discussion_r574086409



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -776,9 +776,10 @@ void runOnce() {
 
                 log.debug("{} punctuators ran.", punctuated);
 
+                final long beforeCommitTs = now;
                 final int committed = maybeCommit();
                 totalCommittedSinceLastSummary += committed;
-                final long commitLatency = advanceNowAndComputeLatency();
+                final long commitLatency = Math.max(now - beforeCommitTs, 0);

Review comment:
       We could replace `advanceNowAndComputeLatency` with `now = time.milliseconds()` in `maybeCommit()` though. -- In the original PR, we just called `advanceNowAndComputeLatency` to keep the pattern of a single place in the code that calls `time.milliseconds()`.
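The "single place that calls `time.milliseconds()`" pattern might look roughly like the following sketch. The method name comes from this thread, but its body, the `LoopClockSketch` class, and the `LongSupplier` clock are assumptions for illustration, not the verified Kafka source. The `Math.max(..., 0)` clamp mirrors the one in the diff. Presumably it guards against a wall clock that steps backwards.

```java
import java.util.function.LongSupplier;

// Hypothetical sketch of a loop that reads the clock in exactly one place.
class LoopClockSketch {
    private final LongSupplier time; // stands in for Kafka's Time#milliseconds()
    long now;

    LoopClockSketch(final LongSupplier time) {
        this.time = time;
        this.now = time.getAsLong();
    }

    // The only method that reads the clock: moves `now` forward and returns the
    // time elapsed since the previous reading, clamped at 0 so a clock that
    // steps backwards never produces a negative latency.
    long advanceNowAndComputeLatency() {
        final long previous = now;
        now = time.getAsLong();
        return Math.max(now - previous, 0);
    }
}
```

Funnelling every clock read through one method keeps `now` consistent across the loop. The cost, as discussed above, is that a caller may compute a latency it does not need.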






[GitHub] [kafka] ableegoldman commented on a change in pull request #10102: KAFKA-12272: Fix commit-interval metrics

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #10102:
URL: https://github.com/apache/kafka/pull/10102#discussion_r574090292



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -776,9 +776,10 @@ void runOnce() {
 
                 log.debug("{} punctuators ran.", punctuated);
 
+                final long beforeCommitTs = now;
                 final int committed = maybeCommit();
                 totalCommittedSinceLastSummary += committed;
-                final long commitLatency = advanceNowAndComputeLatency();
+                final long commitLatency = Math.max(now - beforeCommitTs, 0);

Review comment:
       But currently we call `advanceNowAndComputeLatency` pretty much at the end of the `maybeCommit` method, so there shouldn't be a noticeable difference between advancing `now` at the end of `maybeCommit` vs advancing it immediately after `maybeCommit` returns, right?
   I'm also ok with just advancing `now` inside `maybeCommit`. The main thing that felt off was just that we compute the latency for no reason inside `maybeCommit`, and ignore the result






[GitHub] [kafka] ableegoldman commented on a change in pull request #10102: KAFKA-12272: Fix commit-interval metrics

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #10102:
URL: https://github.com/apache/kafka/pull/10102#discussion_r574075417



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -776,9 +776,10 @@ void runOnce() {
 
                 log.debug("{} punctuators ran.", punctuated);
 
+                final long beforeCommitTs = now;

Review comment:
       Such a small nit that I don't think it's worth pushing a fix and restarting the build, but should this be `beforeCommitMs` rather than `Ts`?






[GitHub] [kafka] ableegoldman commented on a change in pull request #10102: KAFKA-12272: Fix commit-interval metrics

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #10102:
URL: https://github.com/apache/kafka/pull/10102#discussion_r574078902



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -776,9 +776,10 @@ void runOnce() {
 
                 log.debug("{} punctuators ran.", punctuated);
 
+                final long beforeCommitTs = now;
                 final int committed = maybeCommit();
                 totalCommittedSinceLastSummary += committed;
-                final long commitLatency = advanceNowAndComputeLatency();
+                final long commitLatency = Math.max(now - beforeCommitTs, 0);

Review comment:
       Also kind of a nit, since technically this does work, but wouldn't it make more sense to just remove the `advanceNowAndComputeLatency` call in `maybeCommit`, and then just call `advanceNowAndComputeLatency` here as before? Otherwise we're just computing the latency inside `maybeCommit` for no reason, and throwing out the result.
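For comparison, here is a hypothetical sketch of the alternative proposed here, together with the related suggestion to move `lastCommitMs = now` after `maybeCommit` returns. In this version `maybeCommit()` no longer reads the clock. The caller advances `now` once after the call and then updates `lastCommitMs`. Names other than `maybeCommit`, `advanceNowAndComputeLatency`, `now`, and `lastCommitMs` are invented for illustration, and the commit condition is simplified.

```java
import java.util.function.LongSupplier;

// Hypothetical sketch of the alternative raised in review: maybeCommit() does
// not read the clock; the caller advances `now` once and updates lastCommitMs.
class CallerAdvancesSketch {
    private final LongSupplier time;       // stands in for Kafka's Time#milliseconds()
    private final long commitIntervalMs;
    private final Runnable commitAction;   // hypothetical: simulates the commit work

    long now;
    long lastCommitMs;

    CallerAdvancesSketch(final LongSupplier time, final long commitIntervalMs, final Runnable commitAction) {
        this.time = time;
        this.commitIntervalMs = commitIntervalMs;
        this.commitAction = commitAction;
        this.now = time.getAsLong();
        this.lastCommitMs = now;
    }

    // Simplified commit check; no clock read and no lastCommitMs update here.
    int maybeCommit() {
        if (now - lastCommitMs <= commitIntervalMs) {
            return 0;
        }
        commitAction.run();
        return 1;
    }

    // Single place that reads the clock.
    long advanceNowAndComputeLatency() {
        final long previous = now;
        now = time.getAsLong();
        return Math.max(now - previous, 0);
    }

    long commitAndMeasureLatency() {
        final int committed = maybeCommit();
        final long commitLatency = advanceNowAndComputeLatency();
        if (committed > 0) {
            // lastCommitMs is now maintained outside maybeCommit().
            lastCommitMs = now;
        }
        return commitLatency;
    }
}
```

Under these assumptions, this version yields the same latency and `lastCommitMs` as advancing `now` inside `maybeCommit()`. The trade-off is that `lastCommitMs` is updated outside `maybeCommit()`, which is the encapsulation concern raised in the thread.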



