You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2019/05/23 06:51:13 UTC
[kafka] branch trunk updated: KAFKA-8229; Reset WorkerSinkTask offset commit interval after task commit (#6579)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 89f331e KAFKA-8229; Reset WorkerSinkTask offset commit interval after task commit (#6579)
89f331e is described below
commit 89f331eac3aaeab53a3b36bc437eba5f6213ca91
Author: sdreynolds <36...@users.noreply.github.com>
AuthorDate: Wed May 22 23:50:48 2019 -0700
KAFKA-8229; Reset WorkerSinkTask offset commit interval after task commit (#6579)
Prior to this change, the next commit time advanced by one full interval
_each_ time a commit happened -- including when the commit happened
because it was requested by the `Task`. When a `Task` requested a
commit several times, the next commit time was pushed far into the future,
which prevented the expected periodic commits from happening.
This commit changes the behavior so that `nextCommit` is reset relative
to the time of the commit.
Reviewers: Jason Gustafson <ja...@confluent.io>
---
.../org/apache/kafka/connect/runtime/WorkerSinkTask.java | 7 ++++++-
.../org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java | 12 ++++++++++++
2 files changed, 18 insertions(+), 1 deletion(-)
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index a112bfa..f21c500 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -206,7 +206,7 @@ class WorkerSinkTask extends WorkerTask {
// Maybe commit
if (!committing && (context.isCommitRequested() || now >= nextCommit)) {
commitOffsets(now, false);
- nextCommit += offsetCommitIntervalMs;
+ nextCommit = now + offsetCommitIntervalMs;
context.clearCommitRequest();
}
@@ -612,6 +612,11 @@ class WorkerSinkTask extends WorkerTask {
return sinkTaskMetricsGroup;
}
+ // Visible for testing
+ long getNextCommit() {
+ return nextCommit;
+ }
+
private class HandleRebalance implements ConsumerRebalanceListener {
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
index 3e047ff..4d62970 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
@@ -576,6 +576,10 @@ public class WorkerSinkTaskTest {
assertTaskMetricValue("offset-commit-failure-percentage", 0.0);
assertTaskMetricValue("offset-commit-success-percentage", 0.0);
+ // Grab the commit time prior to requesting a commit.
+ // This time should advance slightly after committing.
+ // KAFKA-8229
+ final long previousCommitValue = workerTask.getNextCommit();
sinkTaskContext.getValue().requestCommit();
assertTrue(sinkTaskContext.getValue().isCommitRequested());
assertNotEquals(offsets, Whitebox.<Map<TopicPartition, OffsetAndMetadata>>getInternalState(workerTask, "lastCommittedOffsets"));
@@ -585,6 +589,14 @@ public class WorkerSinkTaskTest {
assertFalse(sinkTaskContext.getValue().isCommitRequested()); // should have been cleared
assertEquals(offsets, Whitebox.<Map<TopicPartition, OffsetAndMetadata>>getInternalState(workerTask, "lastCommittedOffsets"));
assertEquals(0, workerTask.commitFailures());
+ // Assert the next commit time advances only slightly: it should move by
+ // the normal commit interval less the two sleeps since the task started,
+ // each of which was 10 seconds.
+ // KAFKA-8229
+ assertEquals("Should have only advanced by 40 seconds",
+ previousCommitValue +
+ (WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_DEFAULT - 10000L * 2),
+ workerTask.getNextCommit());
assertSinkMetricValue("partition-count", 2);
assertSinkMetricValue("sink-record-read-total", 1.0);