You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2020/11/21 02:57:15 UTC

[kafka] branch trunk updated: KAFKA-10755: Should consider commit latency when computing next commit timestamp (#9634)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 351a22a  KAFKA-10755: Should consider commit latency when computing next commit timestamp (#9634)
351a22a is described below

commit 351a22a12ecb79a1e1070bf1129f2e168e4e0670
Author: Matthias J. Sax <ma...@confluent.io>
AuthorDate: Fri Nov 20 18:55:40 2020 -0800

    KAFKA-10755: Should consider commit latency when computing next commit timestamp (#9634)
    
    Reviewer: Guozhang Wang <gu...@confluent.io>
---
 .../streams/processor/internals/StreamThread.java  |  1 +
 .../processor/internals/StreamThreadTest.java      | 43 +++++++++++++++++++---
 2 files changed, 39 insertions(+), 5 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index edb7709..0407b98 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -944,6 +944,7 @@ public class StreamThread extends Thread {
             if (committed == -1) {
                 log.debug("Unable to commit as we are in the middle of a rebalance, will try again when it completes.");
             } else {
+                advanceNowAndComputeLatency();
                 lastCommitMs = now;
             }
         } else {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 54361f0..a0a0e3a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -741,18 +741,41 @@ public class StreamThreadTest {
     }
 
     @Test
-    public void shouldCommitAfterTheCommitInterval() {
-        final long commitInterval = 1000L;
+    public void shouldCommitAfterCommitInterval() {
+        final long commitInterval = 100L;
+        final long commitLatency = 10L;
+
         final Properties props = configProps(false);
         props.setProperty(StreamsConfig.STATE_DIR_CONFIG, stateDir);
         props.setProperty(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, Long.toString(commitInterval));
 
         final StreamsConfig config = new StreamsConfig(props);
         final Consumer<byte[], byte[]> consumer = EasyMock.createNiceMock(Consumer.class);
-        final TaskManager taskManager = mockTaskManagerCommit(consumer, 2, 1);
 
         final StreamsMetricsImpl streamsMetrics =
             new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST, mockTime);
+
+        final AtomicBoolean committed = new AtomicBoolean(false);
+        final TaskManager taskManager = new TaskManager(
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null
+        ) {
+            @Override
+            int commit(final Collection<Task> tasksToCommit) {
+                committed.set(true);
+                // we advance time to make sure the commit delay is considered when computing the next commit timestamp
+                mockTime.sleep(commitLatency);
+                return 1;
+            }
+        };
+
         final StreamThread thread = new StreamThread(
             mockTime,
             config,
@@ -775,11 +798,21 @@ public class StreamThreadTest {
 
         thread.setNow(mockTime.milliseconds());
         thread.maybeCommit();
-        mockTime.sleep(commitInterval + 1);
+        assertTrue(committed.get());
+
+        mockTime.sleep(commitInterval);
+
+        committed.set(false);
         thread.setNow(mockTime.milliseconds());
         thread.maybeCommit();
+        assertFalse(committed.get());
 
-        verify(taskManager);
+        mockTime.sleep(1);
+
+        committed.set(false);
+        thread.setNow(mockTime.milliseconds());
+        thread.maybeCommit();
+        assertTrue(committed.get());
     }
 
     @Test