You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2016/12/01 23:01:19 UTC
kafka git commit: KAFKA-4161: KIP-89: Allow sink connectors to
decouple flush and offset commit
Repository: kafka
Updated Branches:
refs/heads/trunk b65f9a777 -> b45a67ede
KAFKA-4161: KIP-89: Allow sink connectors to decouple flush and offset commit
Author: Shikhar Bhushan <sh...@confluent.io>
Reviewers: Ewen Cheslack-Postava <ew...@confluent.io>
Closes #2139 from shikhar/kafka-4161-deux
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b45a67ed
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b45a67ed
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b45a67ed
Branch: refs/heads/trunk
Commit: b45a67ede9021985c8df87c633b225231092c0c9
Parents: b65f9a7
Author: Shikhar Bhushan <sh...@confluent.io>
Authored: Thu Dec 1 15:01:09 2016 -0800
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Thu Dec 1 15:01:09 2016 -0800
----------------------------------------------------------------------
.../org/apache/kafka/connect/sink/SinkTask.java | 27 ++-
.../kafka/connect/sink/SinkTaskContext.java | 9 +
.../kafka/connect/runtime/WorkerSinkTask.java | 71 +++++--
.../connect/runtime/WorkerSinkTaskContext.java | 15 ++
.../connect/runtime/WorkerSinkTaskTest.java | 192 ++++++++++++++++++-
.../runtime/WorkerSinkTaskThreadedTest.java | 38 ++--
6 files changed, 301 insertions(+), 51 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/b45a67ed/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java
----------------------------------------------------------------------
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java b/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java
index 3d0becc..99a2683 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java
@@ -95,13 +95,30 @@ public abstract class SinkTask implements Task {
public abstract void put(Collection<SinkRecord> records);
/**
- * Flush all records that have been {@link #put} for the specified topic-partitions. The
- * offsets are provided for convenience, but could also be determined by tracking all offsets
- * included in the SinkRecords passed to {@link #put}.
+ * Flush all records that have been {@link #put(Collection)} for the specified topic-partitions.
*
- * @param offsets mapping of TopicPartition to committed offset
+ * @param currentOffsets the current offset state as of the last call to {@link #put(Collection)},
+ * provided for convenience but could also be determined by tracking all offsets included in the {@link SinkRecord}s
+ * passed to {@link #put}.
*/
- public abstract void flush(Map<TopicPartition, OffsetAndMetadata> offsets);
+ public void flush(Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
+ }
+
+ /**
+ * Pre-commit hook invoked prior to an offset commit.
+ *
+ * The default implementation simply invokes {@link #flush(Map)} and is thus able to assume all {@code currentOffsets} are safe to commit.
+ *
+ * @param currentOffsets the current offset state as of the last call to {@link #put(Collection)},
+ * provided for convenience but could also be determined by tracking all offsets included in the {@link SinkRecord}s
+ * passed to {@link #put}.
+ *
+ * @return an empty map if Connect-managed offset commit is not desired, otherwise a map of offsets by topic-partition that are safe to commit.
+ */
+ public Map<TopicPartition, OffsetAndMetadata> preCommit(Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
+ flush(currentOffsets);
+ return currentOffsets;
+ }
/**
* The SinkTask uses this method to create writers for newly assigned partitions in case of partition
http://git-wip-us.apache.org/repos/asf/kafka/blob/b45a67ed/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTaskContext.java
----------------------------------------------------------------------
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTaskContext.java b/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTaskContext.java
index 2202cae..14f13d1 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTaskContext.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTaskContext.java
@@ -79,4 +79,13 @@ public interface SinkTaskContext {
* @param partitions the partitions to resume
*/
void resume(TopicPartition... partitions);
+
+ /**
+ * Request an offset commit. Sink tasks can use this to minimize the potential for redelivery
+ * by requesting an offset commit as soon as they flush data to the destination system.
+ *
+ * It is only a hint to the runtime and no timing guarantee should be assumed.
+ */
+ void requestCommit();
+
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/b45a67ed/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index 1575581..b941469 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -150,19 +150,21 @@ class WorkerSinkTask extends WorkerTask {
}
protected void iteration() {
+ final long offsetCommitIntervalMs = workerConfig.getLong(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG);
+ final long commitTimeoutMs = commitStarted + workerConfig.getLong(WorkerConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG);
+
try {
long now = time.milliseconds();
// Maybe commit
- if (!committing && now >= nextCommit) {
+ if (!committing && (context.isCommitRequested() || now >= nextCommit)) {
commitOffsets(now, false);
- nextCommit += workerConfig.getLong(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG);
+ nextCommit += offsetCommitIntervalMs;
+ context.clearCommitRequest();
}
// Check for timed out commits
- long commitTimeout = commitStarted + workerConfig.getLong(
- WorkerConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG);
- if (committing && now >= commitTimeout) {
+ if (committing && now >= commitTimeoutMs) {
log.warn("Commit of {} offsets timed out", this);
commitFailures++;
committing = false;
@@ -267,7 +269,9 @@ class WorkerSinkTask extends WorkerTask {
OffsetCommitCallback cb = new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception error) {
- lastCommittedOffsets = offsets;
+ if (error == null) {
+ lastCommittedOffsets = offsets;
+ }
onCommitCompleted(error, seqno);
}
};
@@ -283,27 +287,58 @@ class WorkerSinkTask extends WorkerTask {
commitSeqno += 1;
commitStarted = now;
- Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(currentOffsets);
+ final Map<TopicPartition, OffsetAndMetadata> taskProvidedOffsets;
try {
- task.flush(offsets);
+ taskProvidedOffsets = task.preCommit(new HashMap<>(currentOffsets));
} catch (Throwable t) {
- log.error("Commit of {} offsets failed due to exception while flushing:", this, t);
- log.error("Rewinding offsets to last committed offsets");
- for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : lastCommittedOffsets.entrySet()) {
- log.debug("{} Rewinding topic partition {} to offset {}", id, entry.getKey(), entry.getValue().offset());
- consumer.seek(entry.getKey(), entry.getValue().offset());
+ if (closing) {
+ log.warn("{} Offset commit failed during close");
+ onCommitCompleted(t, commitSeqno);
+ } else {
+ log.error("{} Offset commit failed, rewinding to last committed offsets", this, t);
+ for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : lastCommittedOffsets.entrySet()) {
+ log.debug("{} Rewinding topic partition {} to offset {}", id, entry.getKey(), entry.getValue().offset());
+ consumer.seek(entry.getKey(), entry.getValue().offset());
+ }
+ currentOffsets = new HashMap<>(lastCommittedOffsets);
+ onCommitCompleted(t, commitSeqno);
}
- currentOffsets = new HashMap<>(lastCommittedOffsets);
- onCommitCompleted(t, commitSeqno);
return;
} finally {
- // Close the task if needed before committing the offsets. This is basically the last chance for
- // the connector to actually flush data that has been written to it.
+ // Close the task if needed before committing the offsets.
if (closing)
task.close(currentOffsets.keySet());
}
- doCommit(offsets, closing, commitSeqno);
+ if (taskProvidedOffsets.isEmpty()) {
+ log.debug("{} Skipping offset commit, task opted-out", this);
+ onCommitCompleted(null, commitSeqno);
+ return;
+ }
+
+ final Map<TopicPartition, OffsetAndMetadata> commitableOffsets = new HashMap<>(lastCommittedOffsets);
+ for (Map.Entry<TopicPartition, OffsetAndMetadata> taskProvidedOffsetEntry : taskProvidedOffsets.entrySet()) {
+ final TopicPartition partition = taskProvidedOffsetEntry.getKey();
+ final OffsetAndMetadata taskProvidedOffset = taskProvidedOffsetEntry.getValue();
+ if (commitableOffsets.containsKey(partition)) {
+ if (taskProvidedOffset.offset() <= currentOffsets.get(partition).offset()) {
+ commitableOffsets.put(partition, taskProvidedOffset);
+ } else {
+ log.warn("Ignoring invalid task provided offset {}/{} -- not yet consumed", partition, taskProvidedOffset);
+ }
+ } else {
+ log.warn("Ignoring invalid task provided offset {}/{} -- partition not assigned", partition, taskProvidedOffset);
+ }
+ }
+
+ if (commitableOffsets.equals(lastCommittedOffsets)) {
+ log.debug("{} Skipping offset commit, no change since last commit", this);
+ onCommitCompleted(null, commitSeqno);
+ return;
+ }
+
+ log.trace("{} Offsets to commit: {}", this, commitableOffsets);
+ doCommit(commitableOffsets, closing, commitSeqno);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/b45a67ed/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java
index c762bdd..ede76c4 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java
@@ -27,6 +27,7 @@ public class WorkerSinkTaskContext implements SinkTaskContext {
private long timeoutMs;
private KafkaConsumer<byte[], byte[]> consumer;
private final Set<TopicPartition> pausedPartitions;
+ private boolean commitRequested;
public WorkerSinkTaskContext(KafkaConsumer<byte[], byte[]> consumer) {
this.offsets = new HashMap<>();
@@ -109,4 +110,18 @@ public class WorkerSinkTaskContext implements SinkTaskContext {
public Set<TopicPartition> pausedPartitions() {
return pausedPartitions;
}
+
+ @Override
+ public void requestCommit() {
+ commitRequested = true;
+ }
+
+ public boolean isCommitRequested() {
+ return commitRequested;
+ }
+
+ public void clearCommitRequest() {
+ commitRequested = false;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/b45a67ed/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
index ca218c3..1f9e56b 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
@@ -22,6 +22,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.record.Record;
@@ -49,6 +50,7 @@ import org.powermock.api.easymock.annotation.Mock;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
+import org.powermock.reflect.Whitebox;
import java.util.ArrayList;
import java.util.Collection;
@@ -62,6 +64,9 @@ import java.util.Set;
import static java.util.Arrays.asList;
import static java.util.Collections.singleton;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@RunWith(PowerMockRunner.class)
@@ -297,15 +302,21 @@ public class WorkerSinkTaskTest {
@Test
public void testWakeupInCommitSyncCausesRetry() throws Exception {
expectInitializeTask();
- expectPollInitialAssignment();
- final List<TopicPartition> partitions = asList(TOPIC_PARTITION, TOPIC_PARTITION2);
+ expectPollInitialAssignment();
- sinkTask.close(new HashSet<>(partitions));
+ expectConsumerPoll(1);
+ expectConvertMessages(1);
+ sinkTask.put(EasyMock.<Collection<SinkRecord>>anyObject());
EasyMock.expectLastCall();
- sinkTask.flush(EasyMock.<Map<TopicPartition, OffsetAndMetadata>>anyObject());
- EasyMock.expectLastCall();
+ final List<TopicPartition> partitions = asList(TOPIC_PARTITION, TOPIC_PARTITION2);
+
+ final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
+ offsets.put(TOPIC_PARTITION, new OffsetAndMetadata(FIRST_OFFSET + 1));
+ offsets.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET));
+ sinkTask.preCommit(offsets);
+ EasyMock.expectLastCall().andReturn(offsets);
// first one raises wakeup
consumer.commitSync(EasyMock.<Map<TopicPartition, OffsetAndMetadata>>anyObject());
@@ -315,6 +326,9 @@ public class WorkerSinkTaskTest {
consumer.commitSync(EasyMock.<Map<TopicPartition, OffsetAndMetadata>>anyObject());
EasyMock.expectLastCall();
+ sinkTask.close(new HashSet<>(partitions));
+ EasyMock.expectLastCall();
+
EasyMock.expect(consumer.position(TOPIC_PARTITION)).andReturn(FIRST_OFFSET);
EasyMock.expect(consumer.position(TOPIC_PARTITION2)).andReturn(FIRST_OFFSET);
@@ -344,12 +358,173 @@ public class WorkerSinkTaskTest {
workerTask.initialize(TASK_CONFIG);
workerTask.initializeAndStart();
workerTask.iteration(); // poll for initial assignment
+ workerTask.iteration(); // first record delivered
workerTask.iteration(); // now rebalance with the wakeup triggered
PowerMock.verifyAll();
}
@Test
+ public void testRequestCommit() throws Exception {
+ expectInitializeTask();
+
+ expectPollInitialAssignment();
+
+ expectConsumerPoll(1);
+ expectConvertMessages(1);
+ sinkTask.put(EasyMock.<Collection<SinkRecord>>anyObject());
+ EasyMock.expectLastCall();
+
+ final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
+ offsets.put(TOPIC_PARTITION, new OffsetAndMetadata(FIRST_OFFSET + 1));
+ offsets.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET));
+ sinkTask.preCommit(offsets);
+ EasyMock.expectLastCall().andReturn(offsets);
+
+ final Capture<OffsetCommitCallback> callback = EasyMock.newCapture();
+ consumer.commitAsync(EasyMock.eq(offsets), EasyMock.capture(callback));
+ EasyMock.expectLastCall().andAnswer(new IAnswer<Void>() {
+ @Override
+ public Void answer() throws Throwable {
+ callback.getValue().onComplete(offsets, null);
+ return null;
+ }
+ });
+
+ expectConsumerPoll(0);
+ sinkTask.put(Collections.<SinkRecord>emptyList());
+ EasyMock.expectLastCall();
+
+ PowerMock.replayAll();
+
+ workerTask.initialize(TASK_CONFIG);
+ workerTask.initializeAndStart();
+
+ workerTask.iteration(); // initial assignment
+
+ workerTask.iteration(); // first record delivered
+
+ sinkTaskContext.getValue().requestCommit();
+ assertTrue(sinkTaskContext.getValue().isCommitRequested());
+ assertNotEquals(offsets, Whitebox.<Map<TopicPartition, OffsetAndMetadata>>getInternalState(workerTask, "lastCommittedOffsets"));
+ workerTask.iteration(); // triggers the commit
+ assertFalse(sinkTaskContext.getValue().isCommitRequested()); // should have been cleared
+ assertEquals(offsets, Whitebox.<Map<TopicPartition, OffsetAndMetadata>>getInternalState(workerTask, "lastCommittedOffsets"));
+ assertEquals(0, workerTask.commitFailures());
+
+ PowerMock.verifyAll();
+ }
+
+ @Test
+ public void testPreCommit() throws Exception {
+ expectInitializeTask();
+
+ // iter 1
+ expectPollInitialAssignment();
+
+ // iter 2
+ expectConsumerPoll(2);
+ expectConvertMessages(2);
+ sinkTask.put(EasyMock.<Collection<SinkRecord>>anyObject());
+ EasyMock.expectLastCall();
+
+ final Map<TopicPartition, OffsetAndMetadata> workerStartingOffsets = new HashMap<>();
+ workerStartingOffsets.put(TOPIC_PARTITION, new OffsetAndMetadata(FIRST_OFFSET));
+ workerStartingOffsets.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET));
+
+ final Map<TopicPartition, OffsetAndMetadata> workerCurrentOffsets = new HashMap<>();
+ workerCurrentOffsets.put(TOPIC_PARTITION, new OffsetAndMetadata(FIRST_OFFSET + 2));
+ workerCurrentOffsets.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET));
+
+ final Map<TopicPartition, OffsetAndMetadata> taskOffsets = new HashMap<>();
+ taskOffsets.put(TOPIC_PARTITION, new OffsetAndMetadata(FIRST_OFFSET + 1)); // act like FIRST_OFFSET+2 has not yet been flushed by the task
+ taskOffsets.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET + 1)); // should be ignored because > current offset
+ taskOffsets.put(new TopicPartition(TOPIC, 3), new OffsetAndMetadata(FIRST_OFFSET)); // should be ignored because this partition is not assigned
+
+ final Map<TopicPartition, OffsetAndMetadata> committableOffsets = new HashMap<>();
+ committableOffsets.put(TOPIC_PARTITION, new OffsetAndMetadata(FIRST_OFFSET + 1));
+ committableOffsets.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET));
+
+ sinkTask.preCommit(workerCurrentOffsets);
+ EasyMock.expectLastCall().andReturn(taskOffsets);
+ final Capture<OffsetCommitCallback> callback = EasyMock.newCapture();
+ consumer.commitAsync(EasyMock.eq(committableOffsets), EasyMock.capture(callback));
+ EasyMock.expectLastCall().andAnswer(new IAnswer<Void>() {
+ @Override
+ public Void answer() throws Throwable {
+ callback.getValue().onComplete(committableOffsets, null);
+ return null;
+ }
+ });
+ expectConsumerPoll(0);
+ sinkTask.put(EasyMock.<Collection<SinkRecord>>anyObject());
+ EasyMock.expectLastCall();
+
+ PowerMock.replayAll();
+
+ workerTask.initialize(TASK_CONFIG);
+ workerTask.initializeAndStart();
+ workerTask.iteration(); // iter 1 -- initial assignment
+
+ assertEquals(workerStartingOffsets, Whitebox.<Map<TopicPartition, OffsetAndMetadata>>getInternalState(workerTask, "currentOffsets"));
+ workerTask.iteration(); // iter 2 -- deliver 2 records
+
+ assertEquals(workerCurrentOffsets, Whitebox.<Map<TopicPartition, OffsetAndMetadata>>getInternalState(workerTask, "currentOffsets"));
+ assertEquals(workerStartingOffsets, Whitebox.<Map<TopicPartition, OffsetAndMetadata>>getInternalState(workerTask, "lastCommittedOffsets"));
+ sinkTaskContext.getValue().requestCommit();
+ workerTask.iteration(); // iter 3 -- commit
+ assertEquals(committableOffsets, Whitebox.<Map<TopicPartition, OffsetAndMetadata>>getInternalState(workerTask, "lastCommittedOffsets"));
+
+ PowerMock.verifyAll();
+ }
+
+ @Test
+ public void testIgnoredCommit() throws Exception {
+ expectInitializeTask();
+
+ // iter 1
+ expectPollInitialAssignment();
+
+ // iter 2
+ expectConsumerPoll(1);
+ expectConvertMessages(1);
+ sinkTask.put(EasyMock.<Collection<SinkRecord>>anyObject());
+ EasyMock.expectLastCall();
+
+ final Map<TopicPartition, OffsetAndMetadata> workerStartingOffsets = new HashMap<>();
+ workerStartingOffsets.put(TOPIC_PARTITION, new OffsetAndMetadata(FIRST_OFFSET));
+ workerStartingOffsets.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET));
+
+ final Map<TopicPartition, OffsetAndMetadata> workerCurrentOffsets = new HashMap<>();
+ workerCurrentOffsets.put(TOPIC_PARTITION, new OffsetAndMetadata(FIRST_OFFSET + 1));
+ workerCurrentOffsets.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET));
+
+ // iter 3
+ sinkTask.preCommit(workerCurrentOffsets);
+ EasyMock.expectLastCall().andReturn(workerStartingOffsets);
+ // no actual consumer.commit() triggered
+ expectConsumerPoll(0);
+ sinkTask.put(EasyMock.<Collection<SinkRecord>>anyObject());
+ EasyMock.expectLastCall();
+
+ PowerMock.replayAll();
+
+ workerTask.initialize(TASK_CONFIG);
+ workerTask.initializeAndStart();
+ workerTask.iteration(); // iter 1 -- initial assignment
+
+ assertEquals(workerStartingOffsets, Whitebox.<Map<TopicPartition, OffsetAndMetadata>>getInternalState(workerTask, "currentOffsets"));
+ assertEquals(workerStartingOffsets, Whitebox.<Map<TopicPartition, OffsetAndMetadata>>getInternalState(workerTask, "lastCommittedOffsets"));
+
+ workerTask.iteration(); // iter 2 -- deliver 2 records
+
+ sinkTaskContext.getValue().requestCommit();
+ workerTask.iteration(); // iter 3 -- commit
+
+ PowerMock.verifyAll();
+ }
+
+ @Test
public void testMissingTimestampPropagation() throws Exception {
expectInitializeTask();
expectConsumerPoll(1, Record.NO_TIMESTAMP, TimestampType.NO_TIMESTAMP_TYPE);
@@ -434,11 +609,8 @@ public class WorkerSinkTaskTest {
sinkTask.close(new HashSet<>(partitions));
EasyMock.expectLastCall();
- sinkTask.flush(EasyMock.<Map<TopicPartition, OffsetAndMetadata>>anyObject());
- EasyMock.expectLastCall();
-
- consumer.commitSync(EasyMock.<Map<TopicPartition, OffsetAndMetadata>>anyObject());
- EasyMock.expectLastCall();
+ sinkTask.preCommit(EasyMock.<Map<TopicPartition, OffsetAndMetadata>>anyObject());
+ EasyMock.expectLastCall().andReturn(Collections.emptyMap());
EasyMock.expect(consumer.position(TOPIC_PARTITION)).andReturn(FIRST_OFFSET);
EasyMock.expect(consumer.position(TOPIC_PARTITION2)).andReturn(FIRST_OFFSET);
http://git-wip-us.apache.org/repos/asf/kafka/blob/b45a67ed/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
index 52a86ab..8fa62b6 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
@@ -181,7 +181,7 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
// Make each poll() take the offset commit interval
Capture<Collection<SinkRecord>> capturedRecords
= expectPolls(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_DEFAULT);
- expectOffsetFlush(1L, null, null, 0, true);
+ expectOffsetCommit(1L, null, null, 0, true);
expectStopTask();
PowerMock.replayAll();
@@ -207,12 +207,12 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
}
@Test
- public void testCommitTaskFlushFailure() throws Exception {
+ public void testCommitFailure() throws Exception {
expectInitializeTask();
expectPollInitialAssignment();
Capture<Collection<SinkRecord>> capturedRecords = expectPolls(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_DEFAULT);
- expectOffsetFlush(1L, new RuntimeException(), null, 0, true);
+ expectOffsetCommit(1L, new RuntimeException(), null, 0, true);
// Should rewind to last known good positions, which in this case will be the offsets loaded during initialization
// for all topic partitions
consumer.seek(TOPIC_PARTITION, FIRST_OFFSET);
@@ -244,14 +244,14 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
}
@Test
- public void testCommitTaskSuccessAndFlushFailure() throws Exception {
- // Validate that we rewind to the correct offsets if a task's flush method throws an exception
+ public void testCommitSuccessFollowedByFailure() throws Exception {
+ // Validate that we rewind to the correct offsets if a task's preCommit() method throws an exception
expectInitializeTask();
expectPollInitialAssignment();
Capture<Collection<SinkRecord>> capturedRecords = expectPolls(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_DEFAULT);
- expectOffsetFlush(1L, null, null, 0, true);
- expectOffsetFlush(2L, new RuntimeException(), null, 0, true);
+ expectOffsetCommit(1L, null, null, 0, true);
+ expectOffsetCommit(2L, new RuntimeException(), null, 0, true);
// Should rewind to last known committed positions
consumer.seek(TOPIC_PARTITION, FIRST_OFFSET + 1);
PowerMock.expectLastCall();
@@ -290,7 +290,7 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
Capture<Collection<SinkRecord>> capturedRecords
= expectPolls(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_DEFAULT);
- expectOffsetFlush(1L, null, new Exception(), 0, true);
+ expectOffsetCommit(1L, null, new Exception(), 0, true);
expectStopTask();
PowerMock.replayAll();
@@ -322,7 +322,7 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
// Cut down amount of time to pass in each poll so we trigger exactly 1 offset commit
Capture<Collection<SinkRecord>> capturedRecords
= expectPolls(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_DEFAULT / 2);
- expectOffsetFlush(2L, null, null, WorkerConfig.OFFSET_COMMIT_TIMEOUT_MS_DEFAULT, false);
+ expectOffsetCommit(2L, null, null, WorkerConfig.OFFSET_COMMIT_TIMEOUT_MS_DEFAULT, false);
expectStopTask();
PowerMock.replayAll();
@@ -633,11 +633,11 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
return EasyMock.expectLastCall();
}
- private Capture<OffsetCommitCallback> expectOffsetFlush(final long expectedMessages,
- final RuntimeException flushError,
- final Exception consumerCommitError,
- final long consumerCommitDelayMs,
- final boolean invokeCallback)
+ private Capture<OffsetCommitCallback> expectOffsetCommit(final long expectedMessages,
+ final RuntimeException error,
+ final Exception consumerCommitError,
+ final long consumerCommitDelayMs,
+ final boolean invokeCallback)
throws Exception {
final long finalOffset = FIRST_OFFSET + expectedMessages;
@@ -646,11 +646,13 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
offsetsToCommit.put(TOPIC_PARTITION, new OffsetAndMetadata(finalOffset));
offsetsToCommit.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET));
offsetsToCommit.put(TOPIC_PARTITION3, new OffsetAndMetadata(FIRST_OFFSET));
- sinkTask.flush(offsetsToCommit);
- IExpectationSetters<Object> flushExpectation = PowerMock.expectLastCall();
- if (flushError != null) {
- flushExpectation.andThrow(flushError).once();
+ sinkTask.preCommit(offsetsToCommit);
+ IExpectationSetters<Object> expectation = PowerMock.expectLastCall();
+ if (error != null) {
+ expectation.andThrow(error).once();
return null;
+ } else {
+ expectation.andReturn(offsetsToCommit);
}
final Capture<OffsetCommitCallback> capturedCallback = EasyMock.newCapture();