You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2017/06/06 21:22:45 UTC

kafka git commit: KAFKA-4942: Fix commitTimeoutMs being set before the commit actually started

Repository: kafka
Updated Branches:
  refs/heads/trunk 2244c68eb -> d655d806e


KAFKA-4942: Fix commitTimeoutMs being set before the commit actually started

This fixes KAFKA-4942

This supersedes #2730

/cc simplesteph gwenshap ewencp

Author: Nick Pillitteri <ni...@smartertravelmedia.com>
Author: simplesteph <st...@gmail.com>

Reviewers: simplesteph <st...@gmail.com>, Konstantine Karantasis <ko...@confluent.io>, Ewen Cheslack-Postava <ew...@confluent.io>

Closes #2912 from 56quarters/fix-connect-offset-commit


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d655d806
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d655d806
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d655d806

Branch: refs/heads/trunk
Commit: d655d806ee4fe345ea25f8ff4a5f168c1a5f609b
Parents: 2244c68
Author: Nick Pillitteri <ni...@smartertravelmedia.com>
Authored: Tue Jun 6 14:22:39 2017 -0700
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Tue Jun 6 14:22:39 2017 -0700

----------------------------------------------------------------------
 .../kafka/connect/runtime/WorkerSinkTask.java   |   8 +-
 .../connect/runtime/WorkerSinkTaskTest.java     | 100 +++++++++++++++++++
 2 files changed, 107 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/d655d806/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index 43ad6a1..c4567a3 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -155,7 +155,6 @@ class WorkerSinkTask extends WorkerTask {
 
     protected void iteration() {
         final long offsetCommitIntervalMs = workerConfig.getLong(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG);
-        final long commitTimeoutMs = commitStarted + workerConfig.getLong(WorkerConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG);
 
         try {
             long now = time.milliseconds();
@@ -167,6 +166,8 @@ class WorkerSinkTask extends WorkerTask {
                 context.clearCommitRequest();
             }
 
+            final long commitTimeoutMs = commitStarted + workerConfig.getLong(WorkerConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG);
+
             // Check for timed out commits
             if (committing && now >= commitTimeoutMs) {
                 log.warn("Commit of {} offsets timed out", this);
@@ -250,6 +251,11 @@ class WorkerSinkTask extends WorkerTask {
         deliverMessages();
     }
 
+    // Visible for testing
+    boolean isCommitting() {
+        return committing;
+    }
+
     private void doCommitSync(Map<TopicPartition, OffsetAndMetadata> offsets, int seqno) {
         try {
             consumer.commitSync(offsets);

http://git-wip-us.apache.org/repos/asf/kafka/blob/d655d806/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
index eb5f25c..eae3726 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
@@ -60,6 +60,10 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 
 import static java.util.Arrays.asList;
 import static java.util.Collections.singleton;
@@ -534,6 +538,102 @@ public class WorkerSinkTaskTest {
         PowerMock.verifyAll();
     }
 
+    // Test that the commitTimeoutMs timestamp is correctly computed and checked in WorkerSinkTask.iteration()
+    // when there is a long running commit in process. See KAFKA-4942 for more information.
+    @Test
+    public void testLongRunningCommitWithoutTimeout() throws Exception {
+        expectInitializeTask();
+
+        // iter 1
+        expectPollInitialAssignment();
+
+        // iter 2
+        expectConsumerPoll(1);
+        expectConversionAndTransformation(1);
+        sinkTask.put(EasyMock.<Collection<SinkRecord>>anyObject());
+        EasyMock.expectLastCall();
+
+        final Map<TopicPartition, OffsetAndMetadata> workerStartingOffsets = new HashMap<>();
+        workerStartingOffsets.put(TOPIC_PARTITION, new OffsetAndMetadata(FIRST_OFFSET));
+        workerStartingOffsets.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET));
+
+        final Map<TopicPartition, OffsetAndMetadata> workerCurrentOffsets = new HashMap<>();
+        workerCurrentOffsets.put(TOPIC_PARTITION, new OffsetAndMetadata(FIRST_OFFSET + 1));
+        workerCurrentOffsets.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET));
+
+        // iter 3 - note that we return the current offset to indicate they should be committed
+        sinkTask.preCommit(workerCurrentOffsets);
+        EasyMock.expectLastCall().andReturn(workerCurrentOffsets);
+
+        // We need to delay the result of trying to commit offsets to Kafka via the consumer.commitAsync
+        // method. We do this so that we can test that we do not erroneously mark a commit as timed out
+        // while it is still running and under time. To fake this for tests we have the commit run in a
+        // separate thread and wait for a latch which we control back in the main thread.
+        final ExecutorService executor = Executors.newSingleThreadExecutor();
+        final CountDownLatch latch = new CountDownLatch(1);
+
+        consumer.commitAsync(EasyMock.eq(workerCurrentOffsets), EasyMock.<OffsetCommitCallback>anyObject());
+        EasyMock.expectLastCall().andAnswer(new IAnswer<Void>() {
+            @SuppressWarnings("unchecked")
+            @Override
+            public Void answer() throws Throwable {
+                // Grab the arguments passed to the consumer.commitAsync method
+                final Object[] args = EasyMock.getCurrentArguments();
+                final Map<TopicPartition, OffsetAndMetadata> offsets = (Map<TopicPartition, OffsetAndMetadata>) args[0];
+                final OffsetCommitCallback callback = (OffsetCommitCallback) args[1];
+
+                executor.execute(new Runnable() {
+                    @Override
+                    public void run() {
+                        try {
+                            latch.await();
+                        } catch (InterruptedException e) {
+                            Thread.currentThread().interrupt();
+                        }
+
+                        callback.onComplete(offsets, null);
+                    }
+                });
+
+                return null;
+            }
+        });
+
+        // no actual consumer.commit() triggered
+        expectConsumerPoll(0);
+
+        sinkTask.put(EasyMock.<Collection<SinkRecord>>anyObject());
+        EasyMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        workerTask.initialize(TASK_CONFIG);
+        workerTask.initializeAndStart();
+        workerTask.iteration(); // iter 1 -- initial assignment
+
+        assertEquals(workerStartingOffsets, Whitebox.<Map<TopicPartition, OffsetAndMetadata>>getInternalState(workerTask, "currentOffsets"));
+        assertEquals(workerStartingOffsets, Whitebox.<Map<TopicPartition, OffsetAndMetadata>>getInternalState(workerTask, "lastCommittedOffsets"));
+
+        time.sleep(WorkerConfig.OFFSET_COMMIT_TIMEOUT_MS_DEFAULT);
+        workerTask.iteration(); // iter 2 -- deliver 2 records
+
+        sinkTaskContext.getValue().requestCommit();
+        workerTask.iteration(); // iter 3 -- commit in progress
+
+        // Make sure the "committing" flag didn't immediately get flipped back to false due to an incorrect timeout
+        assertTrue("Expected worker to be in the process of committing offsets", workerTask.isCommitting());
+
+        // Let the async commit finish and wait for it to end
+        latch.countDown();
+        executor.shutdown();
+        executor.awaitTermination(30, TimeUnit.SECONDS);
+
+        assertEquals(workerCurrentOffsets, Whitebox.<Map<TopicPartition, OffsetAndMetadata>>getInternalState(workerTask, "currentOffsets"));
+        assertEquals(workerCurrentOffsets, Whitebox.<Map<TopicPartition, OffsetAndMetadata>>getInternalState(workerTask, "lastCommittedOffsets"));
+
+        PowerMock.verifyAll();
+    }
+
     @Test
     public void testMissingTimestampPropagation() throws Exception {
         expectInitializeTask();