Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/05/16 14:01:20 UTC

[GitHub] [kafka] qingwei91 commented on a diff in pull request #12166: KAFKA-13817 Always sync nextTimeToEmit with wall clock

qingwei91 commented on code in PR #12166:
URL: https://github.com/apache/kafka/pull/12166#discussion_r873762061


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##########
@@ -226,9 +226,10 @@ private void emitNonJoinedOuterRecords(
             if (internalProcessorContext.currentSystemTimeMs() < sharedTimeTracker.nextTimeToEmit) {
                 return;
             }
-            if (sharedTimeTracker.nextTimeToEmit == 0) {
-                sharedTimeTracker.nextTimeToEmit = internalProcessorContext.currentSystemTimeMs();
-            }
+
+            // Ensure `nextTimeToEmit` is synced with `currentSystemTimeMs`, if we dont set it everytime,
+            // they can get out of sync during a clock drift
+            sharedTimeTracker.nextTimeToEmit = internalProcessorContext.currentSystemTimeMs();

Review Comment:
   Is it OK to have comments here? It wasn't obvious to me at first what this piece of code was doing, so I thought a comment might help. I don't feel strongly about it; please let me know if you'd like it removed.
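
   To illustrate the drift the comment refers to, here is a minimal standalone sketch. It is not the Kafka code: `Tracker`, `EMIT_INTERVAL_MS`, `tryEmitOld`, `tryEmitNew` and `countEmits` are hypothetical names, and it assumes `nextTimeToEmit` is advanced by a fixed interval after each emit. Under that assumption, when the wall clock jumps far ahead of `nextTimeToEmit`, the old check lets every call through until the tracker catches up, while re-syncing on every emit keeps the throttle in place.

```java
final class EmitThrottleSketch {
    // Hypothetical stand-in for the shared time tracker; not the Kafka class.
    static final class Tracker {
        long nextTimeToEmit = 0L;
    }

    // Assumed emit interval, chosen only for illustration.
    static final long EMIT_INTERVAL_MS = 1000L;

    // Old behaviour: sync with the clock only while nextTimeToEmit is still 0.
    static boolean tryEmitOld(Tracker t, long nowMs) {
        if (nowMs < t.nextTimeToEmit) {
            return false; // throttled
        }
        if (t.nextTimeToEmit == 0) {
            t.nextTimeToEmit = nowMs;
        }
        t.nextTimeToEmit += EMIT_INTERVAL_MS;
        return true;
    }

    // New behaviour: always sync with the clock before advancing.
    static boolean tryEmitNew(Tracker t, long nowMs) {
        if (nowMs < t.nextTimeToEmit) {
            return false; // throttled
        }
        t.nextTimeToEmit = nowMs;
        t.nextTimeToEmit += EMIT_INTERVAL_MS;
        return true;
    }

    // Counts how many of the given wall-clock readings result in an emit.
    static int countEmits(boolean resync, long[] timesMs) {
        Tracker t = new Tracker();
        int emits = 0;
        for (long now : timesMs) {
            boolean emitted = resync ? tryEmitNew(t, now) : tryEmitOld(t, now);
            if (emitted) {
                emits++;
            }
        }
        return emits;
    }

    public static void main(String[] args) {
        // The clock jumps from 1 ms to 10 s, then records arrive 1 ms apart.
        long[] timesMs = {1L, 10_000L, 10_001L, 10_002L, 10_003L};
        System.out.println("old: " + countEmits(false, timesMs));
        System.out.println("new: " + countEmits(true, timesMs));
    }
}
```

   With these readings the old variant emits on every call after the jump, whereas the new variant emits once after the jump and then throttles again.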



##########
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java:
##########
@@ -333,6 +352,87 @@ public void shouldJoinWithCustomStoreSuppliers() {
         runJoin(streamJoined.withOtherStoreSupplier(otherStoreSupplier), joinWindows);
     }
 
+    @Test
+    public void shouldThrottleEmitNonJoinedOuterRecordsEvenWhenClockDrift() {

Review Comment:
   This test is quite convoluted because it relies on the low-level API. It appears to be the first test here to do so (the other tests rely on the higher-level API). Is this acceptable?
   
   I resorted to this approach because we need to manipulate `TimeTracker`, which isn't available through the high-level API, and I don't feel comfortable making larger changes to the codebase.
   
   Please let me know if you think there's a better way.


