You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2016/12/01 23:01:19 UTC

kafka git commit: KAFKA-4161: KIP-89: Allow sink connectors to decouple flush and offset commit

Repository: kafka
Updated Branches:
  refs/heads/trunk b65f9a777 -> b45a67ede


KAFKA-4161: KIP-89: Allow sink connectors to decouple flush and offset commit

Author: Shikhar Bhushan <sh...@confluent.io>

Reviewers: Ewen Cheslack-Postava <ew...@confluent.io>

Closes #2139 from shikhar/kafka-4161-deux


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b45a67ed
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b45a67ed
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b45a67ed

Branch: refs/heads/trunk
Commit: b45a67ede9021985c8df87c633b225231092c0c9
Parents: b65f9a7
Author: Shikhar Bhushan <sh...@confluent.io>
Authored: Thu Dec 1 15:01:09 2016 -0800
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Thu Dec 1 15:01:09 2016 -0800

----------------------------------------------------------------------
 .../org/apache/kafka/connect/sink/SinkTask.java |  27 ++-
 .../kafka/connect/sink/SinkTaskContext.java     |   9 +
 .../kafka/connect/runtime/WorkerSinkTask.java   |  71 +++++--
 .../connect/runtime/WorkerSinkTaskContext.java  |  15 ++
 .../connect/runtime/WorkerSinkTaskTest.java     | 192 ++++++++++++++++++-
 .../runtime/WorkerSinkTaskThreadedTest.java     |  38 ++--
 6 files changed, 301 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/b45a67ed/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java
----------------------------------------------------------------------
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java b/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java
index 3d0becc..99a2683 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java
@@ -95,13 +95,30 @@ public abstract class SinkTask implements Task {
     public abstract void put(Collection<SinkRecord> records);
 
     /**
-     * Flush all records that have been {@link #put} for the specified topic-partitions. The
-     * offsets are provided for convenience, but could also be determined by tracking all offsets
-     * included in the SinkRecords passed to {@link #put}.
+     * Flush all records that have been {@link #put(Collection)} for the specified topic-partitions.
      *
-     * @param offsets mapping of TopicPartition to committed offset
+     * @param currentOffsets the current offset state as of the last call to {@link #put(Collection)},
+     *                       provided for convenience but could also be determined by tracking all offsets included in the {@link SinkRecord}s
+     *                       passed to {@link #put}.
      */
-    public abstract void flush(Map<TopicPartition, OffsetAndMetadata> offsets);
+    public void flush(Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
+    }
+
+    /**
+     * Pre-commit hook invoked prior to an offset commit.
+     *
+     * The default implementation simply invokes {@link #flush(Map)} and is thus able to assume all {@code currentOffsets} are safe to commit.
+     *
+     * @param currentOffsets the current offset state as of the last call to {@link #put(Collection)},
+     *                       provided for convenience but could also be determined by tracking all offsets included in the {@link SinkRecord}s
+     *                       passed to {@link #put}.
+     *
+     * @return an empty map if Connect-managed offset commit is not desired, otherwise a map of offsets by topic-partition that are safe to commit.
+     */
+    public Map<TopicPartition, OffsetAndMetadata> preCommit(Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
+        flush(currentOffsets);
+        return currentOffsets;
+    }
 
     /**
      * The SinkTask use this method to create writers for newly assigned partitions in case of partition

http://git-wip-us.apache.org/repos/asf/kafka/blob/b45a67ed/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTaskContext.java
----------------------------------------------------------------------
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTaskContext.java b/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTaskContext.java
index 2202cae..14f13d1 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTaskContext.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTaskContext.java
@@ -79,4 +79,13 @@ public interface SinkTaskContext {
      * @param partitions the partitions to resume
      */
     void resume(TopicPartition... partitions);
+
+    /**
+     * Request an offset commit. Sink tasks can use this to minimize the potential for redelivery
+     * by requesting an offset commit as soon as they flush data to the destination system.
+     *
+     * It is only a hint to the runtime and no timing guarantee should be assumed.
+     */
+    void requestCommit();
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/b45a67ed/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index 1575581..b941469 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -150,19 +150,21 @@ class WorkerSinkTask extends WorkerTask {
     }
 
     protected void iteration() {
+        final long offsetCommitIntervalMs = workerConfig.getLong(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG);
+        final long commitTimeoutMs = commitStarted + workerConfig.getLong(WorkerConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG);
+
         try {
             long now = time.milliseconds();
 
             // Maybe commit
-            if (!committing && now >= nextCommit) {
+            if (!committing && (context.isCommitRequested() || now >= nextCommit)) {
                 commitOffsets(now, false);
-                nextCommit += workerConfig.getLong(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG);
+                nextCommit += offsetCommitIntervalMs;
+                context.clearCommitRequest();
             }
 
             // Check for timed out commits
-            long commitTimeout = commitStarted + workerConfig.getLong(
-                    WorkerConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG);
-            if (committing && now >= commitTimeout) {
+            if (committing && now >= commitTimeoutMs) {
                 log.warn("Commit of {} offsets timed out", this);
                 commitFailures++;
                 committing = false;
@@ -267,7 +269,9 @@ class WorkerSinkTask extends WorkerTask {
             OffsetCommitCallback cb = new OffsetCommitCallback() {
                 @Override
                 public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception error) {
-                    lastCommittedOffsets = offsets;
+                    if (error == null) {
+                        lastCommittedOffsets = offsets;
+                    }
                     onCommitCompleted(error, seqno);
                 }
             };
@@ -283,27 +287,58 @@ class WorkerSinkTask extends WorkerTask {
         commitSeqno += 1;
         commitStarted = now;
 
-        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(currentOffsets);
+        final Map<TopicPartition, OffsetAndMetadata> taskProvidedOffsets;
         try {
-            task.flush(offsets);
+            taskProvidedOffsets = task.preCommit(new HashMap<>(currentOffsets));
         } catch (Throwable t) {
-            log.error("Commit of {} offsets failed due to exception while flushing:", this, t);
-            log.error("Rewinding offsets to last committed offsets");
-            for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : lastCommittedOffsets.entrySet()) {
-                log.debug("{} Rewinding topic partition {} to offset {}", id, entry.getKey(), entry.getValue().offset());
-                consumer.seek(entry.getKey(), entry.getValue().offset());
+            if (closing) {
+                log.warn("{} Offset commit failed during close");
+                onCommitCompleted(t, commitSeqno);
+            } else {
+                log.error("{} Offset commit failed, rewinding to last committed offsets", this, t);
+                for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : lastCommittedOffsets.entrySet()) {
+                    log.debug("{} Rewinding topic partition {} to offset {}", id, entry.getKey(), entry.getValue().offset());
+                    consumer.seek(entry.getKey(), entry.getValue().offset());
+                }
+                currentOffsets = new HashMap<>(lastCommittedOffsets);
+                onCommitCompleted(t, commitSeqno);
             }
-            currentOffsets = new HashMap<>(lastCommittedOffsets);
-            onCommitCompleted(t, commitSeqno);
             return;
         } finally {
-            // Close the task if needed before committing the offsets. This is basically the last chance for
-            // the connector to actually flush data that has been written to it.
+            // Close the task if needed before committing the offsets.
             if (closing)
                 task.close(currentOffsets.keySet());
         }
 
-        doCommit(offsets, closing, commitSeqno);
+        if (taskProvidedOffsets.isEmpty()) {
+            log.debug("{} Skipping offset commit, task opted-out", this);
+            onCommitCompleted(null, commitSeqno);
+            return;
+        }
+
+        final Map<TopicPartition, OffsetAndMetadata> commitableOffsets = new HashMap<>(lastCommittedOffsets);
+        for (Map.Entry<TopicPartition, OffsetAndMetadata> taskProvidedOffsetEntry : taskProvidedOffsets.entrySet()) {
+            final TopicPartition partition = taskProvidedOffsetEntry.getKey();
+            final OffsetAndMetadata taskProvidedOffset = taskProvidedOffsetEntry.getValue();
+            if (commitableOffsets.containsKey(partition)) {
+                if (taskProvidedOffset.offset() <= currentOffsets.get(partition).offset()) {
+                    commitableOffsets.put(partition, taskProvidedOffset);
+                } else {
+                    log.warn("Ignoring invalid task provided offset {}/{} -- not yet consumed", partition, taskProvidedOffset);
+                }
+            } else {
+                log.warn("Ignoring invalid task provided offset {}/{} -- partition not assigned", partition, taskProvidedOffset);
+            }
+        }
+
+        if (commitableOffsets.equals(lastCommittedOffsets)) {
+            log.debug("{} Skipping offset commit, no change since last commit", this);
+            onCommitCompleted(null, commitSeqno);
+            return;
+        }
+
+        log.trace("{} Offsets to commit: {}", this, commitableOffsets);
+        doCommit(commitableOffsets, closing, commitSeqno);
     }
 
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/b45a67ed/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java
index c762bdd..ede76c4 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java
@@ -27,6 +27,7 @@ public class WorkerSinkTaskContext implements SinkTaskContext {
     private long timeoutMs;
     private KafkaConsumer<byte[], byte[]> consumer;
     private final Set<TopicPartition> pausedPartitions;
+    private boolean commitRequested;
 
     public WorkerSinkTaskContext(KafkaConsumer<byte[], byte[]> consumer) {
         this.offsets = new HashMap<>();
@@ -109,4 +110,18 @@ public class WorkerSinkTaskContext implements SinkTaskContext {
     public Set<TopicPartition> pausedPartitions() {
         return pausedPartitions;
     }
+
+    @Override
+    public void requestCommit() {
+        commitRequested = true;
+    }
+
+    public boolean isCommitRequested() {
+        return commitRequested;
+    }
+
+    public void clearCommitRequest() {
+        commitRequested = false;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/b45a67ed/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
index ca218c3..1f9e56b 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
@@ -22,6 +22,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetCommitCallback;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.record.Record;
@@ -49,6 +50,7 @@ import org.powermock.api.easymock.annotation.Mock;
 import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
+import org.powermock.reflect.Whitebox;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -62,6 +64,9 @@ import java.util.Set;
 import static java.util.Arrays.asList;
 import static java.util.Collections.singleton;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 @RunWith(PowerMockRunner.class)
@@ -297,15 +302,21 @@ public class WorkerSinkTaskTest {
     @Test
     public void testWakeupInCommitSyncCausesRetry() throws Exception {
         expectInitializeTask();
-        expectPollInitialAssignment();
 
-        final List<TopicPartition> partitions = asList(TOPIC_PARTITION, TOPIC_PARTITION2);
+        expectPollInitialAssignment();
 
-        sinkTask.close(new HashSet<>(partitions));
+        expectConsumerPoll(1);
+        expectConvertMessages(1);
+        sinkTask.put(EasyMock.<Collection<SinkRecord>>anyObject());
         EasyMock.expectLastCall();
 
-        sinkTask.flush(EasyMock.<Map<TopicPartition, OffsetAndMetadata>>anyObject());
-        EasyMock.expectLastCall();
+        final List<TopicPartition> partitions = asList(TOPIC_PARTITION, TOPIC_PARTITION2);
+
+        final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
+        offsets.put(TOPIC_PARTITION, new OffsetAndMetadata(FIRST_OFFSET + 1));
+        offsets.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET));
+        sinkTask.preCommit(offsets);
+        EasyMock.expectLastCall().andReturn(offsets);
 
         // first one raises wakeup
         consumer.commitSync(EasyMock.<Map<TopicPartition, OffsetAndMetadata>>anyObject());
@@ -315,6 +326,9 @@ public class WorkerSinkTaskTest {
         consumer.commitSync(EasyMock.<Map<TopicPartition, OffsetAndMetadata>>anyObject());
         EasyMock.expectLastCall();
 
+        sinkTask.close(new HashSet<>(partitions));
+        EasyMock.expectLastCall();
+
         EasyMock.expect(consumer.position(TOPIC_PARTITION)).andReturn(FIRST_OFFSET);
         EasyMock.expect(consumer.position(TOPIC_PARTITION2)).andReturn(FIRST_OFFSET);
 
@@ -344,12 +358,173 @@ public class WorkerSinkTaskTest {
         workerTask.initialize(TASK_CONFIG);
         workerTask.initializeAndStart();
         workerTask.iteration(); // poll for initial assignment
+        workerTask.iteration(); // first record delivered
         workerTask.iteration(); // now rebalance with the wakeup triggered
 
         PowerMock.verifyAll();
     }
 
     @Test
+    public void testRequestCommit() throws Exception {
+        expectInitializeTask();
+
+        expectPollInitialAssignment();
+
+        expectConsumerPoll(1);
+        expectConvertMessages(1);
+        sinkTask.put(EasyMock.<Collection<SinkRecord>>anyObject());
+        EasyMock.expectLastCall();
+
+        final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
+        offsets.put(TOPIC_PARTITION, new OffsetAndMetadata(FIRST_OFFSET + 1));
+        offsets.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET));
+        sinkTask.preCommit(offsets);
+        EasyMock.expectLastCall().andReturn(offsets);
+
+        final Capture<OffsetCommitCallback> callback = EasyMock.newCapture();
+        consumer.commitAsync(EasyMock.eq(offsets), EasyMock.capture(callback));
+        EasyMock.expectLastCall().andAnswer(new IAnswer<Void>() {
+            @Override
+            public Void answer() throws Throwable {
+                callback.getValue().onComplete(offsets, null);
+                return null;
+            }
+        });
+
+        expectConsumerPoll(0);
+        sinkTask.put(Collections.<SinkRecord>emptyList());
+        EasyMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        workerTask.initialize(TASK_CONFIG);
+        workerTask.initializeAndStart();
+
+        workerTask.iteration(); // initial assignment
+
+        workerTask.iteration(); // first record delivered
+
+        sinkTaskContext.getValue().requestCommit();
+        assertTrue(sinkTaskContext.getValue().isCommitRequested());
+        assertNotEquals(offsets, Whitebox.<Map<TopicPartition, OffsetAndMetadata>>getInternalState(workerTask, "lastCommittedOffsets"));
+        workerTask.iteration(); // triggers the commit
+        assertFalse(sinkTaskContext.getValue().isCommitRequested()); // should have been cleared
+        assertEquals(offsets, Whitebox.<Map<TopicPartition, OffsetAndMetadata>>getInternalState(workerTask, "lastCommittedOffsets"));
+        assertEquals(0, workerTask.commitFailures());
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testPreCommit() throws Exception {
+        expectInitializeTask();
+
+        // iter 1
+        expectPollInitialAssignment();
+
+        // iter 2
+        expectConsumerPoll(2);
+        expectConvertMessages(2);
+        sinkTask.put(EasyMock.<Collection<SinkRecord>>anyObject());
+        EasyMock.expectLastCall();
+
+        final Map<TopicPartition, OffsetAndMetadata> workerStartingOffsets = new HashMap<>();
+        workerStartingOffsets.put(TOPIC_PARTITION, new OffsetAndMetadata(FIRST_OFFSET));
+        workerStartingOffsets.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET));
+
+        final Map<TopicPartition, OffsetAndMetadata> workerCurrentOffsets = new HashMap<>();
+        workerCurrentOffsets.put(TOPIC_PARTITION, new OffsetAndMetadata(FIRST_OFFSET + 2));
+        workerCurrentOffsets.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET));
+
+        final Map<TopicPartition, OffsetAndMetadata> taskOffsets = new HashMap<>();
+        taskOffsets.put(TOPIC_PARTITION, new OffsetAndMetadata(FIRST_OFFSET + 1)); // act like FIRST_OFFSET+2 has not yet been flushed by the task
+        taskOffsets.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET + 1)); // should be ignored because > current offset
+        taskOffsets.put(new TopicPartition(TOPIC, 3), new OffsetAndMetadata(FIRST_OFFSET)); // should be ignored because this partition is not assigned
+
+        final Map<TopicPartition, OffsetAndMetadata> committableOffsets = new HashMap<>();
+        committableOffsets.put(TOPIC_PARTITION, new OffsetAndMetadata(FIRST_OFFSET + 1));
+        committableOffsets.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET));
+
+        sinkTask.preCommit(workerCurrentOffsets);
+        EasyMock.expectLastCall().andReturn(taskOffsets);
+        final Capture<OffsetCommitCallback> callback = EasyMock.newCapture();
+        consumer.commitAsync(EasyMock.eq(committableOffsets), EasyMock.capture(callback));
+        EasyMock.expectLastCall().andAnswer(new IAnswer<Void>() {
+            @Override
+            public Void answer() throws Throwable {
+                callback.getValue().onComplete(committableOffsets, null);
+                return null;
+            }
+        });
+        expectConsumerPoll(0);
+        sinkTask.put(EasyMock.<Collection<SinkRecord>>anyObject());
+        EasyMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        workerTask.initialize(TASK_CONFIG);
+        workerTask.initializeAndStart();
+        workerTask.iteration(); // iter 1 -- initial assignment
+
+        assertEquals(workerStartingOffsets, Whitebox.<Map<TopicPartition, OffsetAndMetadata>>getInternalState(workerTask, "currentOffsets"));
+        workerTask.iteration(); // iter 2 -- deliver 2 records
+
+        assertEquals(workerCurrentOffsets, Whitebox.<Map<TopicPartition, OffsetAndMetadata>>getInternalState(workerTask, "currentOffsets"));
+        assertEquals(workerStartingOffsets, Whitebox.<Map<TopicPartition, OffsetAndMetadata>>getInternalState(workerTask, "lastCommittedOffsets"));
+        sinkTaskContext.getValue().requestCommit();
+        workerTask.iteration(); // iter 3 -- commit
+        assertEquals(committableOffsets, Whitebox.<Map<TopicPartition, OffsetAndMetadata>>getInternalState(workerTask, "lastCommittedOffsets"));
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testIgnoredCommit() throws Exception {
+        expectInitializeTask();
+
+        // iter 1
+        expectPollInitialAssignment();
+
+        // iter 2
+        expectConsumerPoll(1);
+        expectConvertMessages(1);
+        sinkTask.put(EasyMock.<Collection<SinkRecord>>anyObject());
+        EasyMock.expectLastCall();
+
+        final Map<TopicPartition, OffsetAndMetadata> workerStartingOffsets = new HashMap<>();
+        workerStartingOffsets.put(TOPIC_PARTITION, new OffsetAndMetadata(FIRST_OFFSET));
+        workerStartingOffsets.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET));
+
+        final Map<TopicPartition, OffsetAndMetadata> workerCurrentOffsets = new HashMap<>();
+        workerCurrentOffsets.put(TOPIC_PARTITION, new OffsetAndMetadata(FIRST_OFFSET + 1));
+        workerCurrentOffsets.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET));
+
+        // iter 3
+        sinkTask.preCommit(workerCurrentOffsets);
+        EasyMock.expectLastCall().andReturn(workerStartingOffsets);
+        // no actual consumer.commit() triggered
+        expectConsumerPoll(0);
+        sinkTask.put(EasyMock.<Collection<SinkRecord>>anyObject());
+        EasyMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        workerTask.initialize(TASK_CONFIG);
+        workerTask.initializeAndStart();
+        workerTask.iteration(); // iter 1 -- initial assignment
+
+        assertEquals(workerStartingOffsets, Whitebox.<Map<TopicPartition, OffsetAndMetadata>>getInternalState(workerTask, "currentOffsets"));
+        assertEquals(workerStartingOffsets, Whitebox.<Map<TopicPartition, OffsetAndMetadata>>getInternalState(workerTask, "lastCommittedOffsets"));
+
+        workerTask.iteration(); // iter 2 -- deliver 1 record
+
+        sinkTaskContext.getValue().requestCommit();
+        workerTask.iteration(); // iter 3 -- commit
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
     public void testMissingTimestampPropagation() throws Exception {
         expectInitializeTask();
         expectConsumerPoll(1, Record.NO_TIMESTAMP, TimestampType.NO_TIMESTAMP_TYPE);
@@ -434,11 +609,8 @@ public class WorkerSinkTaskTest {
         sinkTask.close(new HashSet<>(partitions));
         EasyMock.expectLastCall();
 
-        sinkTask.flush(EasyMock.<Map<TopicPartition, OffsetAndMetadata>>anyObject());
-        EasyMock.expectLastCall();
-
-        consumer.commitSync(EasyMock.<Map<TopicPartition, OffsetAndMetadata>>anyObject());
-        EasyMock.expectLastCall();
+        sinkTask.preCommit(EasyMock.<Map<TopicPartition, OffsetAndMetadata>>anyObject());
+        EasyMock.expectLastCall().andReturn(Collections.emptyMap());
 
         EasyMock.expect(consumer.position(TOPIC_PARTITION)).andReturn(FIRST_OFFSET);
         EasyMock.expect(consumer.position(TOPIC_PARTITION2)).andReturn(FIRST_OFFSET);

http://git-wip-us.apache.org/repos/asf/kafka/blob/b45a67ed/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
index 52a86ab..8fa62b6 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
@@ -181,7 +181,7 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
         // Make each poll() take the offset commit interval
         Capture<Collection<SinkRecord>> capturedRecords
                 = expectPolls(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_DEFAULT);
-        expectOffsetFlush(1L, null, null, 0, true);
+        expectOffsetCommit(1L, null, null, 0, true);
         expectStopTask();
 
         PowerMock.replayAll();
@@ -207,12 +207,12 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
     }
 
     @Test
-    public void testCommitTaskFlushFailure() throws Exception {
+    public void testCommitFailure() throws Exception {
         expectInitializeTask();
         expectPollInitialAssignment();
 
         Capture<Collection<SinkRecord>> capturedRecords = expectPolls(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_DEFAULT);
-        expectOffsetFlush(1L, new RuntimeException(), null, 0, true);
+        expectOffsetCommit(1L, new RuntimeException(), null, 0, true);
         // Should rewind to last known good positions, which in this case will be the offsets loaded during initialization
         // for all topic partitions
         consumer.seek(TOPIC_PARTITION, FIRST_OFFSET);
@@ -244,14 +244,14 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
     }
 
     @Test
-    public void testCommitTaskSuccessAndFlushFailure() throws Exception {
-        // Validate that we rewind to the correct offsets if a task's flush method throws an exception
+    public void testCommitSuccessFollowedByFailure() throws Exception {
+        // Validate that we rewind to the correct offsets if a task's preCommit() method throws an exception
 
         expectInitializeTask();
         expectPollInitialAssignment();
         Capture<Collection<SinkRecord>> capturedRecords = expectPolls(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_DEFAULT);
-        expectOffsetFlush(1L, null, null, 0, true);
-        expectOffsetFlush(2L, new RuntimeException(), null, 0, true);
+        expectOffsetCommit(1L, null, null, 0, true);
+        expectOffsetCommit(2L, new RuntimeException(), null, 0, true);
         // Should rewind to last known committed positions
         consumer.seek(TOPIC_PARTITION, FIRST_OFFSET + 1);
         PowerMock.expectLastCall();
@@ -290,7 +290,7 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
 
         Capture<Collection<SinkRecord>> capturedRecords
                 = expectPolls(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_DEFAULT);
-        expectOffsetFlush(1L, null, new Exception(), 0, true);
+        expectOffsetCommit(1L, null, new Exception(), 0, true);
         expectStopTask();
 
         PowerMock.replayAll();
@@ -322,7 +322,7 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
         // Cut down amount of time to pass in each poll so we trigger exactly 1 offset commit
         Capture<Collection<SinkRecord>> capturedRecords
                 = expectPolls(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_DEFAULT / 2);
-        expectOffsetFlush(2L, null, null, WorkerConfig.OFFSET_COMMIT_TIMEOUT_MS_DEFAULT, false);
+        expectOffsetCommit(2L, null, null, WorkerConfig.OFFSET_COMMIT_TIMEOUT_MS_DEFAULT, false);
         expectStopTask();
 
         PowerMock.replayAll();
@@ -633,11 +633,11 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
         return EasyMock.expectLastCall();
     }
 
-    private Capture<OffsetCommitCallback> expectOffsetFlush(final long expectedMessages,
-                                                            final RuntimeException flushError,
-                                                            final Exception consumerCommitError,
-                                                            final long consumerCommitDelayMs,
-                                                            final boolean invokeCallback)
+    private Capture<OffsetCommitCallback> expectOffsetCommit(final long expectedMessages,
+                                                             final RuntimeException error,
+                                                             final Exception consumerCommitError,
+                                                             final long consumerCommitDelayMs,
+                                                             final boolean invokeCallback)
             throws Exception {
         final long finalOffset = FIRST_OFFSET + expectedMessages;
 
@@ -646,11 +646,13 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
         offsetsToCommit.put(TOPIC_PARTITION, new OffsetAndMetadata(finalOffset));
         offsetsToCommit.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET));
         offsetsToCommit.put(TOPIC_PARTITION3, new OffsetAndMetadata(FIRST_OFFSET));
-        sinkTask.flush(offsetsToCommit);
-        IExpectationSetters<Object> flushExpectation = PowerMock.expectLastCall();
-        if (flushError != null) {
-            flushExpectation.andThrow(flushError).once();
+        sinkTask.preCommit(offsetsToCommit);
+        IExpectationSetters<Object> expectation = PowerMock.expectLastCall();
+        if (error != null) {
+            expectation.andThrow(error).once();
             return null;
+        } else {
+            expectation.andReturn(offsetsToCommit);
         }
 
         final Capture<OffsetCommitCallback> capturedCallback = EasyMock.newCapture();