You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2017/06/06 21:22:45 UTC
kafka git commit: KAFKA-4942: Fix commitTimeoutMs being set before
the commit actually started
Repository: kafka
Updated Branches:
refs/heads/trunk 2244c68eb -> d655d806e
KAFKA-4942: Fix commitTimeoutMs being set before the commit actually started
This fixes KAFKA-4942
This supersedes #2730
/cc simplesteph gwenshap ewencp
Author: Nick Pillitteri <ni...@smartertravelmedia.com>
Author: simplesteph <st...@gmail.com>
Reviewers: simplesteph <st...@gmail.com>, Konstantine Karantasis <ko...@confluent.io>, Ewen Cheslack-Postava <ew...@confluent.io>
Closes #2912 from 56quarters/fix-connect-offset-commit
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d655d806
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d655d806
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d655d806
Branch: refs/heads/trunk
Commit: d655d806ee4fe345ea25f8ff4a5f168c1a5f609b
Parents: 2244c68
Author: Nick Pillitteri <ni...@smartertravelmedia.com>
Authored: Tue Jun 6 14:22:39 2017 -0700
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Tue Jun 6 14:22:39 2017 -0700
----------------------------------------------------------------------
.../kafka/connect/runtime/WorkerSinkTask.java | 8 +-
.../connect/runtime/WorkerSinkTaskTest.java | 100 +++++++++++++++++++
2 files changed, 107 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/d655d806/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index 43ad6a1..c4567a3 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -155,7 +155,6 @@ class WorkerSinkTask extends WorkerTask {
protected void iteration() {
final long offsetCommitIntervalMs = workerConfig.getLong(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG);
- final long commitTimeoutMs = commitStarted + workerConfig.getLong(WorkerConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG);
try {
long now = time.milliseconds();
@@ -167,6 +166,8 @@ class WorkerSinkTask extends WorkerTask {
context.clearCommitRequest();
}
+ final long commitTimeoutMs = commitStarted + workerConfig.getLong(WorkerConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG);
+
// Check for timed out commits
if (committing && now >= commitTimeoutMs) {
log.warn("Commit of {} offsets timed out", this);
@@ -250,6 +251,11 @@ class WorkerSinkTask extends WorkerTask {
deliverMessages();
}
+ // Visible for testing
+ boolean isCommitting() {
+ return committing;
+ }
+
private void doCommitSync(Map<TopicPartition, OffsetAndMetadata> offsets, int seqno) {
try {
consumer.commitSync(offsets);
http://git-wip-us.apache.org/repos/asf/kafka/blob/d655d806/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
index eb5f25c..eae3726 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
@@ -60,6 +60,10 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import static java.util.Arrays.asList;
import static java.util.Collections.singleton;
@@ -534,6 +538,102 @@ public class WorkerSinkTaskTest {
PowerMock.verifyAll();
}
+ // Test that the commitTimeoutMs timestamp is correctly computed and checked in WorkerSinkTask.iteration()
+ // when there is a long running commit in process. See KAFKA-4942 for more information.
+ @Test
+ public void testLongRunningCommitWithoutTimeout() throws Exception {
+ expectInitializeTask();
+
+ // iter 1
+ expectPollInitialAssignment();
+
+ // iter 2
+ expectConsumerPoll(1);
+ expectConversionAndTransformation(1);
+ sinkTask.put(EasyMock.<Collection<SinkRecord>>anyObject());
+ EasyMock.expectLastCall();
+
+ final Map<TopicPartition, OffsetAndMetadata> workerStartingOffsets = new HashMap<>();
+ workerStartingOffsets.put(TOPIC_PARTITION, new OffsetAndMetadata(FIRST_OFFSET));
+ workerStartingOffsets.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET));
+
+ final Map<TopicPartition, OffsetAndMetadata> workerCurrentOffsets = new HashMap<>();
+ workerCurrentOffsets.put(TOPIC_PARTITION, new OffsetAndMetadata(FIRST_OFFSET + 1));
+ workerCurrentOffsets.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET));
+
+ // iter 3 - note that we return the current offset to indicate they should be committed
+ sinkTask.preCommit(workerCurrentOffsets);
+ EasyMock.expectLastCall().andReturn(workerCurrentOffsets);
+
+ // We need to delay the result of trying to commit offsets to Kafka via the consumer.commitAsync
+ // method. We do this so that we can test that we do not erroneously mark a commit as timed out
+ // while it is still running and under time. To fake this for tests we have the commit run in a
+ // separate thread and wait for a latch which we control back in the main thread.
+ final ExecutorService executor = Executors.newSingleThreadExecutor();
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ consumer.commitAsync(EasyMock.eq(workerCurrentOffsets), EasyMock.<OffsetCommitCallback>anyObject());
+ EasyMock.expectLastCall().andAnswer(new IAnswer<Void>() {
+ @SuppressWarnings("unchecked")
+ @Override
+ public Void answer() throws Throwable {
+ // Grab the arguments passed to the consumer.commitAsync method
+ final Object[] args = EasyMock.getCurrentArguments();
+ final Map<TopicPartition, OffsetAndMetadata> offsets = (Map<TopicPartition, OffsetAndMetadata>) args[0];
+ final OffsetCommitCallback callback = (OffsetCommitCallback) args[1];
+
+ executor.execute(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ latch.await();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+
+ callback.onComplete(offsets, null);
+ }
+ });
+
+ return null;
+ }
+ });
+
+ // no actual consumer.commit() triggered
+ expectConsumerPoll(0);
+
+ sinkTask.put(EasyMock.<Collection<SinkRecord>>anyObject());
+ EasyMock.expectLastCall();
+
+ PowerMock.replayAll();
+
+ workerTask.initialize(TASK_CONFIG);
+ workerTask.initializeAndStart();
+ workerTask.iteration(); // iter 1 -- initial assignment
+
+ assertEquals(workerStartingOffsets, Whitebox.<Map<TopicPartition, OffsetAndMetadata>>getInternalState(workerTask, "currentOffsets"));
+ assertEquals(workerStartingOffsets, Whitebox.<Map<TopicPartition, OffsetAndMetadata>>getInternalState(workerTask, "lastCommittedOffsets"));
+
+ time.sleep(WorkerConfig.OFFSET_COMMIT_TIMEOUT_MS_DEFAULT);
+ workerTask.iteration(); // iter 2 -- deliver 2 records
+
+ sinkTaskContext.getValue().requestCommit();
+ workerTask.iteration(); // iter 3 -- commit in progress
+
+ // Make sure the "committing" flag didn't immediately get flipped back to false due to an incorrect timeout
+ assertTrue("Expected worker to be in the process of committing offsets", workerTask.isCommitting());
+
+ // Let the async commit finish and wait for it to end
+ latch.countDown();
+ executor.shutdown();
+ executor.awaitTermination(30, TimeUnit.SECONDS);
+
+ assertEquals(workerCurrentOffsets, Whitebox.<Map<TopicPartition, OffsetAndMetadata>>getInternalState(workerTask, "currentOffsets"));
+ assertEquals(workerCurrentOffsets, Whitebox.<Map<TopicPartition, OffsetAndMetadata>>getInternalState(workerTask, "lastCommittedOffsets"));
+
+ PowerMock.verifyAll();
+ }
+
@Test
public void testMissingTimestampPropagation() throws Exception {
expectInitializeTask();