You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2020/11/23 19:58:52 UTC
[kafka] branch 2.7 updated: KAFKA-10755: Should consider commit
latency when computing next commit timestamp (#9634)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 2.7
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.7 by this push:
new fd532ed KAFKA-10755: Should consider commit latency when computing next commit timestamp (#9634)
fd532ed is described below
commit fd532edf66ff13486df8420c7035d637ba0edb21
Author: Matthias J. Sax <ma...@confluent.io>
AuthorDate: Fri Nov 20 18:55:40 2020 -0800
KAFKA-10755: Should consider commit latency when computing next commit timestamp (#9634)
Reviewer: Guozhang Wang <gu...@confluent.io>
---
.../streams/processor/internals/StreamThread.java | 1 +
.../processor/internals/StreamThreadTest.java | 43 +++++++++++++++++++---
2 files changed, 39 insertions(+), 5 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 772194e..a5e4f86 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -899,6 +899,7 @@ public class StreamThread extends Thread {
if (committed == -1) {
log.debug("Unable to commit as we are in the middle of a rebalance, will try again when it completes.");
} else {
+ advanceNowAndComputeLatency();
lastCommitMs = now;
}
} else {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 4ee44ab..e1b4b11 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -716,18 +716,41 @@ public class StreamThreadTest {
}
@Test
- public void shouldCommitAfterTheCommitInterval() {
- final long commitInterval = 1000L;
+ public void shouldCommitAfterCommitInterval() {
+ final long commitInterval = 100L;
+ final long commitLatency = 10L;
+
final Properties props = configProps(false);
props.setProperty(StreamsConfig.STATE_DIR_CONFIG, stateDir);
props.setProperty(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, Long.toString(commitInterval));
final StreamsConfig config = new StreamsConfig(props);
final Consumer<byte[], byte[]> consumer = EasyMock.createNiceMock(Consumer.class);
- final TaskManager taskManager = mockTaskManagerCommit(consumer, 2, 1);
final StreamsMetricsImpl streamsMetrics =
new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST, mockTime);
+
+ final AtomicBoolean committed = new AtomicBoolean(false);
+ final TaskManager taskManager = new TaskManager(
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null
+ ) {
+ @Override
+ int commit(final Collection<Task> tasksToCommit) {
+ committed.set(true);
+ // we advance time to make sure the commit delay is considered when computing the next commit timestamp
+ mockTime.sleep(commitLatency);
+ return 1;
+ }
+ };
+
final StreamThread thread = new StreamThread(
mockTime,
config,
@@ -747,11 +770,21 @@ public class StreamThreadTest {
thread.setNow(mockTime.milliseconds());
thread.maybeCommit();
- mockTime.sleep(commitInterval + 1);
+ assertTrue(committed.get());
+
+ mockTime.sleep(commitInterval);
+
+ committed.set(false);
thread.setNow(mockTime.milliseconds());
thread.maybeCommit();
+ assertFalse(committed.get());
- verify(taskManager);
+ mockTime.sleep(1);
+
+ committed.set(false);
+ thread.setNow(mockTime.milliseconds());
+ thread.maybeCommit();
+ assertTrue(committed.get());
}
@Test