You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2019/05/23 07:00:29 UTC

[kafka] branch 2.2 updated: KAFKA-8229; Reset WorkerSinkTask offset commit interval after task commit (#6579)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.2 by this push:
     new 39a088e  KAFKA-8229; Reset WorkerSinkTask offset commit interval after task commit (#6579)
39a088e is described below

commit 39a088e3769d8e1eb78afdeecdcf92d7b0d15ec4
Author: sdreynolds <36...@users.noreply.github.com>
AuthorDate: Wed May 22 23:50:48 2019 -0700

    KAFKA-8229; Reset WorkerSinkTask offset commit interval after task commit (#6579)
    
    Prior to this change, the next commit time advances
    _each_ time a commit happens -- including when a commit happens
    because it was requested by the `Task`. When a `Task` requests a
    commit several times, the clock advances far into the future,
    which prevents expected periodic commits from happening.
    
    This commit changes the behavior so that `nextCommit` is reset
    relative to the time of the commit.
    
    Reviewers: Jason Gustafson <ja...@confluent.io>
---
 .../org/apache/kafka/connect/runtime/WorkerSinkTask.java     |  7 ++++++-
 .../org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java | 12 ++++++++++++
 2 files changed, 18 insertions(+), 1 deletion(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index a112bfa..f21c500 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -206,7 +206,7 @@ class WorkerSinkTask extends WorkerTask {
             // Maybe commit
             if (!committing && (context.isCommitRequested() || now >= nextCommit)) {
                 commitOffsets(now, false);
-                nextCommit += offsetCommitIntervalMs;
+                nextCommit = now + offsetCommitIntervalMs;
                 context.clearCommitRequest();
             }
 
@@ -612,6 +612,11 @@ class WorkerSinkTask extends WorkerTask {
         return sinkTaskMetricsGroup;
     }
 
+    // Visible for testing: returns the time at or after which the next periodic offset commit becomes due
+    long getNextCommit() {
+        return nextCommit;
+    }
+
     private class HandleRebalance implements ConsumerRebalanceListener {
         @Override
         public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
index 3e047ff..4d62970 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
@@ -576,6 +576,10 @@ public class WorkerSinkTaskTest {
         assertTaskMetricValue("offset-commit-failure-percentage", 0.0);
         assertTaskMetricValue("offset-commit-success-percentage", 0.0);
 
+        // Grab the commit time prior to requesting a commit.
+        // This time should advance slightly after committing.
+        // KAFKA-8229
+        final long previousCommitValue = workerTask.getNextCommit();
         sinkTaskContext.getValue().requestCommit();
         assertTrue(sinkTaskContext.getValue().isCommitRequested());
         assertNotEquals(offsets, Whitebox.<Map<TopicPartition, OffsetAndMetadata>>getInternalState(workerTask, "lastCommittedOffsets"));
@@ -585,6 +589,14 @@ public class WorkerSinkTaskTest {
         assertFalse(sinkTaskContext.getValue().isCommitRequested()); // should have been cleared
         assertEquals(offsets, Whitebox.<Map<TopicPartition, OffsetAndMetadata>>getInternalState(workerTask, "lastCommittedOffsets"));
         assertEquals(0, workerTask.commitFailures());
+        // Assert that the next commit time advances only slightly: the amount it
+        // advances is the normal commit interval less the two sleeps since it
+        // started, each of which was 10 seconds.
+        // KAFKA-8229
+        assertEquals("Should have only advanced by 40 seconds",
+                     previousCommitValue  +
+                     (WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_DEFAULT - 10000L * 2),
+                     workerTask.getNextCommit());
 
         assertSinkMetricValue("partition-count", 2);
         assertSinkMetricValue("sink-record-read-total", 1.0);