Posted to commits@kafka.apache.org by gw...@apache.org on 2015/11/09 05:41:47 UTC

kafka git commit: KAFKA-2480: Handle retriable and non-retriable exceptions thrown by sink tasks.

Repository: kafka
Updated Branches:
  refs/heads/trunk 83fb73460 -> f4b87deef


KAFKA-2480: Handle retriable and non-retriable exceptions thrown by sink tasks.

Author: Ewen Cheslack-Postava <me...@ewencp.org>

Reviewers: Gwen Shapira

Closes #450 from ewencp/kafka-2480-unrecoverable-task-errors


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f4b87dee
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f4b87dee
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f4b87dee

Branch: refs/heads/trunk
Commit: f4b87deefecf4902992a84d4a3fe3b99a94ff72b
Parents: 83fb734
Author: Ewen Cheslack-Postava <me...@ewencp.org>
Authored: Sun Nov 8 20:41:35 2015 -0800
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Sun Nov 8 20:41:35 2015 -0800

----------------------------------------------------------------------
 .../org/apache/kafka/copycat/sink/SinkTask.java |   5 +
 .../copycat/errors/RetriableException.java      |  35 ++
 .../kafka/copycat/runtime/WorkerSinkTask.java   | 116 +++-
 .../copycat/runtime/WorkerSinkTaskContext.java  |  11 +
 .../kafka/copycat/runtime/WorkerSourceTask.java |   7 +
 .../copycat/runtime/WorkerSinkTaskTest.java     | 496 ++--------------
 .../runtime/WorkerSinkTaskThreadedTest.java     | 563 +++++++++++++++++++
 7 files changed, 763 insertions(+), 470 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/f4b87dee/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTask.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTask.java b/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTask.java
index b2d5ff6..90651ed 100644
--- a/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTask.java
+++ b/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTask.java
@@ -57,6 +57,11 @@ public abstract class SinkTask implements Task {
      * Put the records in the sink. Usually this should send the records to the sink asynchronously
      * and immediately return.
      *
+     * If this operation fails, the SinkTask may throw a {@link org.apache.kafka.copycat.errors.RetriableException} to
+     * indicate that the framework should attempt to retry the same call again. Other exceptions will cause the task to
+     * be stopped immediately. {@link SinkTaskContext#timeout(long)} can be used to set the maximum time before the
+     * batch will be retried.
+     *
      * @param records the set of records to send
      */
     public abstract void put(Collection<SinkRecord> records);

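For illustration, a minimal sink task honoring the contract documented above might look like the sketch below. Only SinkTask, SinkRecord, SinkTaskContext#timeout(long), and RetriableException come from Copycat; the downstream I/O and its failure mode are hypothetical, and the start/flush/stop signatures are assumed to match this commit (including the protected context field set by initialize()).

    import java.io.IOException;
    import java.util.Collection;
    import java.util.Map;

    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.copycat.errors.RetriableException;
    import org.apache.kafka.copycat.sink.SinkRecord;
    import org.apache.kafka.copycat.sink.SinkTask;

    public class ExampleSinkTask extends SinkTask {
        @Override
        public void start(Map<String, String> props) {
            // Acquire downstream resources here.
        }

        @Override
        public void put(Collection<SinkRecord> records) {
            try {
                writeToDownstream(records);
            } catch (IOException e) {
                // Transient failure: ask the framework to redeliver this same
                // batch, retrying after at most five seconds.
                context.timeout(5000L);
                throw new RetriableException("downstream write failed", e);
            }
            // Any other exception propagates and stops the task immediately.
        }

        // Hypothetical sink I/O; replace with a real client.
        private void writeToDownstream(Collection<SinkRecord> records) throws IOException {
        }

        @Override
        public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) {
            // Nothing buffered in this sketch.
        }

        @Override
        public void stop() {
        }
    }
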
http://git-wip-us.apache.org/repos/asf/kafka/blob/f4b87dee/copycat/runtime/src/main/java/org/apache/kafka/copycat/errors/RetriableException.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/errors/RetriableException.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/errors/RetriableException.java
new file mode 100644
index 0000000..75821aa
--- /dev/null
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/errors/RetriableException.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.errors;
+
+/**
+ * An exception that indicates the operation can be reattempted.
+ */
+public class RetriableException extends CopycatException {
+    public RetriableException(String s) {
+        super(s);
+    }
+
+    public RetriableException(String s, Throwable throwable) {
+        super(s, throwable);
+    }
+
+    public RetriableException(Throwable throwable) {
+        super(throwable);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f4b87dee/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java
index e9193b8..ad6d872 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java
@@ -31,6 +31,7 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.copycat.data.SchemaAndValue;
 import org.apache.kafka.copycat.errors.CopycatException;
+import org.apache.kafka.copycat.errors.RetriableException;
 import org.apache.kafka.copycat.sink.SinkRecord;
 import org.apache.kafka.copycat.sink.SinkTask;
 import org.apache.kafka.copycat.storage.Converter;
@@ -42,8 +43,11 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -63,7 +67,10 @@ class WorkerSinkTask implements WorkerTask {
     private KafkaConsumer<byte[], byte[]> consumer;
     private WorkerSinkTaskContext context;
     private boolean started;
+    private final List<SinkRecord> messageBatch;
     private Map<TopicPartition, OffsetAndMetadata> lastCommittedOffsets;
+    private Map<TopicPartition, OffsetAndMetadata> currentOffsets;
+    private boolean pausedForRedelivery;
 
     public WorkerSinkTask(ConnectorTaskId id, SinkTask task, WorkerConfig workerConfig,
                           Converter keyConverter, Converter valueConverter, Time time) {
@@ -74,6 +81,9 @@ class WorkerSinkTask implements WorkerTask {
         this.valueConverter = valueConverter;
         this.time = time;
         this.started = false;
+        this.messageBatch = new ArrayList<>();
+        this.currentOffsets = new HashMap<>();
+        this.pausedForRedelivery = false;
     }
 
     @Override
@@ -156,10 +166,14 @@ class WorkerSinkTask implements WorkerTask {
                 timeoutMs = Math.min(timeoutMs, retryTimeout);
                 context.timeout(-1L);
             }
+
             log.trace("{} polling consumer with timeout {} ms", id, timeoutMs);
             ConsumerRecords<byte[], byte[]> msgs = consumer.poll(timeoutMs);
+            assert messageBatch.isEmpty() || msgs.isEmpty();
             log.trace("{} polling returned {} messages", id, msgs.count());
-            deliverMessages(msgs);
+
+            convertMessages(msgs);
+            deliverMessages();
         } catch (WakeupException we) {
             log.trace("{} consumer woken up", id);
         }
@@ -171,12 +185,8 @@ class WorkerSinkTask implements WorkerTask {
      **/
     public void commitOffsets(boolean sync, final int seqno) {
         log.info("{} Committing offsets", this);
-        final HashMap<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
-        for (TopicPartition tp : consumer.assignment()) {
-            long pos = consumer.position(tp);
-            offsets.put(tp, new OffsetAndMetadata(pos));
-            log.debug("{} committing {} offset {}", id, tp, pos);
-        }
+
+        final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(currentOffsets);
 
         try {
             task.flush(offsets);
@@ -187,6 +197,7 @@ class WorkerSinkTask implements WorkerTask {
                 log.debug("{} Rewinding topic partition {} to offset {}", id, entry.getKey(), entry.getValue().offset());
                 consumer.seek(entry.getKey(), entry.getValue().offset());
             }
+            currentOffsets = new HashMap<>(lastCommittedOffsets);
             workThread.onCommitCompleted(t, seqno);
             return;
         }
@@ -195,6 +206,7 @@ class WorkerSinkTask implements WorkerTask {
             try {
                 consumer.commitSync(offsets);
                 lastCommittedOffsets = offsets;
+                workThread.onCommitCompleted(null, seqno);
             } catch (KafkaException e) {
                 workThread.onCommitCompleted(e, seqno);
             }
@@ -244,29 +256,49 @@ class WorkerSinkTask implements WorkerTask {
         return new WorkerSinkTaskThread(this, "WorkerSinkTask-" + id, time, workerConfig);
     }
 
-    private void deliverMessages(ConsumerRecords<byte[], byte[]> msgs) {
-        // Finally, deliver this batch to the sink
-        if (msgs.count() > 0) {
-            List<SinkRecord> records = new ArrayList<>();
-            for (ConsumerRecord<byte[], byte[]> msg : msgs) {
-                log.trace("Consuming message with key {}, value {}", msg.key(), msg.value());
-                SchemaAndValue keyAndSchema = keyConverter.toCopycatData(msg.topic(), msg.key());
-                SchemaAndValue valueAndSchema = valueConverter.toCopycatData(msg.topic(), msg.value());
-                records.add(
-                        new SinkRecord(msg.topic(), msg.partition(),
-                                keyAndSchema.schema(), keyAndSchema.value(),
-                                valueAndSchema.schema(), valueAndSchema.value(),
-                                msg.offset())
-                );
-            }
+    private void convertMessages(ConsumerRecords<byte[], byte[]> msgs) {
+        for (ConsumerRecord<byte[], byte[]> msg : msgs) {
+            log.trace("Consuming message with key {}, value {}", msg.key(), msg.value());
+            SchemaAndValue keyAndSchema = keyConverter.toCopycatData(msg.topic(), msg.key());
+            SchemaAndValue valueAndSchema = valueConverter.toCopycatData(msg.topic(), msg.value());
+            messageBatch.add(
+                    new SinkRecord(msg.topic(), msg.partition(),
+                            keyAndSchema.schema(), keyAndSchema.value(),
+                            valueAndSchema.schema(), valueAndSchema.value(),
+                            msg.offset())
+            );
+        }
+    }
 
-            try {
-                task.put(records);
-            } catch (CopycatException e) {
-                log.error("Exception from SinkTask {}: ", id, e);
-            } catch (Throwable t) {
-                log.error("Unexpected exception from SinkTask {}: ", id, t);
+    private void deliverMessages() {
+        // Finally, deliver this batch to the sink
+        try {
+            // Since we reuse the messageBatch buffer, ensure we give the task its own copy
+            task.put(new ArrayList<>(messageBatch));
+            for (SinkRecord record : messageBatch)
+                currentOffsets.put(new TopicPartition(record.topic(), record.kafkaPartition()),
+                        new OffsetAndMetadata(record.kafkaOffset() + 1));
+            messageBatch.clear();
+            // If we had paused all consumer topic partitions to try to redeliver data, then we should resume any that
+            // the task had not explicitly paused
+            if (pausedForRedelivery) {
+                for (TopicPartition tp : consumer.assignment())
+                    if (!context.pausedPartitions().contains(tp))
+                        consumer.resume(tp);
+                pausedForRedelivery = false;
             }
+        } catch (RetriableException e) {
+            log.error("RetriableException from SinkTask {}: {}", id, e);
+            // If we're retrying a previous batch, make sure we've paused all topic partitions so we don't get new data,
+            // but will still be able to poll in order to handle user-requested timeouts, keep group membership, etc.
+            pausedForRedelivery = true;
+            for (TopicPartition tp : consumer.assignment())
+                consumer.pause(tp);
+            // Let this exit normally, the batch will be reprocessed on the next loop.
+        } catch (Throwable t) {
+            log.error("Task {} threw an uncaught and unrecoverable exception", id);
+            log.error("Task is being killed and will not recover until manually restarted:", t);
+            throw new CopycatException("Exiting WorkerSinkTask due to unrecoverable exception.");
         }
     }
 
@@ -280,6 +312,8 @@ class WorkerSinkTask implements WorkerTask {
             if (offset != null) {
                 log.trace("Rewind {} to offset {}.", tp, offset);
                 consumer.seek(tp, offset);
+                lastCommittedOffsets.put(tp, new OffsetAndMetadata(offset));
+                currentOffsets.put(tp, new OffsetAndMetadata(offset));
             }
         }
         context.clearOffsets();
@@ -289,11 +323,35 @@ class WorkerSinkTask implements WorkerTask {
         @Override
         public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
             lastCommittedOffsets = new HashMap<>();
+            currentOffsets = new HashMap<>();
             for (TopicPartition tp : partitions) {
                 long pos = consumer.position(tp);
                 lastCommittedOffsets.put(tp, new OffsetAndMetadata(pos));
+                currentOffsets.put(tp, new OffsetAndMetadata(pos));
                 log.debug("{} assigned topic partition {} with offset {}", id, tp, pos);
             }
+
+            // If we paused everything for redelivery (which is no longer relevant since we discarded the data), make
+            // sure anything we paused that the task didn't request to be paused *and* which we still own is resumed.
+            // Also make sure our tracking of paused partitions is updated to remove any partitions we no longer own.
+            if (pausedForRedelivery) {
+                pausedForRedelivery = false;
+                Set<TopicPartition> assigned = new HashSet<>(partitions);
+                Set<TopicPartition> taskPaused = context.pausedPartitions();
+
+                for (TopicPartition tp : partitions) {
+                    if (!taskPaused.contains(tp))
+                        consumer.resume(tp);
+                }
+
+                Iterator<TopicPartition> tpIter = taskPaused.iterator();
+                while (tpIter.hasNext()) {
+                    TopicPartition tp = tpIter.next();
+                    if (assigned.contains(tp))
+                        tpIter.remove();
+                }
+            }
+
             // Instead of invoking the assignment callback on initialization, we guarantee the consumer is ready upon
             // task start. Since this callback gets invoked during that initial setup before we've started the task, we
             // need to guard against invoking the user's callback method during that period.
@@ -305,6 +363,8 @@ class WorkerSinkTask implements WorkerTask {
         public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
             task.onPartitionsRevoked(partitions);
             commitOffsets(true, -1);
+            // Make sure we don't have any leftover data since offsets will be reset to committed positions
+            messageBatch.clear();
         }
     }
 }

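Taken together, the delivery path above reduces to the loop shape sketched here (a condensed paraphrase of the patch, not the literal WorkerSinkTask code; pauseAll() and resumeAllExceptTaskPaused() are shorthand for the inline loops over consumer.assignment() shown above):

    // Condensed sketch of the new poll/deliver cycle. messageBatch is only
    // cleared after a successful put(), so a RetriableException leaves the
    // batch intact for redelivery on the next iteration.
    while (running) {
        ConsumerRecords<byte[], byte[]> msgs = consumer.poll(timeoutMs); // empty while paused
        convertMessages(msgs);                   // appends to messageBatch
        try {
            task.put(new ArrayList<>(messageBatch));
            messageBatch.clear();
            resumeAllExceptTaskPaused();         // undo a pause-for-redelivery
        } catch (RetriableException e) {
            pauseAll();                          // fetch no new data, but keep
                                                 // polling for group membership
        }
    }

Pausing rather than simply sleeping is the key design choice here: the consumer must keep polling to maintain group membership and honor user-requested timeouts, but must not fetch new records while an old batch is outstanding.
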
http://git-wip-us.apache.org/repos/asf/kafka/blob/f4b87dee/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskContext.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskContext.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskContext.java
index b474589..5257ee4 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskContext.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskContext.java
@@ -17,6 +17,7 @@ import org.apache.kafka.copycat.errors.IllegalWorkerStateException;
 import org.apache.kafka.copycat.sink.SinkTaskContext;
 
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
@@ -24,11 +25,13 @@ public class WorkerSinkTaskContext implements SinkTaskContext {
     private Map<TopicPartition, Long> offsets;
     private long timeoutMs;
     private KafkaConsumer<byte[], byte[]> consumer;
+    private final Set<TopicPartition> pausedPartitions;
 
     public WorkerSinkTaskContext(KafkaConsumer<byte[], byte[]> consumer) {
         this.offsets = new HashMap<>();
         this.timeoutMs = -1L;
         this.consumer = consumer;
+        this.pausedPartitions = new HashSet<>();
     }
 
     @Override
@@ -80,6 +83,8 @@ public class WorkerSinkTaskContext implements SinkTaskContext {
             throw new IllegalWorkerStateException("SinkTaskContext may not be used to pause consumption until the task is initialized");
         }
         try {
+            for (TopicPartition partition : partitions)
+                pausedPartitions.add(partition);
             consumer.pause(partitions);
         } catch (IllegalStateException e) {
             throw new IllegalWorkerStateException("SinkTasks may not pause partitions that are not currently assigned to them.", e);
@@ -92,9 +97,15 @@ public class WorkerSinkTaskContext implements SinkTaskContext {
             throw new IllegalWorkerStateException("SinkTaskContext may not be used to resume consumption until the task is initialized");
         }
         try {
+            for (TopicPartition partition : partitions)
+                pausedPartitions.remove(partition);
             consumer.resume(partitions);
         } catch (IllegalStateException e) {
             throw new IllegalWorkerStateException("SinkTasks may not resume partitions that are not currently assigned to them.", e);
         }
     }
+
+    public Set<TopicPartition> pausedPartitions() {
+        return pausedPartitions;
+    }
 }

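From the task's side, the new pausedPartitions bookkeeping means an explicit SinkTaskContext.pause() survives a redelivery cycle: the worker resumes only partitions the task did not pause itself. A small usage sketch (the backpressure condition is hypothetical):

    // Inside a SinkTask: apply backpressure on one partition. Because pause()
    // records the partition in pausedPartitions, the worker's redelivery logic
    // leaves it paused until the task calls resume() itself.
    private void throttle(TopicPartition tp, boolean backlogged) {
        if (backlogged)
            context.pause(tp);
        else
            context.resume(tp);
    }
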
http://git-wip-us.apache.org/repos/asf/kafka/blob/f4b87dee/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTask.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTask.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTask.java
index cdb41b0..6577fe9 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTask.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTask.java
@@ -306,6 +306,13 @@ class WorkerSourceTask implements WorkerTask {
                 }
             } catch (InterruptedException e) {
                 // Ignore and allow to exit.
+            } catch (Throwable t) {
+                log.error("Task {} threw an uncaught and unrecoverable exception", id);
+                log.error("Task is being killed and will not recover until manually restarted:", t);
+                // It should still be safe to let this fall through and commit offsets since this exception would have
+                // simply resulted in not getting more records but all the existing records should be ok to flush
+                // and commit offsets. Worst case, task.flush() will also throw an exception causing the offset commit
+                // to fail.
             }
 
             commitOffsets();

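The source-side change has the same shape in miniature (a condensed sketch of the work loop after this patch, not the literal WorkerSourceTask code): catching Throwable lets a failed iteration fall through to the final offset commit, for the reasons given in the comment above.

    try {
        while (!stopping) {
            sendRecords(task.poll());   // simplified; names mirror the patch
        }
    } catch (InterruptedException e) {
        // Ignore and allow to exit.
    } catch (Throwable t) {
        log.error("Task {} threw an uncaught and unrecoverable exception", id);
        log.error("Task is being killed and will not recover until manually restarted:", t);
        // Records already produced can still have their offsets committed.
    }
    commitOffsets();
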
http://git-wip-us.apache.org/repos/asf/kafka/blob/f4b87dee/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java
index 7905736..0458054 100644
--- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java
+++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java
@@ -5,9 +5,9 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -21,13 +21,11 @@ import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
-import org.apache.kafka.clients.consumer.OffsetCommitCallback;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.copycat.data.Schema;
 import org.apache.kafka.copycat.data.SchemaAndValue;
-import org.apache.kafka.copycat.errors.CopycatException;
+import org.apache.kafka.copycat.errors.RetriableException;
 import org.apache.kafka.copycat.runtime.standalone.StandaloneConfig;
 import org.apache.kafka.copycat.sink.SinkConnector;
 import org.apache.kafka.copycat.sink.SinkRecord;
@@ -35,12 +33,11 @@ import org.apache.kafka.copycat.sink.SinkTask;
 import org.apache.kafka.copycat.storage.Converter;
 import org.apache.kafka.copycat.util.ConnectorTaskId;
 import org.apache.kafka.copycat.util.MockTime;
-import org.apache.kafka.copycat.util.ThreadedTest;
 import org.easymock.Capture;
 import org.easymock.CaptureType;
 import org.easymock.EasyMock;
 import org.easymock.IAnswer;
-import org.easymock.IExpectationSetters;
+import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.powermock.api.easymock.PowerMock;
@@ -48,30 +45,25 @@ import org.powermock.api.easymock.annotation.Mock;
 import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
-import org.powermock.reflect.Whitebox;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
 
 @RunWith(PowerMockRunner.class)
 @PrepareForTest(WorkerSinkTask.class)
 @PowerMockIgnore("javax.management.*")
-public class WorkerSinkTaskTest extends ThreadedTest {
-
+public class WorkerSinkTaskTest {
     // These are fixed to keep this code simpler. In this example we assume byte[] raw values
     // with mix of integer/string in Copycat
     private static final String TOPIC = "test";
     private static final int PARTITION = 12;
     private static final int PARTITION2 = 13;
-    private static final int PARTITION3 = 14;
     private static final long FIRST_OFFSET = 45;
     private static final Schema KEY_SCHEMA = Schema.INT32_SCHEMA;
     private static final int KEY = 12;
@@ -82,33 +74,34 @@ public class WorkerSinkTaskTest extends ThreadedTest {
 
     private static final TopicPartition TOPIC_PARTITION = new TopicPartition(TOPIC, PARTITION);
     private static final TopicPartition TOPIC_PARTITION2 = new TopicPartition(TOPIC, PARTITION2);
-    private static final TopicPartition TOPIC_PARTITION3 = new TopicPartition(TOPIC, PARTITION3);
-    private static final TopicPartition UNASSIGNED_TOPIC_PARTITION = new TopicPartition(TOPIC, 200);
 
     private static final Map<String, String> TASK_PROPS = new HashMap<>();
     static {
         TASK_PROPS.put(SinkConnector.TOPICS_CONFIG, TOPIC);
     }
 
+
     private ConnectorTaskId taskId = new ConnectorTaskId("job", 0);
     private Time time;
-    @Mock private SinkTask sinkTask;
+    private WorkerSinkTask workerTask;
+    @Mock
+    private SinkTask sinkTask;
     private Capture<WorkerSinkTaskContext> sinkTaskContext = EasyMock.newCapture();
     private WorkerConfig workerConfig;
-    @Mock private Converter keyConverter;
+    @Mock
+    private Converter keyConverter;
     @Mock
     private Converter valueConverter;
-    private WorkerSinkTask workerTask;
-    @Mock private KafkaConsumer<byte[], byte[]> consumer;
+    @Mock
     private WorkerSinkTaskThread workerThread;
+    @Mock
+    private KafkaConsumer<byte[], byte[]> consumer;
     private Capture<ConsumerRebalanceListener> rebalanceListener = EasyMock.newCapture();
 
     private long recordsReturned;
 
-    @SuppressWarnings("unchecked")
-    @Override
-    public void setup() {
-        super.setup();
+    @Before
+    public void setUp() {
         time = new MockTime();
         Map<String, String> workerProps = new HashMap<>();
         workerProps.put("key.converter", "org.apache.kafka.copycat.json.JsonConverter");
@@ -126,338 +119,46 @@ public class WorkerSinkTaskTest extends ThreadedTest {
     }
 
     @Test
-    public void testPollsInBackground() throws Exception {
-        expectInitializeTask();
-        Capture<Collection<SinkRecord>> capturedRecords = expectPolls(1L);
-        expectStopTask(10L);
-        EasyMock.expect(workerThread.awaitShutdown(EasyMock.anyLong(), EasyMock.<TimeUnit>anyObject())).andReturn(true);
-
-        PowerMock.replayAll();
-
-        workerTask.start(TASK_PROPS);
-        workerTask.joinConsumerGroupAndStart();
-        for (int i = 0; i < 10; i++) {
-            workerThread.iteration();
-        }
-        workerTask.stop();
-        workerTask.awaitStop(Long.MAX_VALUE);
-        workerTask.close();
-
-        // Verify contents match expected values, i.e. that they were translated properly. With max
-        // batch size 1 and poll returns 1 message at a time, we should have a matching # of batches
-        assertEquals(10, capturedRecords.getValues().size());
-        int offset = 0;
-        for (Collection<SinkRecord> recs : capturedRecords.getValues()) {
-            assertEquals(1, recs.size());
-            for (SinkRecord rec : recs) {
-                SinkRecord referenceSinkRecord
-                        = new SinkRecord(TOPIC, PARTITION, KEY_SCHEMA, KEY, VALUE_SCHEMA, VALUE, FIRST_OFFSET + offset);
-                assertEquals(referenceSinkRecord, rec);
-                offset++;
-            }
-        }
-
-        PowerMock.verifyAll();
-    }
-
-    @Test
-    public void testDeliverConvertsData() throws Exception {
-        // Validate conversion is performed when data is delivered
-        SchemaAndValue record = new SchemaAndValue(Schema.INT32_SCHEMA, 12);
-
-        ConsumerRecords<byte[], byte[]> records = new ConsumerRecords<>(
-                Collections.singletonMap(
-                        new TopicPartition(TOPIC, 0),
-                        Collections.singletonList(new ConsumerRecord<>(TOPIC, 0, 0, RAW_KEY, RAW_VALUE))));
-
-        // Exact data doesn't matter, but should be passed directly to sink task
-        EasyMock.expect(keyConverter.toCopycatData(EasyMock.eq(TOPIC), EasyMock.aryEq(RAW_KEY))).andReturn(record);
-        EasyMock.expect(valueConverter.toCopycatData(EasyMock.eq(TOPIC), EasyMock.aryEq(RAW_VALUE))).andReturn(record);
-        Capture<Collection<SinkRecord>> capturedRecords
-                = EasyMock.newCapture(CaptureType.ALL);
-        sinkTask.put(EasyMock.capture(capturedRecords));
-        EasyMock.expectLastCall();
-
-        PowerMock.replayAll();
-
-        Whitebox.invokeMethod(workerTask, "deliverMessages", records);
-        assertEquals(record.schema(), capturedRecords.getValue().iterator().next().keySchema());
-        assertEquals(record.value(), capturedRecords.getValue().iterator().next().key());
-        assertEquals(record.schema(), capturedRecords.getValue().iterator().next().valueSchema());
-        assertEquals(record.value(), capturedRecords.getValue().iterator().next().value());
-
-        PowerMock.verifyAll();
-    }
-
-    @Test
-    public void testCommit() throws Exception {
+    public void testPollRedelivery() throws Exception {
         expectInitializeTask();
-        // Make each poll() take the offset commit interval
-        Capture<Collection<SinkRecord>> capturedRecords
-                = expectPolls(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_DEFAULT);
-        expectOffsetFlush(1L, null, null, 0, true);
-        expectStopTask(2);
-        EasyMock.expect(workerThread.awaitShutdown(EasyMock.anyLong(), EasyMock.<TimeUnit>anyObject())).andReturn(true);
-
-        PowerMock.replayAll();
-
-        workerTask.start(TASK_PROPS);
-        workerTask.joinConsumerGroupAndStart();
-        // First iteration gets one record
-        workerThread.iteration();
-        // Second triggers commit, gets a second offset
-        workerThread.iteration();
-        // Commit finishes synchronously for testing so we can check this immediately
-        assertEquals(0, workerThread.commitFailures());
-        workerTask.stop();
-        workerTask.awaitStop(Long.MAX_VALUE);
-        workerTask.close();
 
-        assertEquals(2, capturedRecords.getValues().size());
-
-        PowerMock.verifyAll();
-    }
-
-    @Test
-    public void testCommitTaskFlushFailure() throws Exception {
-        expectInitializeTask();
-        Capture<Collection<SinkRecord>> capturedRecords = expectPolls(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_DEFAULT);
-        expectOffsetFlush(1L, new RuntimeException(), null, 0, true);
-        // Should rewind to last known good positions, which in this case will be the offsets loaded during initialization
-        // for all topic partitions
-        consumer.seek(TOPIC_PARTITION, FIRST_OFFSET);
-        PowerMock.expectLastCall();
-        consumer.seek(TOPIC_PARTITION2, FIRST_OFFSET);
-        PowerMock.expectLastCall();
-        consumer.seek(TOPIC_PARTITION3, FIRST_OFFSET);
+        // If a retriable exception is thrown, we should redeliver the same batch, pausing the consumer in the meantime
+        expectConsumerPoll(1);
+        expectConvertMessages(1);
+        Capture<Collection<SinkRecord>> records = EasyMock.newCapture(CaptureType.ALL);
+        sinkTask.put(EasyMock.capture(records));
+        EasyMock.expectLastCall().andThrow(new RetriableException("retry"));
+        // Pause
+        EasyMock.expect(consumer.assignment()).andReturn(new HashSet<>(Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2)));
+        consumer.pause(TOPIC_PARTITION);
         PowerMock.expectLastCall();
-        expectStopTask(2);
-        EasyMock.expect(workerThread.awaitShutdown(EasyMock.anyLong(), EasyMock.<TimeUnit>anyObject())).andReturn(true);
-
-        PowerMock.replayAll();
-
-        workerTask.start(TASK_PROPS);
-        workerTask.joinConsumerGroupAndStart();
-        // Second iteration triggers commit
-        workerThread.iteration();
-        workerThread.iteration();
-        assertEquals(1, workerThread.commitFailures());
-        assertEquals(false, Whitebox.getInternalState(workerThread, "committing"));
-        workerTask.stop();
-        workerTask.awaitStop(Long.MAX_VALUE);
-        workerTask.close();
-
-        PowerMock.verifyAll();
-    }
-
-    @Test
-    public void testCommitTaskSuccessAndFlushFailure() throws Exception {
-        // Validate that we rewind to the correct
-
-        expectInitializeTask();
-        Capture<Collection<SinkRecord>> capturedRecords = expectPolls(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_DEFAULT);
-        expectOffsetFlush(1L, null, null, 0, true);
-        expectOffsetFlush(2L, new RuntimeException(), null, 0, true);
-        // Should rewind to last known good positions, which in this case will be the offsets last committed. This test
-        // isn't quite accurate since we started with assigning 3 topic partitions and then only committed one, but what
-        // is important here is that we roll back to the last committed values.
-        consumer.seek(TOPIC_PARTITION, FIRST_OFFSET);
+        consumer.pause(TOPIC_PARTITION2);
         PowerMock.expectLastCall();
-        expectStopTask(2);
-        EasyMock.expect(workerThread.awaitShutdown(EasyMock.anyLong(), EasyMock.<TimeUnit>anyObject())).andReturn(true);
-
-        PowerMock.replayAll();
-
-        workerTask.start(TASK_PROPS);
-        workerTask.joinConsumerGroupAndStart();
-        // Second iteration triggers first commit, third iteration triggers second (failing) commit
-        workerThread.iteration();
-        workerThread.iteration();
-        workerThread.iteration();
-        assertEquals(1, workerThread.commitFailures());
-        assertEquals(false, Whitebox.getInternalState(workerThread, "committing"));
-        workerTask.stop();
-        workerTask.awaitStop(Long.MAX_VALUE);
-        workerTask.close();
 
-        PowerMock.verifyAll();
-    }
-
-    @Test
-    public void testCommitConsumerFailure() throws Exception {
-        expectInitializeTask();
-        Capture<Collection<SinkRecord>> capturedRecords
-                = expectPolls(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_DEFAULT);
-        expectOffsetFlush(1L, null, new Exception(), 0, true);
-        expectStopTask(2);
-        EasyMock.expect(workerThread.awaitShutdown(EasyMock.anyLong(), EasyMock.<TimeUnit>anyObject())).andReturn(true);
-
-        PowerMock.replayAll();
-
-        workerTask.start(TASK_PROPS);
-        workerTask.joinConsumerGroupAndStart();
-        // Second iteration triggers commit
-        workerThread.iteration();
-        workerThread.iteration();
-        // TODO Response to consistent failures?
-        assertEquals(1, workerThread.commitFailures());
-        assertEquals(false, Whitebox.getInternalState(workerThread, "committing"));
-        workerTask.stop();
-        workerTask.awaitStop(Long.MAX_VALUE);
-        workerTask.close();
-
-        PowerMock.verifyAll();
-    }
-
-    @Test
-    public void testCommitTimeout() throws Exception {
-        expectInitializeTask();
-        // Cut down amount of time to pass in each poll so we trigger exactly 1 offset commit
-        Capture<Collection<SinkRecord>> capturedRecords
-                = expectPolls(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_DEFAULT / 2);
-        expectOffsetFlush(2L, null, null, WorkerConfig.OFFSET_COMMIT_TIMEOUT_MS_DEFAULT, false);
-        expectStopTask(4);
-        EasyMock.expect(workerThread.awaitShutdown(EasyMock.anyLong(), EasyMock.<TimeUnit>anyObject())).andReturn(true);
-
-        PowerMock.replayAll();
-
-        workerTask.start(TASK_PROPS);
-        workerTask.joinConsumerGroupAndStart();
-        // Third iteration triggers commit, fourth gives a chance to trigger the timeout but doesn't
-        // trigger another commit
-        workerThread.iteration();
-        workerThread.iteration();
-        workerThread.iteration();
-        workerThread.iteration();
-        // TODO Response to consistent failures?
-        assertEquals(1, workerThread.commitFailures());
-        assertEquals(false, Whitebox.getInternalState(workerThread, "committing"));
-        workerTask.stop();
-        workerTask.awaitStop(Long.MAX_VALUE);
-        workerTask.close();
-
-        PowerMock.verifyAll();
-    }
-
-    @Test
-    public void testAssignmentPauseResume() throws Exception {
-        // Just validate that the calls are passed through to the consumer, and that where appropriate errors are
-        // converted
-        expectInitializeTask();
-
-        expectOnePoll().andAnswer(new IAnswer<Object>() {
-            @Override
-            public Object answer() throws Throwable {
-                assertEquals(new HashSet<>(Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2, TOPIC_PARTITION3)),
-                        sinkTaskContext.getValue().assignment());
-                return null;
-            }
-        });
-        EasyMock.expect(consumer.assignment()).andReturn(new HashSet<>(Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2, TOPIC_PARTITION3)));
-
-        expectOnePoll().andAnswer(new IAnswer<Object>() {
-            @Override
-            public Object answer() throws Throwable {
-                try {
-                    sinkTaskContext.getValue().pause(UNASSIGNED_TOPIC_PARTITION);
-                    fail("Trying to pause unassigned partition should have thrown an Copycat exception");
-                } catch (CopycatException e) {
-                    // expected
-                }
-                sinkTaskContext.getValue().pause(TOPIC_PARTITION, TOPIC_PARTITION2);
-                return null;
-            }
-        });
-        consumer.pause(UNASSIGNED_TOPIC_PARTITION);
-        PowerMock.expectLastCall().andThrow(new IllegalStateException("unassigned topic partition"));
-        consumer.pause(TOPIC_PARTITION, TOPIC_PARTITION2);
+        // Retry delivery should succeed
+        expectConsumerPoll(0);
+        sinkTask.put(EasyMock.capture(records));
+        EasyMock.expectLastCall();
+        // And unpause
+        EasyMock.expect(consumer.assignment()).andReturn(new HashSet<>(Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2)));
+        consumer.resume(TOPIC_PARTITION);
         PowerMock.expectLastCall();
-
-        expectOnePoll().andAnswer(new IAnswer<Object>() {
-            @Override
-            public Object answer() throws Throwable {
-                try {
-                    sinkTaskContext.getValue().resume(UNASSIGNED_TOPIC_PARTITION);
-                    fail("Trying to resume unassigned partition should have thrown an Copycat exception");
-                } catch (CopycatException e) {
-                    // expected
-                }
-
-                sinkTaskContext.getValue().resume(TOPIC_PARTITION, TOPIC_PARTITION2);
-                return null;
-            }
-        });
-        consumer.resume(UNASSIGNED_TOPIC_PARTITION);
-        PowerMock.expectLastCall().andThrow(new IllegalStateException("unassigned topic partition"));
-        consumer.resume(TOPIC_PARTITION, TOPIC_PARTITION2);
+        consumer.resume(TOPIC_PARTITION2);
         PowerMock.expectLastCall();
 
-        expectStopTask(0);
-        EasyMock.expect(workerThread.awaitShutdown(EasyMock.anyLong(), EasyMock.<TimeUnit>anyObject())).andReturn(true);
-
         PowerMock.replayAll();
 
         workerTask.start(TASK_PROPS);
         workerTask.joinConsumerGroupAndStart();
-        workerThread.iteration();
-        workerThread.iteration();
-        workerThread.iteration();
-        workerTask.stop();
-        workerTask.awaitStop(Long.MAX_VALUE);
-        workerTask.close();
+        workerTask.poll(Long.MAX_VALUE);
+        workerTask.poll(Long.MAX_VALUE);
 
         PowerMock.verifyAll();
     }
 
-    @Test
-    public void testRewind() throws Exception {
-        expectInitializeTask();
-        final long startOffset = 40L;
-        final Map<TopicPartition, Long> offsets = new HashMap<>();
-
-        expectOnePoll().andAnswer(new IAnswer<Object>() {
-            @Override
-            public Object answer() throws Throwable {
-                offsets.put(TOPIC_PARTITION, startOffset);
-                sinkTaskContext.getValue().offset(offsets);
-                return null;
-            }
-        });
-
-        consumer.seek(TOPIC_PARTITION, startOffset);
-        EasyMock.expectLastCall();
-
-        expectOnePoll().andAnswer(new IAnswer<Object>() {
-            @Override
-            public Object answer() throws Throwable {
-                Map<TopicPartition, Long> offsets = sinkTaskContext.getValue().offsets();
-                assertEquals(0, offsets.size());
-                return null;
-            }
-        });
-
-        expectStopTask(3);
-        EasyMock.expect(workerThread.awaitShutdown(EasyMock.anyLong(), EasyMock.<TimeUnit>anyObject())).andReturn(true);
-
-        PowerMock.replayAll();
-
-        workerTask.start(TASK_PROPS);
-        workerTask.joinConsumerGroupAndStart();
-        workerThread.iteration();
-        workerThread.iteration();
-        workerTask.stop();
-        workerTask.awaitStop(Long.MAX_VALUE);
-        workerTask.close();
-
-        PowerMock.verifyAll();
-    }
 
     private void expectInitializeTask() throws Exception {
         PowerMock.expectPrivate(workerTask, "createConsumer").andReturn(consumer);
-
-        workerThread = PowerMock.createPartialMock(WorkerSinkTaskThread.class, new String[]{"start", "awaitShutdown"},
-                workerTask, "mock-worker-thread", time,
-                workerConfig);
         PowerMock.expectPrivate(workerTask, "createWorkerThread")
                 .andReturn(workerThread);
         workerThread.start();
@@ -469,13 +170,12 @@ public class WorkerSinkTaskTest extends ThreadedTest {
         EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer(new IAnswer<ConsumerRecords<byte[], byte[]>>() {
             @Override
             public ConsumerRecords<byte[], byte[]> answer() throws Throwable {
-                rebalanceListener.getValue().onPartitionsAssigned(Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2, TOPIC_PARTITION3));
+                rebalanceListener.getValue().onPartitionsAssigned(Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2));
                 return ConsumerRecords.empty();
             }
         });
         EasyMock.expect(consumer.position(TOPIC_PARTITION)).andReturn(FIRST_OFFSET);
         EasyMock.expect(consumer.position(TOPIC_PARTITION2)).andReturn(FIRST_OFFSET);
-        EasyMock.expect(consumer.position(TOPIC_PARTITION3)).andReturn(FIRST_OFFSET);
 
         sinkTask.initialize(EasyMock.capture(sinkTaskContext));
         PowerMock.expectLastCall();
@@ -483,114 +183,26 @@ public class WorkerSinkTaskTest extends ThreadedTest {
         PowerMock.expectLastCall();
     }
 
-    private void expectStopTask(final long expectedMessages) throws Exception {
-        final long finalOffset = FIRST_OFFSET + expectedMessages - 1;
-
-        sinkTask.stop();
-        PowerMock.expectLastCall();
-
-        // No offset commit since it happens in the mocked worker thread, but the main thread does need to wake up the
-        // consumer so it exits quickly
-        consumer.wakeup();
-        PowerMock.expectLastCall();
-
-        consumer.close();
-        PowerMock.expectLastCall();
-    }
-
-    // Note that this can only be called once per test currently
-    private Capture<Collection<SinkRecord>> expectPolls(final long pollDelayMs) throws Exception {
-        // Stub out all the consumer stream/iterator responses, which we just want to verify occur,
-        // but don't care about the exact details here.
-        EasyMock.expect(consumer.poll(EasyMock.anyLong())).andStubAnswer(
-                new IAnswer<ConsumerRecords<byte[], byte[]>>() {
-                    @Override
-                    public ConsumerRecords<byte[], byte[]> answer() throws Throwable {
-                        // "Sleep" so time will progress
-                        time.sleep(pollDelayMs);
-                        ConsumerRecords<byte[], byte[]> records = new ConsumerRecords<>(
-                                Collections.singletonMap(
-                                        new TopicPartition(TOPIC, PARTITION),
-                                        Arrays.asList(
-                                                new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturned, RAW_KEY, RAW_VALUE)
-                                        )));
-                        recordsReturned++;
-                        return records;
-                    }
-                });
-        EasyMock.expect(keyConverter.toCopycatData(TOPIC, RAW_KEY)).andReturn(new SchemaAndValue(KEY_SCHEMA, KEY)).anyTimes();
-        EasyMock.expect(valueConverter.toCopycatData(TOPIC, RAW_VALUE)).andReturn(new SchemaAndValue(VALUE_SCHEMA, VALUE)).anyTimes();
-        Capture<Collection<SinkRecord>> capturedRecords = EasyMock.newCapture(CaptureType.ALL);
-        sinkTask.put(EasyMock.capture(capturedRecords));
-        EasyMock.expectLastCall().anyTimes();
-        return capturedRecords;
-    }
-
-    private IExpectationSetters<Object> expectOnePoll() {
-        // Currently the SinkTask's put() method will not be invoked unless we provide some data, so instead of
-        // returning empty data, we return one record. The expectation is that the data will be ignored by the
-        // response behavior specified using the return value of this method.
+    private void expectConsumerPoll(final int numMessages) {
         EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer(
                 new IAnswer<ConsumerRecords<byte[], byte[]>>() {
                     @Override
                     public ConsumerRecords<byte[], byte[]> answer() throws Throwable {
-                        // "Sleep" so time will progress
-                        time.sleep(1L);
-                        ConsumerRecords<byte[], byte[]> records = new ConsumerRecords<>(
-                                Collections.singletonMap(
-                                        new TopicPartition(TOPIC, PARTITION),
-                                        Arrays.asList(
-                                                new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturned, RAW_KEY, RAW_VALUE)
-                                        )));
-                        recordsReturned++;
-                        return records;
+                        List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>();
+                        for (int i = 0; i < numMessages; i++)
+                            records.add(new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturned + i, RAW_KEY, RAW_VALUE));
+                        recordsReturned += numMessages;
+                        return new ConsumerRecords<>(
+                                numMessages > 0 ?
+                                        Collections.singletonMap(new TopicPartition(TOPIC, PARTITION), records) :
+                                        Collections.<TopicPartition, List<ConsumerRecord<byte[], byte[]>>>emptyMap()
+                        );
                     }
                 });
-        EasyMock.expect(keyConverter.toCopycatData(TOPIC, RAW_KEY)).andReturn(new SchemaAndValue(KEY_SCHEMA, KEY));
-        EasyMock.expect(valueConverter.toCopycatData(TOPIC, RAW_VALUE)).andReturn(new SchemaAndValue(VALUE_SCHEMA, VALUE));
-        sinkTask.put(EasyMock.anyObject(Collection.class));
-        return EasyMock.expectLastCall();
     }
 
-    private Capture<OffsetCommitCallback> expectOffsetFlush(final long expectedMessages,
-                                                              final RuntimeException flushError,
-                                                              final Exception consumerCommitError,
-                                                              final long consumerCommitDelayMs,
-                                                              final boolean invokeCallback)
-            throws Exception {
-        final long finalOffset = FIRST_OFFSET + expectedMessages - 1;
-
-        EasyMock.expect(consumer.assignment()).andReturn(Collections.singleton(TOPIC_PARTITION));
-        EasyMock.expect(consumer.position(TOPIC_PARTITION)).andAnswer(
-                new IAnswer<Long>() {
-                    @Override
-                    public Long answer() throws Throwable {
-                        return FIRST_OFFSET + recordsReturned - 1;
-                    }
-                }
-        );
-
-        sinkTask.flush(Collections.singletonMap(TOPIC_PARTITION, new OffsetAndMetadata(finalOffset)));
-        IExpectationSetters<Object> flushExpectation = PowerMock.expectLastCall();
-        if (flushError != null) {
-            flushExpectation.andThrow(flushError).once();
-            return null;
-        }
-
-        final Capture<OffsetCommitCallback> capturedCallback = EasyMock.newCapture();
-        final Map<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap(TOPIC_PARTITION, new OffsetAndMetadata(finalOffset));
-        consumer.commitAsync(EasyMock.eq(offsets),
-                EasyMock.capture(capturedCallback));
-        PowerMock.expectLastCall().andAnswer(new IAnswer<Object>() {
-            @Override
-            public Object answer() throws Throwable {
-                time.sleep(consumerCommitDelayMs);
-                if (invokeCallback)
-                    capturedCallback.getValue().onComplete(offsets, consumerCommitError);
-                return null;
-            }
-        });
-        return capturedCallback;
+    private void expectConvertMessages(final int numMessages) {
+        EasyMock.expect(keyConverter.toCopycatData(TOPIC, RAW_KEY)).andReturn(new SchemaAndValue(KEY_SCHEMA, KEY)).times(numMessages);
+        EasyMock.expect(valueConverter.toCopycatData(TOPIC, RAW_VALUE)).andReturn(new SchemaAndValue(VALUE_SCHEMA, VALUE)).times(numMessages);
     }
-
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/f4b87dee/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskThreadedTest.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskThreadedTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskThreadedTest.java
new file mode 100644
index 0000000..ded78a1
--- /dev/null
+++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskThreadedTest.java
@@ -0,0 +1,563 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.runtime;
+
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetCommitCallback;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.copycat.data.Schema;
+import org.apache.kafka.copycat.data.SchemaAndValue;
+import org.apache.kafka.copycat.errors.CopycatException;
+import org.apache.kafka.copycat.runtime.standalone.StandaloneConfig;
+import org.apache.kafka.copycat.sink.SinkConnector;
+import org.apache.kafka.copycat.sink.SinkRecord;
+import org.apache.kafka.copycat.sink.SinkTask;
+import org.apache.kafka.copycat.storage.Converter;
+import org.apache.kafka.copycat.util.ConnectorTaskId;
+import org.apache.kafka.copycat.util.MockTime;
+import org.apache.kafka.copycat.util.ThreadedTest;
+import org.easymock.Capture;
+import org.easymock.CaptureType;
+import org.easymock.EasyMock;
+import org.easymock.IAnswer;
+import org.easymock.IExpectationSetters;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.api.easymock.annotation.Mock;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.powermock.reflect.Whitebox;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(WorkerSinkTask.class)
+@PowerMockIgnore("javax.management.*")
+public class WorkerSinkTaskThreadedTest extends ThreadedTest {
+
+    // These are fixed to keep this code simpler. In this example we assume byte[] raw values
+    // with mix of integer/string in Copycat
+    private static final String TOPIC = "test";
+    private static final int PARTITION = 12;
+    private static final int PARTITION2 = 13;
+    private static final int PARTITION3 = 14;
+    private static final long FIRST_OFFSET = 45;
+    private static final Schema KEY_SCHEMA = Schema.INT32_SCHEMA;
+    private static final int KEY = 12;
+    private static final Schema VALUE_SCHEMA = Schema.STRING_SCHEMA;
+    private static final String VALUE = "VALUE";
+    private static final byte[] RAW_KEY = "key".getBytes();
+    private static final byte[] RAW_VALUE = "value".getBytes();
+
+    private static final TopicPartition TOPIC_PARTITION = new TopicPartition(TOPIC, PARTITION);
+    private static final TopicPartition TOPIC_PARTITION2 = new TopicPartition(TOPIC, PARTITION2);
+    private static final TopicPartition TOPIC_PARTITION3 = new TopicPartition(TOPIC, PARTITION3);
+    private static final TopicPartition UNASSIGNED_TOPIC_PARTITION = new TopicPartition(TOPIC, 200);
+
+    private static final Map<String, String> TASK_PROPS = new HashMap<>();
+    static {
+        TASK_PROPS.put(SinkConnector.TOPICS_CONFIG, TOPIC);
+    }
+
+    private ConnectorTaskId taskId = new ConnectorTaskId("job", 0);
+    private Time time;
+    @Mock private SinkTask sinkTask;
+    private Capture<WorkerSinkTaskContext> sinkTaskContext = EasyMock.newCapture();
+    private WorkerConfig workerConfig;
+    @Mock private Converter keyConverter;
+    @Mock
+    private Converter valueConverter;
+    private WorkerSinkTask workerTask;
+    @Mock private KafkaConsumer<byte[], byte[]> consumer;
+    private WorkerSinkTaskThread workerThread;
+    private Capture<ConsumerRebalanceListener> rebalanceListener = EasyMock.newCapture();
+
+    private long recordsReturned;
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void setup() {
+        super.setup();
+        time = new MockTime();
+        Map<String, String> workerProps = new HashMap<>();
+        workerProps.put("key.converter", "org.apache.kafka.copycat.json.JsonConverter");
+        workerProps.put("value.converter", "org.apache.kafka.copycat.json.JsonConverter");
+        workerProps.put("internal.key.converter", "org.apache.kafka.copycat.json.JsonConverter");
+        workerProps.put("internal.value.converter", "org.apache.kafka.copycat.json.JsonConverter");
+        workerProps.put("internal.key.converter.schemas.enable", "false");
+        workerProps.put("internal.value.converter.schemas.enable", "false");
+        workerConfig = new StandaloneConfig(workerProps);
+        workerTask = PowerMock.createPartialMock(
+                WorkerSinkTask.class, new String[]{"createConsumer", "createWorkerThread"},
+                taskId, sinkTask, workerConfig, keyConverter, valueConverter, time);
+
+        recordsReturned = 0;
+    }
+
+    @Test
+    public void testPollsInBackground() throws Exception {
+        expectInitializeTask();
+        Capture<Collection<SinkRecord>> capturedRecords = expectPolls(1L);
+        expectStopTask(10L);
+        EasyMock.expect(workerThread.awaitShutdown(EasyMock.anyLong(), EasyMock.<TimeUnit>anyObject())).andReturn(true);
+
+        PowerMock.replayAll();
+
+        workerTask.start(TASK_PROPS);
+        workerTask.joinConsumerGroupAndStart();
+        for (int i = 0; i < 10; i++) {
+            workerThread.iteration();
+        }
+        workerTask.stop();
+        workerTask.awaitStop(Long.MAX_VALUE);
+        workerTask.close();
+
+        // Verify the contents match the expected values, i.e. that they were translated properly. With a max
+        // batch size of 1 and poll() returning 1 message at a time, we should have a matching number of batches
+        assertEquals(10, capturedRecords.getValues().size());
+        int offset = 0;
+        for (Collection<SinkRecord> recs : capturedRecords.getValues()) {
+            assertEquals(1, recs.size());
+            for (SinkRecord rec : recs) {
+                SinkRecord referenceSinkRecord
+                        = new SinkRecord(TOPIC, PARTITION, KEY_SCHEMA, KEY, VALUE_SCHEMA, VALUE, FIRST_OFFSET + offset);
+                assertEquals(referenceSinkRecord, rec);
+                offset++;
+            }
+        }
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testCommit() throws Exception {
+        expectInitializeTask();
+        // Make each poll() take the offset commit interval
+        Capture<Collection<SinkRecord>> capturedRecords
+                = expectPolls(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_DEFAULT);
+        expectOffsetFlush(1L, null, null, 0, true);
+        expectStopTask(2);
+        EasyMock.expect(workerThread.awaitShutdown(EasyMock.anyLong(), EasyMock.<TimeUnit>anyObject())).andReturn(true);
+
+        PowerMock.replayAll();
+
+        workerTask.start(TASK_PROPS);
+        workerTask.joinConsumerGroupAndStart();
+        // First iteration gets one record
+        workerThread.iteration();
+        // Second triggers commit, gets a second offset
+        workerThread.iteration();
+        // Commit finishes synchronously for testing so we can check this immediately
+        assertEquals(0, workerThread.commitFailures());
+        workerTask.stop();
+        workerTask.awaitStop(Long.MAX_VALUE);
+        workerTask.close();
+
+        assertEquals(2, capturedRecords.getValues().size());
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testCommitTaskFlushFailure() throws Exception {
+        expectInitializeTask();
+        Capture<Collection<SinkRecord>> capturedRecords = expectPolls(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_DEFAULT);
+        expectOffsetFlush(1L, new RuntimeException(), null, 0, true);
+        // Should rewind to last known good positions, which in this case will be the offsets loaded during initialization
+        // for all topic partitions
+        consumer.seek(TOPIC_PARTITION, FIRST_OFFSET);
+        PowerMock.expectLastCall();
+        consumer.seek(TOPIC_PARTITION2, FIRST_OFFSET);
+        PowerMock.expectLastCall();
+        consumer.seek(TOPIC_PARTITION3, FIRST_OFFSET);
+        PowerMock.expectLastCall();
+        expectStopTask(2);
+        EasyMock.expect(workerThread.awaitShutdown(EasyMock.anyLong(), EasyMock.<TimeUnit>anyObject())).andReturn(true);
+
+        PowerMock.replayAll();
+
+        workerTask.start(TASK_PROPS);
+        workerTask.joinConsumerGroupAndStart();
+        // Second iteration triggers commit
+        workerThread.iteration();
+        workerThread.iteration();
+        assertEquals(1, workerThread.commitFailures());
+        assertEquals(false, Whitebox.getInternalState(workerThread, "committing"));
+        workerTask.stop();
+        workerTask.awaitStop(Long.MAX_VALUE);
+        workerTask.close();
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testCommitTaskSuccessAndFlushFailure() throws Exception {
+        // Validate that we rewind to the correct offsets if a task's flush method throws an exception
+
+        expectInitializeTask();
+        Capture<Collection<SinkRecord>> capturedRecords = expectPolls(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_DEFAULT);
+        expectOffsetFlush(1L, null, null, 0, true);
+        expectOffsetFlush(2L, new RuntimeException(), null, 0, true);
+        // Should rewind to last known committed positions
+        consumer.seek(TOPIC_PARTITION, FIRST_OFFSET + 1);
+        PowerMock.expectLastCall();
+        consumer.seek(TOPIC_PARTITION2, FIRST_OFFSET);
+        PowerMock.expectLastCall();
+        consumer.seek(TOPIC_PARTITION3, FIRST_OFFSET);
+        PowerMock.expectLastCall();
+        expectStopTask(2);
+        EasyMock.expect(workerThread.awaitShutdown(EasyMock.anyLong(), EasyMock.<TimeUnit>anyObject())).andReturn(true);
+
+        PowerMock.replayAll();
+
+        workerTask.start(TASK_PROPS);
+        workerTask.joinConsumerGroupAndStart();
+        // Second iteration triggers first commit, third iteration triggers second (failing) commit
+        workerThread.iteration();
+        workerThread.iteration();
+        workerThread.iteration();
+        assertEquals(1, workerThread.commitFailures());
+        assertEquals(false, Whitebox.getInternalState(workerThread, "committing"));
+        workerTask.stop();
+        workerTask.awaitStop(Long.MAX_VALUE);
+        workerTask.close();
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testCommitConsumerFailure() throws Exception {
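+        // Validate that an error from the consumer's async commit is counted as a commit failure without
+        // stopping the task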
+        expectInitializeTask();
+        Capture<Collection<SinkRecord>> capturedRecords
+                = expectPolls(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_DEFAULT);
+        expectOffsetFlush(1L, null, new Exception(), 0, true);
+        expectStopTask(2);
+        EasyMock.expect(workerThread.awaitShutdown(EasyMock.anyLong(), EasyMock.<TimeUnit>anyObject())).andReturn(true);
+
+        PowerMock.replayAll();
+
+        workerTask.start(TASK_PROPS);
+        workerTask.joinConsumerGroupAndStart();
+        // Second iteration triggers commit
+        workerThread.iteration();
+        workerThread.iteration();
+        // TODO Response to consistent failures?
+        assertEquals(1, workerThread.commitFailures());
+        assertEquals(false, Whitebox.getInternalState(workerThread, "committing"));
+        workerTask.stop();
+        workerTask.awaitStop(Long.MAX_VALUE);
+        workerTask.close();
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testCommitTimeout() throws Exception {
+        expectInitializeTask();
+        // Cut down amount of time to pass in each poll so we trigger exactly 1 offset commit
+        Capture<Collection<SinkRecord>> capturedRecords
+                = expectPolls(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_DEFAULT / 2);
+        expectOffsetFlush(2L, null, null, WorkerConfig.OFFSET_COMMIT_TIMEOUT_MS_DEFAULT, false);
+        expectStopTask(4);
+        EasyMock.expect(workerThread.awaitShutdown(EasyMock.anyLong(), EasyMock.<TimeUnit>anyObject())).andReturn(true);
+
+        PowerMock.replayAll();
+
+        workerTask.start(TASK_PROPS);
+        workerTask.joinConsumerGroupAndStart();
+        // Third iteration triggers commit, fourth gives a chance to trigger the timeout but doesn't
+        // trigger another commit
+        workerThread.iteration();
+        workerThread.iteration();
+        workerThread.iteration();
+        workerThread.iteration();
+        // TODO Response to consistent failures?
+        assertEquals(1, workerThread.commitFailures());
+        assertEquals(false, Whitebox.getInternalState(workerThread, "committing"));
+        workerTask.stop();
+        workerTask.awaitStop(Long.MAX_VALUE);
+        workerTask.close();
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testAssignmentPauseResume() throws Exception {
+        // Just validate that the calls are passed through to the consumer and that, where appropriate,
+        // errors are converted
+        expectInitializeTask();
+
+        expectOnePoll().andAnswer(new IAnswer<Object>() {
+            @Override
+            public Object answer() throws Throwable {
+                assertEquals(new HashSet<>(Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2, TOPIC_PARTITION3)),
+                        sinkTaskContext.getValue().assignment());
+                return null;
+            }
+        });
+        EasyMock.expect(consumer.assignment()).andReturn(new HashSet<>(Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2, TOPIC_PARTITION3)));
+
+        expectOnePoll().andAnswer(new IAnswer<Object>() {
+            @Override
+            public Object answer() throws Throwable {
+                try {
+                    sinkTaskContext.getValue().pause(UNASSIGNED_TOPIC_PARTITION);
+                    fail("Trying to pause unassigned partition should have thrown an Copycat exception");
+                } catch (CopycatException e) {
+                    // expected
+                }
+                sinkTaskContext.getValue().pause(TOPIC_PARTITION, TOPIC_PARTITION2);
+                return null;
+            }
+        });
+        consumer.pause(UNASSIGNED_TOPIC_PARTITION);
+        PowerMock.expectLastCall().andThrow(new IllegalStateException("unassigned topic partition"));
+        consumer.pause(TOPIC_PARTITION, TOPIC_PARTITION2);
+        PowerMock.expectLastCall();
+
+        expectOnePoll().andAnswer(new IAnswer<Object>() {
+            @Override
+            public Object answer() throws Throwable {
+                try {
+                    sinkTaskContext.getValue().resume(UNASSIGNED_TOPIC_PARTITION);
+                    fail("Trying to resume unassigned partition should have thrown an Copycat exception");
+                } catch (CopycatException e) {
+                    // expected
+                }
+
+                sinkTaskContext.getValue().resume(TOPIC_PARTITION, TOPIC_PARTITION2);
+                return null;
+            }
+        });
+        consumer.resume(UNASSIGNED_TOPIC_PARTITION);
+        PowerMock.expectLastCall().andThrow(new IllegalStateException("unassigned topic partition"));
+        consumer.resume(TOPIC_PARTITION, TOPIC_PARTITION2);
+        PowerMock.expectLastCall();
+
+        expectStopTask(0);
+        EasyMock.expect(workerThread.awaitShutdown(EasyMock.anyLong(), EasyMock.<TimeUnit>anyObject())).andReturn(true);
+
+        PowerMock.replayAll();
+
+        workerTask.start(TASK_PROPS);
+        workerTask.joinConsumerGroupAndStart();
+        workerThread.iteration();
+        workerThread.iteration();
+        workerThread.iteration();
+        workerTask.stop();
+        workerTask.awaitStop(Long.MAX_VALUE);
+        workerTask.close();
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testRewind() throws Exception {
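+        // Validate that offsets set via the context trigger a consumer seek and are cleared once applied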
+        expectInitializeTask();
+        final long startOffset = 40L;
+        final Map<TopicPartition, Long> offsets = new HashMap<>();
+
+        expectOnePoll().andAnswer(new IAnswer<Object>() {
+            @Override
+            public Object answer() throws Throwable {
+                offsets.put(TOPIC_PARTITION, startOffset);
+                sinkTaskContext.getValue().offset(offsets);
+                return null;
+            }
+        });
+
+        consumer.seek(TOPIC_PARTITION, startOffset);
+        EasyMock.expectLastCall();
+
+        expectOnePoll().andAnswer(new IAnswer<Object>() {
+            @Override
+            public Object answer() throws Throwable {
+                Map<TopicPartition, Long> offsets = sinkTaskContext.getValue().offsets();
+                assertEquals(0, offsets.size());
+                return null;
+            }
+        });
+
+        expectStopTask(3);
+        EasyMock.expect(workerThread.awaitShutdown(EasyMock.anyLong(), EasyMock.<TimeUnit>anyObject())).andReturn(true);
+
+        PowerMock.replayAll();
+
+        workerTask.start(TASK_PROPS);
+        workerTask.joinConsumerGroupAndStart();
+        workerThread.iteration();
+        workerThread.iteration();
+        workerTask.stop();
+        workerTask.awaitStop(Long.MAX_VALUE);
+        workerTask.close();
+
+        PowerMock.verifyAll();
+    }
+
+    private void expectInitializeTask() throws Exception {
+        PowerMock.expectPrivate(workerTask, "createConsumer").andReturn(consumer);
+
+        workerThread = PowerMock.createPartialMock(WorkerSinkTaskThread.class, new String[]{"start", "awaitShutdown"},
+                workerTask, "mock-worker-thread", time,
+                workerConfig);
+        PowerMock.expectPrivate(workerTask, "createWorkerThread")
+                .andReturn(workerThread);
+        workerThread.start();
+        PowerMock.expectLastCall();
+
+        consumer.subscribe(EasyMock.eq(Arrays.asList(TOPIC)), EasyMock.capture(rebalanceListener));
+        PowerMock.expectLastCall();
+
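+        // Expect the initial poll to trigger the rebalance callback, assigning all three partitions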
+        EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer(new IAnswer<ConsumerRecords<byte[], byte[]>>() {
+            @Override
+            public ConsumerRecords<byte[], byte[]> answer() throws Throwable {
+                rebalanceListener.getValue().onPartitionsAssigned(Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2, TOPIC_PARTITION3));
+                return ConsumerRecords.empty();
+            }
+        });
+        EasyMock.expect(consumer.position(TOPIC_PARTITION)).andReturn(FIRST_OFFSET);
+        EasyMock.expect(consumer.position(TOPIC_PARTITION2)).andReturn(FIRST_OFFSET);
+        EasyMock.expect(consumer.position(TOPIC_PARTITION3)).andReturn(FIRST_OFFSET);
+
+        sinkTask.initialize(EasyMock.capture(sinkTaskContext));
+        PowerMock.expectLastCall();
+        sinkTask.start(TASK_PROPS);
+        PowerMock.expectLastCall();
+    }
+
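+    // Expects a clean shutdown: stop the task, wake up the consumer so the poll loop exits, and close it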
+    private void expectStopTask(final long expectedMessages) throws Exception {
+        sinkTask.stop();
+        PowerMock.expectLastCall();
+
+        // No offset commit is expected here since commits happen in the mocked worker thread, but the main
+        // thread does need to wake up the consumer so it exits quickly
+        consumer.wakeup();
+        PowerMock.expectLastCall();
+
+        consumer.close();
+        PowerMock.expectLastCall();
+    }
+
+    // Note that this can only be called once per test currently
+    private Capture<Collection<SinkRecord>> expectPolls(final long pollDelayMs) throws Exception {
+        // Stub out all the consumer stream/iterator responses, which we just want to verify occur,
+        // but don't care about the exact details here.
+        EasyMock.expect(consumer.poll(EasyMock.anyLong())).andStubAnswer(
+                new IAnswer<ConsumerRecords<byte[], byte[]>>() {
+                    @Override
+                    public ConsumerRecords<byte[], byte[]> answer() throws Throwable {
+                        // "Sleep" so time will progress
+                        time.sleep(pollDelayMs);
+                        ConsumerRecords<byte[], byte[]> records = new ConsumerRecords<>(
+                                Collections.singletonMap(
+                                        new TopicPartition(TOPIC, PARTITION),
+                                        Arrays.asList(
+                                                new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturned, RAW_KEY, RAW_VALUE)
+                                        )));
+                        recordsReturned++;
+                        return records;
+                    }
+                });
+        EasyMock.expect(keyConverter.toCopycatData(TOPIC, RAW_KEY)).andReturn(new SchemaAndValue(KEY_SCHEMA, KEY)).anyTimes();
+        EasyMock.expect(valueConverter.toCopycatData(TOPIC, RAW_VALUE)).andReturn(new SchemaAndValue(VALUE_SCHEMA, VALUE)).anyTimes();
+        Capture<Collection<SinkRecord>> capturedRecords = EasyMock.newCapture(CaptureType.ALL);
+        sinkTask.put(EasyMock.capture(capturedRecords));
+        EasyMock.expectLastCall().anyTimes();
+        return capturedRecords;
+    }
+
+    private IExpectationSetters<Object> expectOnePoll() {
+        // Currently the SinkTask's put() method will not be invoked unless we provide some data, so instead of
+        // returning empty data, we return one record. The expectation is that the test ignores this record and
+        // instead applies the response behavior specified via the return value of this method.
+        EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer(
+                new IAnswer<ConsumerRecords<byte[], byte[]>>() {
+                    @Override
+                    public ConsumerRecords<byte[], byte[]> answer() throws Throwable {
+                        // "Sleep" so time will progress
+                        time.sleep(1L);
+                        ConsumerRecords<byte[], byte[]> records = new ConsumerRecords<>(
+                                Collections.singletonMap(
+                                        new TopicPartition(TOPIC, PARTITION),
+                                        Arrays.asList(
+                                                new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturned, RAW_KEY, RAW_VALUE)
+                                        )));
+                        recordsReturned++;
+                        return records;
+                    }
+                });
+        EasyMock.expect(keyConverter.toCopycatData(TOPIC, RAW_KEY)).andReturn(new SchemaAndValue(KEY_SCHEMA, KEY));
+        EasyMock.expect(valueConverter.toCopycatData(TOPIC, RAW_VALUE)).andReturn(new SchemaAndValue(VALUE_SCHEMA, VALUE));
+        sinkTask.put(EasyMock.anyObject(Collection.class));
+        return EasyMock.expectLastCall();
+    }
+
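+    // Expects a full offset commit cycle: sinkTask.flush() for all assigned partitions, then an async consumer
+    // commit unless the flush itself throws. Returns the captured commit callback, or null if flush fails.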
+    private Capture<OffsetCommitCallback> expectOffsetFlush(final long expectedMessages,
+                                                            final RuntimeException flushError,
+                                                            final Exception consumerCommitError,
+                                                            final long consumerCommitDelayMs,
+                                                            final boolean invokeCallback)
+            throws Exception {
+        final long finalOffset = FIRST_OFFSET + expectedMessages;
+
+        // All assigned partitions will have offsets committed, but we've only processed messages/updated offsets for one
+        final Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
+        offsetsToCommit.put(TOPIC_PARTITION, new OffsetAndMetadata(finalOffset));
+        offsetsToCommit.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET));
+        offsetsToCommit.put(TOPIC_PARTITION3, new OffsetAndMetadata(FIRST_OFFSET));
+        sinkTask.flush(offsetsToCommit);
+        IExpectationSetters<Object> flushExpectation = PowerMock.expectLastCall();
+        if (flushError != null) {
+            flushExpectation.andThrow(flushError).once();
+            return null;
+        }
+
+        final Capture<OffsetCommitCallback> capturedCallback = EasyMock.newCapture();
+        consumer.commitAsync(EasyMock.eq(offsetsToCommit),
+                EasyMock.capture(capturedCallback));
+        PowerMock.expectLastCall().andAnswer(new IAnswer<Object>() {
+            @Override
+            public Object answer() throws Throwable {
+                time.sleep(consumerCommitDelayMs);
+                if (invokeCallback)
+                    capturedCallback.getValue().onComplete(offsetsToCommit, consumerCommitError);
+                return null;
+            }
+        });
+        return capturedCallback;
+    }
+
+}