Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/05/16 13:57:02 UTC

[GitHub] [kafka] qingwei91 opened a new pull request, #12166: KAFKA-13817 Always sync nextTimeToEmit with wall clock

qingwei91 opened a new pull request, #12166:
URL: https://github.com/apache/kafka/pull/12166

   We should sync `nextTimeToEmit` with the wall clock on each method call to ensure throttling works correctly in case of clock drift.
   If we don't, then after a significant clock drift throttling might not happen for a long time, which can hurt performance.
   
   I've added a unit test to simulate clock drift and verify my change works.
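
   The effect described above can be sketched in isolation. The following is a minimal, self-contained illustration, not the Kafka implementation: `Throttle`, `tryEmit`, and `countEmits` are hypothetical names, and the sketch assumes the tracker advances `nextTimeToEmit` by a fixed emit interval after every emit pass. The `alwaysSync` flag switches between syncing only on first use (the removed `== 0` check) and syncing on every pass (the change proposed here).

```java
public class EmitThrottleSketch {

    /** Hypothetical stand-in for the shared time tracker; not the Kafka class. */
    static final class Throttle {
        private final long emitIntervalMs;
        private final boolean alwaysSync;
        private long nextTimeToEmit = 0L;

        Throttle(long emitIntervalMs, boolean alwaysSync) {
            this.emitIntervalMs = emitIntervalMs;
            this.alwaysSync = alwaysSync;
        }

        /** Returns true if an emit pass would run at wall-clock time nowMs. */
        boolean tryEmit(long nowMs) {
            if (nowMs < nextTimeToEmit) {
                return false; // throttled: too early for the next emit pass
            }
            // Old behavior: sync only when the tracker was never set.
            // New behavior: sync with the wall clock on every pass.
            if (alwaysSync || nextTimeToEmit == 0L) {
                nextTimeToEmit = nowMs;
            }
            // Assumption: the next emit is scheduled one interval later.
            nextTimeToEmit += emitIntervalMs;
            return true;
        }
    }

    /** Counts how many emit passes run for calls at the given wall-clock times. */
    static int countEmits(Throttle throttle, long[] callTimesMs) {
        int emits = 0;
        for (long nowMs : callTimesMs) {
            if (throttle.tryEmit(nowMs)) {
                emits++;
            }
        }
        return emits;
    }

    public static void main(String[] args) {
        // One call at t=1 ms, then the clock jumps far ahead and
        // five more calls arrive 1 ms apart.
        long[] calls = {1L, 100_000L, 100_001L, 100_002L, 100_003L, 100_004L};

        System.out.println("sync once:   " + countEmits(new Throttle(1_000L, false), calls));
        System.out.println("always sync: " + countEmits(new Throttle(1_000L, true), calls));
    }
}
```

   With a 1,000 ms interval, the sync-once variant runs an emit pass on all six calls, because `nextTimeToEmit` only creeps forward by one interval per call and stays far behind the wall clock. The always-sync variant runs two passes (the first call and the first call after the jump) and throttles the rest.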
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] mjsax commented on pull request #12166: KAFKA-13817 Always sync nextTimeToEmit with wall clock

Posted by GitBox <gi...@apache.org>.
mjsax commented on PR #12166:
URL: https://github.com/apache/kafka/pull/12166#issuecomment-1366902096

   Thanks for the PR! Merged to `trunk`.




[GitHub] [kafka] qingwei91 commented on a diff in pull request #12166: KAFKA-13817 Always sync nextTimeToEmit with wall clock

Posted by GitBox <gi...@apache.org>.
qingwei91 commented on code in PR #12166:
URL: https://github.com/apache/kafka/pull/12166#discussion_r928095637


##########
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java:
##########
@@ -333,6 +352,87 @@ public void shouldJoinWithCustomStoreSuppliers() {
         runJoin(streamJoined.withOtherStoreSupplier(otherStoreSupplier), joinWindows);
     }
 
+    @Test
+    public void shouldThrottleEmitNonJoinedOuterRecordsEvenWhenClockDrift() {

Review Comment:
   Thanks for the advice, I will try to mimic that.





[GitHub] [kafka] qingwei91 commented on a diff in pull request #12166: KAFKA-13817 Always sync nextTimeToEmit with wall clock

Posted by GitBox <gi...@apache.org>.
qingwei91 commented on code in PR #12166:
URL: https://github.com/apache/kafka/pull/12166#discussion_r873762061


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##########
@@ -226,9 +226,10 @@ private void emitNonJoinedOuterRecords(
             if (internalProcessorContext.currentSystemTimeMs() < sharedTimeTracker.nextTimeToEmit) {
                 return;
             }
-            if (sharedTimeTracker.nextTimeToEmit == 0) {
-                sharedTimeTracker.nextTimeToEmit = internalProcessorContext.currentSystemTimeMs();
-            }
+
+            // Ensure `nextTimeToEmit` is synced with `currentSystemTimeMs`, if we dont set it everytime,
+            // they can get out of sync during a clock drift
+            sharedTimeTracker.nextTimeToEmit = internalProcessorContext.currentSystemTimeMs();

Review Comment:
   Is it OK to have comments here? It wasn't obvious to me at first what this piece of code was doing, so I thought comments might help. I don't feel strongly about it, though; please let me know if you'd like them removed.



##########
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java:
##########
@@ -333,6 +352,87 @@ public void shouldJoinWithCustomStoreSuppliers() {
         runJoin(streamJoined.withOtherStoreSupplier(otherStoreSupplier), joinWindows);
     }
 
+    @Test
+    public void shouldThrottleEmitNonJoinedOuterRecordsEvenWhenClockDrift() {

Review Comment:
   This test is quite convoluted because it relies on the low-level API. It appears to be the first test here to do so (the other tests rely on the higher-level API). Is this acceptable?
   
   I resorted to this approach because we need to manipulate `TimeTracker`, which isn't available through the high-level API, and I don't feel comfortable making a larger change to the codebase.
   
   Please let me know if you think there's a better way.





[GitHub] [kafka] lihaosky commented on a diff in pull request #12166: KAFKA-13817 Always sync nextTimeToEmit with wall clock

Posted by GitBox <gi...@apache.org>.
lihaosky commented on code in PR #12166:
URL: https://github.com/apache/kafka/pull/12166#discussion_r927166898


##########
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java:
##########
@@ -333,6 +352,87 @@ public void shouldJoinWithCustomStoreSuppliers() {
         runJoin(streamJoined.withOtherStoreSupplier(otherStoreSupplier), joinWindows);
     }
 
+    @Test
+    public void shouldThrottleEmitNonJoinedOuterRecordsEvenWhenClockDrift() {

Review Comment:
   Hi @qingwei91, thanks for the fix and the great test coverage! Regarding test complexity, could you do something similar to https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java#L768 to test time drift? Instead of mocking low-level stores, could you check the final results?





[GitHub] [kafka] qingwei91 commented on a diff in pull request #12166: KAFKA-13817 Always sync nextTimeToEmit with wall clock

Posted by GitBox <gi...@apache.org>.
qingwei91 commented on code in PR #12166:
URL: https://github.com/apache/kafka/pull/12166#discussion_r1015852039


##########
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java:
##########
@@ -333,6 +352,87 @@ public void shouldJoinWithCustomStoreSuppliers() {
         runJoin(streamJoined.withOtherStoreSupplier(otherStoreSupplier), joinWindows);
     }
 
+    @Test
+    public void shouldThrottleEmitNonJoinedOuterRecordsEvenWhenClockDrift() {

Review Comment:
   Hi @lihaosky, I changed the test to this: https://github.com/apache/kafka/pull/12166/commits/26f6fa6f809851c8e57c660472bcce47f881e536
   
   Is it OK?





[GitHub] [kafka] mjsax merged pull request #12166: KAFKA-13817 Always sync nextTimeToEmit with wall clock

Posted by GitBox <gi...@apache.org>.
mjsax merged PR #12166:
URL: https://github.com/apache/kafka/pull/12166




[GitHub] [kafka] mjsax commented on pull request #12166: KAFKA-13817 Always sync nextTimeToEmit with wall clock

Posted by GitBox <gi...@apache.org>.
mjsax commented on PR #12166:
URL: https://github.com/apache/kafka/pull/12166#issuecomment-1301499563

   @qingwei91 -- What is the status of this PR? It seems there are open comments that still need to be addressed. It would be great if we could push this over the finish line.




[GitHub] [kafka] lihaosky commented on a diff in pull request #12166: KAFKA-13817 Always sync nextTimeToEmit with wall clock

Posted by GitBox <gi...@apache.org>.
lihaosky commented on code in PR #12166:
URL: https://github.com/apache/kafka/pull/12166#discussion_r927167240


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##########
@@ -226,9 +226,10 @@ private void emitNonJoinedOuterRecords(
             if (internalProcessorContext.currentSystemTimeMs() < sharedTimeTracker.nextTimeToEmit) {
                 return;
             }
-            if (sharedTimeTracker.nextTimeToEmit == 0) {
-                sharedTimeTracker.nextTimeToEmit = internalProcessorContext.currentSystemTimeMs();
-            }
+
+            // Ensure `nextTimeToEmit` is synced with `currentSystemTimeMs`, if we dont set it everytime,
+            // they can get out of sync during a clock drift
+            sharedTimeTracker.nextTimeToEmit = internalProcessorContext.currentSystemTimeMs();

Review Comment:
   I'm OK with the comment.





[GitHub] [kafka] lihaosky commented on pull request #12166: KAFKA-13817 Always sync nextTimeToEmit with wall clock

Posted by GitBox <gi...@apache.org>.
lihaosky commented on PR #12166:
URL: https://github.com/apache/kafka/pull/12166#issuecomment-1306192045

   @mjsax can help approve and merge as a committer.




[GitHub] [kafka] qingwei91 commented on a diff in pull request #12166: KAFKA-13817 Always sync nextTimeToEmit with wall clock

Posted by GitBox <gi...@apache.org>.
qingwei91 commented on code in PR #12166:
URL: https://github.com/apache/kafka/pull/12166#discussion_r910374760


##########
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java:
##########
@@ -333,6 +352,87 @@ public void shouldJoinWithCustomStoreSuppliers() {
         runJoin(streamJoined.withOtherStoreSupplier(otherStoreSupplier), joinWindows);
     }
 
+    @Test
+    public void shouldThrottleEmitNonJoinedOuterRecordsEvenWhenClockDrift() {

Review Comment:
   I am happy to defer to you or fellow contributors/maintainers.
   
   My personal view is that the behavior change is quite subtle, so having a test to codify it is useful. But if we are happy to merge it without a unit test, I am happy with that too.





[GitHub] [kafka] mjsax commented on a diff in pull request #12166: KAFKA-13817 Always sync nextTimeToEmit with wall clock

Posted by GitBox <gi...@apache.org>.
mjsax commented on code in PR #12166:
URL: https://github.com/apache/kafka/pull/12166#discussion_r909082677


##########
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java:
##########
@@ -333,6 +352,87 @@ public void shouldJoinWithCustomStoreSuppliers() {
         runJoin(streamJoined.withOtherStoreSupplier(otherStoreSupplier), joinWindows);
     }
 
+    @Test
+    public void shouldThrottleEmitNonJoinedOuterRecordsEvenWhenClockDrift() {

Review Comment:
   There is `KStreamWindowAggregateTest#shouldEmitWithLargeInterval()` that tests a similar thing.





[GitHub] [kafka] mjsax commented on a diff in pull request #12166: KAFKA-13817 Always sync nextTimeToEmit with wall clock

Posted by GitBox <gi...@apache.org>.
mjsax commented on code in PR #12166:
URL: https://github.com/apache/kafka/pull/12166#discussion_r909081516


##########
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java:
##########
@@ -333,6 +352,87 @@ public void shouldJoinWithCustomStoreSuppliers() {
         runJoin(streamJoined.withOtherStoreSupplier(otherStoreSupplier), joinWindows);
     }
 
+    @Test
+    public void shouldThrottleEmitNonJoinedOuterRecordsEvenWhenClockDrift() {

Review Comment:
   It's a minor improvement to the actual code, so it might be OK not to add a test for it? -- Otherwise, I don't have a proposal for better code... It's in the guts, so it's messy (and thus maybe not worth it?) to test.





[GitHub] [kafka] lihaosky commented on pull request #12166: KAFKA-13817 Always sync nextTimeToEmit with wall clock

Posted by GitBox <gi...@apache.org>.
lihaosky commented on PR #12166:
URL: https://github.com/apache/kafka/pull/12166#issuecomment-1176495887

   I can also take a look by the end of this week.




[GitHub] [kafka] qingwei91 commented on pull request #12166: KAFKA-13817 Always sync nextTimeToEmit with wall clock

Posted by GitBox <gi...@apache.org>.
qingwei91 commented on PR #12166:
URL: https://github.com/apache/kafka/pull/12166#issuecomment-1302692988

   @mjsax Sorry, I will try to pick this back up this weekend.

