You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2020/11/21 03:14:19 UTC
[kafka] branch 2.6 updated: KAFKA-10755: Should consider commit
latency when computing next commit timestamp (#9634)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.6 by this push:
new 3fe933b KAFKA-10755: Should consider commit latency when computing next commit timestamp (#9634)
3fe933b is described below
commit 3fe933bb61bba3a190c374ae3e00c16e39084415
Author: Matthias J. Sax <ma...@confluent.io>
AuthorDate: Fri Nov 20 18:55:40 2020 -0800
KAFKA-10755: Should consider commit latency when computing next commit timestamp (#9634)
Reviewer: Guozhang Wang <gu...@confluent.io>
---
.../streams/processor/internals/StreamThread.java | 1 +
.../processor/internals/StreamThreadTest.java | 43 +++++++++++++++++++---
2 files changed, 39 insertions(+), 5 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 833af28..2ca4184 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -864,6 +864,7 @@ public class StreamThread extends Thread {
if (committed == -1) {
log.trace("Unable to commit as we are in the middle of a rebalance, will try again when it completes.");
} else {
+ advanceNowAndComputeLatency();
lastCommitMs = now;
}
} else {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index a076e6f..7ab5b56 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -709,17 +709,40 @@ public class StreamThreadTest {
}
@Test
- public void shouldCommitAfterTheCommitInterval() {
- final long commitInterval = 1000L;
+ public void shouldCommitAfterCommitInterval() {
+ final long commitInterval = 100L;
+ final long commitLatency = 10L;
+
final Properties props = configProps(false);
props.setProperty(StreamsConfig.STATE_DIR_CONFIG, stateDir);
props.setProperty(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, Long.toString(commitInterval));
final StreamsConfig config = new StreamsConfig(props);
final Consumer<byte[], byte[]> consumer = EasyMock.createNiceMock(Consumer.class);
- final TaskManager taskManager = mockTaskManagerCommit(consumer, 2, 1);
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST);
+
+ final AtomicBoolean committed = new AtomicBoolean(false);
+ final TaskManager taskManager = new TaskManager(
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null
+ ) {
+ @Override
+ int commit(final Collection<Task> tasksToCommit) {
+ committed.set(true);
+ // we advance time to make sure the commit delay is considered when computing the next commit timestamp
+ mockTime.sleep(commitLatency);
+ return 1;
+ }
+ };
+
final StreamThread thread = new StreamThread(
mockTime,
config,
@@ -739,11 +762,21 @@ public class StreamThreadTest {
thread.setNow(mockTime.milliseconds());
thread.maybeCommit();
- mockTime.sleep(commitInterval + 1);
+ assertTrue(committed.get());
+
+ mockTime.sleep(commitInterval);
+
+ committed.set(false);
thread.setNow(mockTime.milliseconds());
thread.maybeCommit();
+ assertFalse(committed.get());
- verify(taskManager);
+ mockTime.sleep(1);
+
+ committed.set(false);
+ thread.setNow(mockTime.milliseconds());
+ thread.maybeCommit();
+ assertTrue(committed.get());
}
@Test