You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2015/11/09 05:41:47 UTC
kafka git commit: KAFKA-2480: Handle retriable and non-retriable
exceptions thrown by sink tasks.
Repository: kafka
Updated Branches:
refs/heads/trunk 83fb73460 -> f4b87deef
KAFKA-2480: Handle retriable and non-retriable exceptions thrown by sink tasks.
Author: Ewen Cheslack-Postava <me...@ewencp.org>
Reviewers: Gwen Shapira
Closes #450 from ewencp/kafka-2480-unrecoverable-task-errors
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f4b87dee
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f4b87dee
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f4b87dee
Branch: refs/heads/trunk
Commit: f4b87deefecf4902992a84d4a3fe3b99a94ff72b
Parents: 83fb734
Author: Ewen Cheslack-Postava <me...@ewencp.org>
Authored: Sun Nov 8 20:41:35 2015 -0800
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Sun Nov 8 20:41:35 2015 -0800
----------------------------------------------------------------------
.../org/apache/kafka/copycat/sink/SinkTask.java | 5 +
.../copycat/errors/RetriableException.java | 35 ++
.../kafka/copycat/runtime/WorkerSinkTask.java | 116 +++-
.../copycat/runtime/WorkerSinkTaskContext.java | 11 +
.../kafka/copycat/runtime/WorkerSourceTask.java | 7 +
.../copycat/runtime/WorkerSinkTaskTest.java | 496 ++--------------
.../runtime/WorkerSinkTaskThreadedTest.java | 563 +++++++++++++++++++
7 files changed, 763 insertions(+), 470 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/f4b87dee/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTask.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTask.java b/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTask.java
index b2d5ff6..90651ed 100644
--- a/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTask.java
+++ b/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkTask.java
@@ -57,6 +57,11 @@ public abstract class SinkTask implements Task {
* Put the records in the sink. Usually this should send the records to the sink asynchronously
* and immediately return.
*
+ * If this operation fails, the SinkTask may throw a {@link org.apache.kafka.copycat.errors.RetriableException} to
+ * indicate that the framework should attempt to retry the same call again. Other exceptions will cause the task to
+ * be stopped immediately. {@link SinkTaskContext#timeout(long)} can be used to set the maximum time before the
+ * batch will be retried.
+ *
* @param records the set of records to send
*/
public abstract void put(Collection<SinkRecord> records);
http://git-wip-us.apache.org/repos/asf/kafka/blob/f4b87dee/copycat/runtime/src/main/java/org/apache/kafka/copycat/errors/RetriableException.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/errors/RetriableException.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/errors/RetriableException.java
new file mode 100644
index 0000000..75821aa
--- /dev/null
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/errors/RetriableException.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.errors;
+
+/**
+ * An exception that indicates the operation can be reattempted.
+ */
+public class RetriableException extends CopycatException {
+ public RetriableException(String s) {
+ super(s);
+ }
+
+ public RetriableException(String s, Throwable throwable) {
+ super(s, throwable);
+ }
+
+ public RetriableException(Throwable throwable) {
+ super(throwable);
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f4b87dee/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java
index e9193b8..ad6d872 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java
@@ -31,6 +31,7 @@ import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.copycat.data.SchemaAndValue;
import org.apache.kafka.copycat.errors.CopycatException;
+import org.apache.kafka.copycat.errors.RetriableException;
import org.apache.kafka.copycat.sink.SinkRecord;
import org.apache.kafka.copycat.sink.SinkTask;
import org.apache.kafka.copycat.storage.Converter;
@@ -42,8 +43,11 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.TimeUnit;
/**
@@ -63,7 +67,10 @@ class WorkerSinkTask implements WorkerTask {
private KafkaConsumer<byte[], byte[]> consumer;
private WorkerSinkTaskContext context;
private boolean started;
+ private final List<SinkRecord> messageBatch;
private Map<TopicPartition, OffsetAndMetadata> lastCommittedOffsets;
+ private Map<TopicPartition, OffsetAndMetadata> currentOffsets;
+ private boolean pausedForRedelivery;
public WorkerSinkTask(ConnectorTaskId id, SinkTask task, WorkerConfig workerConfig,
Converter keyConverter, Converter valueConverter, Time time) {
@@ -74,6 +81,9 @@ class WorkerSinkTask implements WorkerTask {
this.valueConverter = valueConverter;
this.time = time;
this.started = false;
+ this.messageBatch = new ArrayList<>();
+ this.currentOffsets = new HashMap<>();
+ this.pausedForRedelivery = false;
}
@Override
@@ -156,10 +166,14 @@ class WorkerSinkTask implements WorkerTask {
timeoutMs = Math.min(timeoutMs, retryTimeout);
context.timeout(-1L);
}
+
log.trace("{} polling consumer with timeout {} ms", id, timeoutMs);
ConsumerRecords<byte[], byte[]> msgs = consumer.poll(timeoutMs);
+ assert messageBatch.isEmpty() || msgs.isEmpty();
log.trace("{} polling returned {} messages", id, msgs.count());
- deliverMessages(msgs);
+
+ convertMessages(msgs);
+ deliverMessages();
} catch (WakeupException we) {
log.trace("{} consumer woken up", id);
}
@@ -171,12 +185,8 @@ class WorkerSinkTask implements WorkerTask {
**/
public void commitOffsets(boolean sync, final int seqno) {
log.info("{} Committing offsets", this);
- final HashMap<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
- for (TopicPartition tp : consumer.assignment()) {
- long pos = consumer.position(tp);
- offsets.put(tp, new OffsetAndMetadata(pos));
- log.debug("{} committing {} offset {}", id, tp, pos);
- }
+
+ final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(currentOffsets);
try {
task.flush(offsets);
@@ -187,6 +197,7 @@ class WorkerSinkTask implements WorkerTask {
log.debug("{} Rewinding topic partition {} to offset {}", id, entry.getKey(), entry.getValue().offset());
consumer.seek(entry.getKey(), entry.getValue().offset());
}
+ currentOffsets = new HashMap<>(lastCommittedOffsets);
workThread.onCommitCompleted(t, seqno);
return;
}
@@ -195,6 +206,7 @@ class WorkerSinkTask implements WorkerTask {
try {
consumer.commitSync(offsets);
lastCommittedOffsets = offsets;
+ workThread.onCommitCompleted(null, seqno);
} catch (KafkaException e) {
workThread.onCommitCompleted(e, seqno);
}
@@ -244,29 +256,49 @@ class WorkerSinkTask implements WorkerTask {
return new WorkerSinkTaskThread(this, "WorkerSinkTask-" + id, time, workerConfig);
}
- private void deliverMessages(ConsumerRecords<byte[], byte[]> msgs) {
- // Finally, deliver this batch to the sink
- if (msgs.count() > 0) {
- List<SinkRecord> records = new ArrayList<>();
- for (ConsumerRecord<byte[], byte[]> msg : msgs) {
- log.trace("Consuming message with key {}, value {}", msg.key(), msg.value());
- SchemaAndValue keyAndSchema = keyConverter.toCopycatData(msg.topic(), msg.key());
- SchemaAndValue valueAndSchema = valueConverter.toCopycatData(msg.topic(), msg.value());
- records.add(
- new SinkRecord(msg.topic(), msg.partition(),
- keyAndSchema.schema(), keyAndSchema.value(),
- valueAndSchema.schema(), valueAndSchema.value(),
- msg.offset())
- );
- }
+ private void convertMessages(ConsumerRecords<byte[], byte[]> msgs) {
+ for (ConsumerRecord<byte[], byte[]> msg : msgs) {
+ log.trace("Consuming message with key {}, value {}", msg.key(), msg.value());
+ SchemaAndValue keyAndSchema = keyConverter.toCopycatData(msg.topic(), msg.key());
+ SchemaAndValue valueAndSchema = valueConverter.toCopycatData(msg.topic(), msg.value());
+ messageBatch.add(
+ new SinkRecord(msg.topic(), msg.partition(),
+ keyAndSchema.schema(), keyAndSchema.value(),
+ valueAndSchema.schema(), valueAndSchema.value(),
+ msg.offset())
+ );
+ }
+ }
- try {
- task.put(records);
- } catch (CopycatException e) {
- log.error("Exception from SinkTask {}: ", id, e);
- } catch (Throwable t) {
- log.error("Unexpected exception from SinkTask {}: ", id, t);
+ private void deliverMessages() {
+ // Finally, deliver this batch to the sink
+ try {
+ // Since we reuse the messageBatch buffer, ensure we give the task its own copy
+ task.put(new ArrayList<>(messageBatch));
+ for (SinkRecord record : messageBatch)
+ currentOffsets.put(new TopicPartition(record.topic(), record.kafkaPartition()),
+ new OffsetAndMetadata(record.kafkaOffset() + 1));
+ messageBatch.clear();
+ // If we had paused all consumer topic partitions to try to redeliver data, then we should resume any that
+ // the task had not explicitly paused
+ if (pausedForRedelivery) {
+ for (TopicPartition tp : consumer.assignment())
+ if (!context.pausedPartitions().contains(tp))
+ consumer.resume(tp);
+ pausedForRedelivery = false;
}
+ } catch (RetriableException e) {
+ log.error("RetriableException from SinkTask {}: {}", id, e);
+ // If we're retrying a previous batch, make sure we've paused all topic partitions so we don't get new data,
+ // but will still be able to poll in order to handle user-requested timeouts, keep group membership, etc.
+ pausedForRedelivery = true;
+ for (TopicPartition tp : consumer.assignment())
+ consumer.pause(tp);
+ // Let this exit normally, the batch will be reprocessed on the next loop.
+ } catch (Throwable t) {
+ log.error("Task {} threw an uncaught and unrecoverable exception", id);
+ log.error("Task is being killed and will not recover until manually restarted:", t);
+ throw new CopycatException("Exiting WorkerSinkTask due to unrecoverable exception.");
}
}
@@ -280,6 +312,8 @@ class WorkerSinkTask implements WorkerTask {
if (offset != null) {
log.trace("Rewind {} to offset {}.", tp, offset);
consumer.seek(tp, offset);
+ lastCommittedOffsets.put(tp, new OffsetAndMetadata(offset));
+ currentOffsets.put(tp, new OffsetAndMetadata(offset));
}
}
context.clearOffsets();
@@ -289,11 +323,35 @@ class WorkerSinkTask implements WorkerTask {
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
lastCommittedOffsets = new HashMap<>();
+ currentOffsets = new HashMap<>();
for (TopicPartition tp : partitions) {
long pos = consumer.position(tp);
lastCommittedOffsets.put(tp, new OffsetAndMetadata(pos));
+ currentOffsets.put(tp, new OffsetAndMetadata(pos));
log.debug("{} assigned topic partition {} with offset {}", id, tp, pos);
}
+
+ // If we paused everything for redelivery (which is no longer relevant since we discarded the data), make
+ // sure anything we paused that the task didn't request to be paused *and* which we still own is resumed.
+ // Also make sure our tracking of paused partitions is updated to remove any partitions we no longer own.
+ if (pausedForRedelivery) {
+ pausedForRedelivery = false;
+ Set<TopicPartition> assigned = new HashSet<>(partitions);
+ Set<TopicPartition> taskPaused = context.pausedPartitions();
+
+ for (TopicPartition tp : partitions) {
+ if (!taskPaused.contains(tp))
+ consumer.resume(tp);
+ }
+
+ Iterator<TopicPartition> tpIter = taskPaused.iterator();
+ while (tpIter.hasNext()) {
+ TopicPartition tp = tpIter.next();
+ if (assigned.contains(tp))
+ tpIter.remove();
+ }
+ }
+
// Instead of invoking the assignment callback on initialization, we guarantee the consumer is ready upon
// task start. Since this callback gets invoked during that initial setup before we've started the task, we
// need to guard against invoking the user's callback method during that period.
@@ -305,6 +363,8 @@ class WorkerSinkTask implements WorkerTask {
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
task.onPartitionsRevoked(partitions);
commitOffsets(true, -1);
+ // Make sure we don't have any leftover data since offsets will be reset to committed positions
+ messageBatch.clear();
}
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f4b87dee/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskContext.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskContext.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskContext.java
index b474589..5257ee4 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskContext.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskContext.java
@@ -17,6 +17,7 @@ import org.apache.kafka.copycat.errors.IllegalWorkerStateException;
import org.apache.kafka.copycat.sink.SinkTaskContext;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
import java.util.Set;
@@ -24,11 +25,13 @@ public class WorkerSinkTaskContext implements SinkTaskContext {
private Map<TopicPartition, Long> offsets;
private long timeoutMs;
private KafkaConsumer<byte[], byte[]> consumer;
+ private final Set<TopicPartition> pausedPartitions;
public WorkerSinkTaskContext(KafkaConsumer<byte[], byte[]> consumer) {
this.offsets = new HashMap<>();
this.timeoutMs = -1L;
this.consumer = consumer;
+ this.pausedPartitions = new HashSet<>();
}
@Override
@@ -80,6 +83,8 @@ public class WorkerSinkTaskContext implements SinkTaskContext {
throw new IllegalWorkerStateException("SinkTaskContext may not be used to pause consumption until the task is initialized");
}
try {
+ for (TopicPartition partition : partitions)
+ pausedPartitions.add(partition);
consumer.pause(partitions);
} catch (IllegalStateException e) {
throw new IllegalWorkerStateException("SinkTasks may not pause partitions that are not currently assigned to them.", e);
@@ -92,9 +97,15 @@ public class WorkerSinkTaskContext implements SinkTaskContext {
throw new IllegalWorkerStateException("SinkTaskContext may not be used to resume consumption until the task is initialized");
}
try {
+ for (TopicPartition partition : partitions)
+ pausedPartitions.remove(partition);
consumer.resume(partitions);
} catch (IllegalStateException e) {
throw new IllegalWorkerStateException("SinkTasks may not resume partitions that are not currently assigned to them.", e);
}
}
+
+ public Set<TopicPartition> pausedPartitions() {
+ return pausedPartitions;
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f4b87dee/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTask.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTask.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTask.java
index cdb41b0..6577fe9 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTask.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTask.java
@@ -306,6 +306,13 @@ class WorkerSourceTask implements WorkerTask {
}
} catch (InterruptedException e) {
// Ignore and allow to exit.
+ } catch (Throwable t) {
+ log.error("Task {} threw an uncaught and unrecoverable exception", id);
+ log.error("Task is being killed and will not recover until manually restarted:", t);
+ // It should still be safe to let this fall through and commit offsets since this exception would have
+ // simply resulted in not getting more records but all the existing records should be ok to flush
+ // and commit offsets. Worst case, task.flush() will also throw an exception causing the offset commit
+ // to fail.
}
commitOffsets();
http://git-wip-us.apache.org/repos/asf/kafka/blob/f4b87dee/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java
index 7905736..0458054 100644
--- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java
+++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java
@@ -5,9 +5,9 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -21,13 +21,11 @@ import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
-import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.copycat.data.Schema;
import org.apache.kafka.copycat.data.SchemaAndValue;
-import org.apache.kafka.copycat.errors.CopycatException;
+import org.apache.kafka.copycat.errors.RetriableException;
import org.apache.kafka.copycat.runtime.standalone.StandaloneConfig;
import org.apache.kafka.copycat.sink.SinkConnector;
import org.apache.kafka.copycat.sink.SinkRecord;
@@ -35,12 +33,11 @@ import org.apache.kafka.copycat.sink.SinkTask;
import org.apache.kafka.copycat.storage.Converter;
import org.apache.kafka.copycat.util.ConnectorTaskId;
import org.apache.kafka.copycat.util.MockTime;
-import org.apache.kafka.copycat.util.ThreadedTest;
import org.easymock.Capture;
import org.easymock.CaptureType;
import org.easymock.EasyMock;
import org.easymock.IAnswer;
-import org.easymock.IExpectationSetters;
+import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.easymock.PowerMock;
@@ -48,30 +45,25 @@ import org.powermock.api.easymock.annotation.Mock;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
-import org.powermock.reflect.Whitebox;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
@RunWith(PowerMockRunner.class)
@PrepareForTest(WorkerSinkTask.class)
@PowerMockIgnore("javax.management.*")
-public class WorkerSinkTaskTest extends ThreadedTest {
-
+public class WorkerSinkTaskTest {
// These are fixed to keep this code simpler. In this example we assume byte[] raw values
// with mix of integer/string in Copycat
private static final String TOPIC = "test";
private static final int PARTITION = 12;
private static final int PARTITION2 = 13;
- private static final int PARTITION3 = 14;
private static final long FIRST_OFFSET = 45;
private static final Schema KEY_SCHEMA = Schema.INT32_SCHEMA;
private static final int KEY = 12;
@@ -82,33 +74,34 @@ public class WorkerSinkTaskTest extends ThreadedTest {
private static final TopicPartition TOPIC_PARTITION = new TopicPartition(TOPIC, PARTITION);
private static final TopicPartition TOPIC_PARTITION2 = new TopicPartition(TOPIC, PARTITION2);
- private static final TopicPartition TOPIC_PARTITION3 = new TopicPartition(TOPIC, PARTITION3);
- private static final TopicPartition UNASSIGNED_TOPIC_PARTITION = new TopicPartition(TOPIC, 200);
private static final Map<String, String> TASK_PROPS = new HashMap<>();
static {
TASK_PROPS.put(SinkConnector.TOPICS_CONFIG, TOPIC);
}
+
private ConnectorTaskId taskId = new ConnectorTaskId("job", 0);
private Time time;
- @Mock private SinkTask sinkTask;
+ private WorkerSinkTask workerTask;
+ @Mock
+ private SinkTask sinkTask;
private Capture<WorkerSinkTaskContext> sinkTaskContext = EasyMock.newCapture();
private WorkerConfig workerConfig;
- @Mock private Converter keyConverter;
+ @Mock
+ private Converter keyConverter;
@Mock
private Converter valueConverter;
- private WorkerSinkTask workerTask;
- @Mock private KafkaConsumer<byte[], byte[]> consumer;
+ @Mock
private WorkerSinkTaskThread workerThread;
+ @Mock
+ private KafkaConsumer<byte[], byte[]> consumer;
private Capture<ConsumerRebalanceListener> rebalanceListener = EasyMock.newCapture();
private long recordsReturned;
- @SuppressWarnings("unchecked")
- @Override
- public void setup() {
- super.setup();
+ @Before
+ public void setUp() {
time = new MockTime();
Map<String, String> workerProps = new HashMap<>();
workerProps.put("key.converter", "org.apache.kafka.copycat.json.JsonConverter");
@@ -126,338 +119,46 @@ public class WorkerSinkTaskTest extends ThreadedTest {
}
@Test
- public void testPollsInBackground() throws Exception {
- expectInitializeTask();
- Capture<Collection<SinkRecord>> capturedRecords = expectPolls(1L);
- expectStopTask(10L);
- EasyMock.expect(workerThread.awaitShutdown(EasyMock.anyLong(), EasyMock.<TimeUnit>anyObject())).andReturn(true);
-
- PowerMock.replayAll();
-
- workerTask.start(TASK_PROPS);
- workerTask.joinConsumerGroupAndStart();
- for (int i = 0; i < 10; i++) {
- workerThread.iteration();
- }
- workerTask.stop();
- workerTask.awaitStop(Long.MAX_VALUE);
- workerTask.close();
-
- // Verify contents match expected values, i.e. that they were translated properly. With max
- // batch size 1 and poll returns 1 message at a time, we should have a matching # of batches
- assertEquals(10, capturedRecords.getValues().size());
- int offset = 0;
- for (Collection<SinkRecord> recs : capturedRecords.getValues()) {
- assertEquals(1, recs.size());
- for (SinkRecord rec : recs) {
- SinkRecord referenceSinkRecord
- = new SinkRecord(TOPIC, PARTITION, KEY_SCHEMA, KEY, VALUE_SCHEMA, VALUE, FIRST_OFFSET + offset);
- assertEquals(referenceSinkRecord, rec);
- offset++;
- }
- }
-
- PowerMock.verifyAll();
- }
-
- @Test
- public void testDeliverConvertsData() throws Exception {
- // Validate conversion is performed when data is delivered
- SchemaAndValue record = new SchemaAndValue(Schema.INT32_SCHEMA, 12);
-
- ConsumerRecords<byte[], byte[]> records = new ConsumerRecords<>(
- Collections.singletonMap(
- new TopicPartition(TOPIC, 0),
- Collections.singletonList(new ConsumerRecord<>(TOPIC, 0, 0, RAW_KEY, RAW_VALUE))));
-
- // Exact data doesn't matter, but should be passed directly to sink task
- EasyMock.expect(keyConverter.toCopycatData(EasyMock.eq(TOPIC), EasyMock.aryEq(RAW_KEY))).andReturn(record);
- EasyMock.expect(valueConverter.toCopycatData(EasyMock.eq(TOPIC), EasyMock.aryEq(RAW_VALUE))).andReturn(record);
- Capture<Collection<SinkRecord>> capturedRecords
- = EasyMock.newCapture(CaptureType.ALL);
- sinkTask.put(EasyMock.capture(capturedRecords));
- EasyMock.expectLastCall();
-
- PowerMock.replayAll();
-
- Whitebox.invokeMethod(workerTask, "deliverMessages", records);
- assertEquals(record.schema(), capturedRecords.getValue().iterator().next().keySchema());
- assertEquals(record.value(), capturedRecords.getValue().iterator().next().key());
- assertEquals(record.schema(), capturedRecords.getValue().iterator().next().valueSchema());
- assertEquals(record.value(), capturedRecords.getValue().iterator().next().value());
-
- PowerMock.verifyAll();
- }
-
- @Test
- public void testCommit() throws Exception {
+ public void testPollRedelivery() throws Exception {
expectInitializeTask();
- // Make each poll() take the offset commit interval
- Capture<Collection<SinkRecord>> capturedRecords
- = expectPolls(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_DEFAULT);
- expectOffsetFlush(1L, null, null, 0, true);
- expectStopTask(2);
- EasyMock.expect(workerThread.awaitShutdown(EasyMock.anyLong(), EasyMock.<TimeUnit>anyObject())).andReturn(true);
-
- PowerMock.replayAll();
-
- workerTask.start(TASK_PROPS);
- workerTask.joinConsumerGroupAndStart();
- // First iteration gets one record
- workerThread.iteration();
- // Second triggers commit, gets a second offset
- workerThread.iteration();
- // Commit finishes synchronously for testing so we can check this immediately
- assertEquals(0, workerThread.commitFailures());
- workerTask.stop();
- workerTask.awaitStop(Long.MAX_VALUE);
- workerTask.close();
- assertEquals(2, capturedRecords.getValues().size());
-
- PowerMock.verifyAll();
- }
-
- @Test
- public void testCommitTaskFlushFailure() throws Exception {
- expectInitializeTask();
- Capture<Collection<SinkRecord>> capturedRecords = expectPolls(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_DEFAULT);
- expectOffsetFlush(1L, new RuntimeException(), null, 0, true);
- // Should rewind to last known good positions, which in this case will be the offsets loaded during initialization
- // for all topic partitions
- consumer.seek(TOPIC_PARTITION, FIRST_OFFSET);
- PowerMock.expectLastCall();
- consumer.seek(TOPIC_PARTITION2, FIRST_OFFSET);
- PowerMock.expectLastCall();
- consumer.seek(TOPIC_PARTITION3, FIRST_OFFSET);
+ // If a retriable exception is thrown, we should redeliver the same batch, pausing the consumer in the meantime
+ expectConsumerPoll(1);
+ expectConvertMessages(1);
+ Capture<Collection<SinkRecord>> records = EasyMock.newCapture(CaptureType.ALL);
+ sinkTask.put(EasyMock.capture(records));
+ EasyMock.expectLastCall().andThrow(new RetriableException("retry"));
+ // Pause
+ EasyMock.expect(consumer.assignment()).andReturn(new HashSet<>(Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2)));
+ consumer.pause(TOPIC_PARTITION);
PowerMock.expectLastCall();
- expectStopTask(2);
- EasyMock.expect(workerThread.awaitShutdown(EasyMock.anyLong(), EasyMock.<TimeUnit>anyObject())).andReturn(true);
-
- PowerMock.replayAll();
-
- workerTask.start(TASK_PROPS);
- workerTask.joinConsumerGroupAndStart();
- // Second iteration triggers commit
- workerThread.iteration();
- workerThread.iteration();
- assertEquals(1, workerThread.commitFailures());
- assertEquals(false, Whitebox.getInternalState(workerThread, "committing"));
- workerTask.stop();
- workerTask.awaitStop(Long.MAX_VALUE);
- workerTask.close();
-
- PowerMock.verifyAll();
- }
-
- @Test
- public void testCommitTaskSuccessAndFlushFailure() throws Exception {
- // Validate that we rewind to the correct
-
- expectInitializeTask();
- Capture<Collection<SinkRecord>> capturedRecords = expectPolls(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_DEFAULT);
- expectOffsetFlush(1L, null, null, 0, true);
- expectOffsetFlush(2L, new RuntimeException(), null, 0, true);
- // Should rewind to last known good positions, which in this case will be the offsets last committed. This test
- // isn't quite accurate since we started with assigning 3 topic partitions and then only committed one, but what
- // is important here is that we roll back to the last committed values.
- consumer.seek(TOPIC_PARTITION, FIRST_OFFSET);
+ consumer.pause(TOPIC_PARTITION2);
PowerMock.expectLastCall();
- expectStopTask(2);
- EasyMock.expect(workerThread.awaitShutdown(EasyMock.anyLong(), EasyMock.<TimeUnit>anyObject())).andReturn(true);
-
- PowerMock.replayAll();
-
- workerTask.start(TASK_PROPS);
- workerTask.joinConsumerGroupAndStart();
- // Second iteration triggers first commit, third iteration triggers second (failing) commit
- workerThread.iteration();
- workerThread.iteration();
- workerThread.iteration();
- assertEquals(1, workerThread.commitFailures());
- assertEquals(false, Whitebox.getInternalState(workerThread, "committing"));
- workerTask.stop();
- workerTask.awaitStop(Long.MAX_VALUE);
- workerTask.close();
- PowerMock.verifyAll();
- }
-
- @Test
- public void testCommitConsumerFailure() throws Exception {
- expectInitializeTask();
- Capture<Collection<SinkRecord>> capturedRecords
- = expectPolls(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_DEFAULT);
- expectOffsetFlush(1L, null, new Exception(), 0, true);
- expectStopTask(2);
- EasyMock.expect(workerThread.awaitShutdown(EasyMock.anyLong(), EasyMock.<TimeUnit>anyObject())).andReturn(true);
-
- PowerMock.replayAll();
-
- workerTask.start(TASK_PROPS);
- workerTask.joinConsumerGroupAndStart();
- // Second iteration triggers commit
- workerThread.iteration();
- workerThread.iteration();
- // TODO Response to consistent failures?
- assertEquals(1, workerThread.commitFailures());
- assertEquals(false, Whitebox.getInternalState(workerThread, "committing"));
- workerTask.stop();
- workerTask.awaitStop(Long.MAX_VALUE);
- workerTask.close();
-
- PowerMock.verifyAll();
- }
-
- @Test
- public void testCommitTimeout() throws Exception {
- expectInitializeTask();
- // Cut down amount of time to pass in each poll so we trigger exactly 1 offset commit
- Capture<Collection<SinkRecord>> capturedRecords
- = expectPolls(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_DEFAULT / 2);
- expectOffsetFlush(2L, null, null, WorkerConfig.OFFSET_COMMIT_TIMEOUT_MS_DEFAULT, false);
- expectStopTask(4);
- EasyMock.expect(workerThread.awaitShutdown(EasyMock.anyLong(), EasyMock.<TimeUnit>anyObject())).andReturn(true);
-
- PowerMock.replayAll();
-
- workerTask.start(TASK_PROPS);
- workerTask.joinConsumerGroupAndStart();
- // Third iteration triggers commit, fourth gives a chance to trigger the timeout but doesn't
- // trigger another commit
- workerThread.iteration();
- workerThread.iteration();
- workerThread.iteration();
- workerThread.iteration();
- // TODO Response to consistent failures?
- assertEquals(1, workerThread.commitFailures());
- assertEquals(false, Whitebox.getInternalState(workerThread, "committing"));
- workerTask.stop();
- workerTask.awaitStop(Long.MAX_VALUE);
- workerTask.close();
-
- PowerMock.verifyAll();
- }
-
- @Test
- public void testAssignmentPauseResume() throws Exception {
- // Just validate that the calls are passed through to the consumer, and that where appropriate errors are
- // converted
- expectInitializeTask();
-
- expectOnePoll().andAnswer(new IAnswer<Object>() {
- @Override
- public Object answer() throws Throwable {
- assertEquals(new HashSet<>(Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2, TOPIC_PARTITION3)),
- sinkTaskContext.getValue().assignment());
- return null;
- }
- });
- EasyMock.expect(consumer.assignment()).andReturn(new HashSet<>(Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2, TOPIC_PARTITION3)));
-
- expectOnePoll().andAnswer(new IAnswer<Object>() {
- @Override
- public Object answer() throws Throwable {
- try {
- sinkTaskContext.getValue().pause(UNASSIGNED_TOPIC_PARTITION);
- fail("Trying to pause unassigned partition should have thrown an Copycat exception");
- } catch (CopycatException e) {
- // expected
- }
- sinkTaskContext.getValue().pause(TOPIC_PARTITION, TOPIC_PARTITION2);
- return null;
- }
- });
- consumer.pause(UNASSIGNED_TOPIC_PARTITION);
- PowerMock.expectLastCall().andThrow(new IllegalStateException("unassigned topic partition"));
- consumer.pause(TOPIC_PARTITION, TOPIC_PARTITION2);
+ // Retry delivery should suceed
+ expectConsumerPoll(0);
+ sinkTask.put(EasyMock.capture(records));
+ EasyMock.expectLastCall();
+ // And unpause
+ EasyMock.expect(consumer.assignment()).andReturn(new HashSet<>(Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2)));
+ consumer.resume(TOPIC_PARTITION);
PowerMock.expectLastCall();
-
- expectOnePoll().andAnswer(new IAnswer<Object>() {
- @Override
- public Object answer() throws Throwable {
- try {
- sinkTaskContext.getValue().resume(UNASSIGNED_TOPIC_PARTITION);
- fail("Trying to resume unassigned partition should have thrown an Copycat exception");
- } catch (CopycatException e) {
- // expected
- }
-
- sinkTaskContext.getValue().resume(TOPIC_PARTITION, TOPIC_PARTITION2);
- return null;
- }
- });
- consumer.resume(UNASSIGNED_TOPIC_PARTITION);
- PowerMock.expectLastCall().andThrow(new IllegalStateException("unassigned topic partition"));
- consumer.resume(TOPIC_PARTITION, TOPIC_PARTITION2);
+ consumer.resume(TOPIC_PARTITION2);
PowerMock.expectLastCall();
- expectStopTask(0);
- EasyMock.expect(workerThread.awaitShutdown(EasyMock.anyLong(), EasyMock.<TimeUnit>anyObject())).andReturn(true);
-
PowerMock.replayAll();
workerTask.start(TASK_PROPS);
workerTask.joinConsumerGroupAndStart();
- workerThread.iteration();
- workerThread.iteration();
- workerThread.iteration();
- workerTask.stop();
- workerTask.awaitStop(Long.MAX_VALUE);
- workerTask.close();
+ workerTask.poll(Long.MAX_VALUE);
+ workerTask.poll(Long.MAX_VALUE);
PowerMock.verifyAll();
}
- @Test
- public void testRewind() throws Exception {
- expectInitializeTask();
- final long startOffset = 40L;
- final Map<TopicPartition, Long> offsets = new HashMap<>();
-
- expectOnePoll().andAnswer(new IAnswer<Object>() {
- @Override
- public Object answer() throws Throwable {
- offsets.put(TOPIC_PARTITION, startOffset);
- sinkTaskContext.getValue().offset(offsets);
- return null;
- }
- });
-
- consumer.seek(TOPIC_PARTITION, startOffset);
- EasyMock.expectLastCall();
-
- expectOnePoll().andAnswer(new IAnswer<Object>() {
- @Override
- public Object answer() throws Throwable {
- Map<TopicPartition, Long> offsets = sinkTaskContext.getValue().offsets();
- assertEquals(0, offsets.size());
- return null;
- }
- });
-
- expectStopTask(3);
- EasyMock.expect(workerThread.awaitShutdown(EasyMock.anyLong(), EasyMock.<TimeUnit>anyObject())).andReturn(true);
-
- PowerMock.replayAll();
-
- workerTask.start(TASK_PROPS);
- workerTask.joinConsumerGroupAndStart();
- workerThread.iteration();
- workerThread.iteration();
- workerTask.stop();
- workerTask.awaitStop(Long.MAX_VALUE);
- workerTask.close();
-
- PowerMock.verifyAll();
- }
private void expectInitializeTask() throws Exception {
PowerMock.expectPrivate(workerTask, "createConsumer").andReturn(consumer);
-
- workerThread = PowerMock.createPartialMock(WorkerSinkTaskThread.class, new String[]{"start", "awaitShutdown"},
- workerTask, "mock-worker-thread", time,
- workerConfig);
PowerMock.expectPrivate(workerTask, "createWorkerThread")
.andReturn(workerThread);
workerThread.start();
@@ -469,13 +170,12 @@ public class WorkerSinkTaskTest extends ThreadedTest {
EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer(new IAnswer<ConsumerRecords<byte[], byte[]>>() {
@Override
public ConsumerRecords<byte[], byte[]> answer() throws Throwable {
- rebalanceListener.getValue().onPartitionsAssigned(Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2, TOPIC_PARTITION3));
+ rebalanceListener.getValue().onPartitionsAssigned(Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2));
return ConsumerRecords.empty();
}
});
EasyMock.expect(consumer.position(TOPIC_PARTITION)).andReturn(FIRST_OFFSET);
EasyMock.expect(consumer.position(TOPIC_PARTITION2)).andReturn(FIRST_OFFSET);
- EasyMock.expect(consumer.position(TOPIC_PARTITION3)).andReturn(FIRST_OFFSET);
sinkTask.initialize(EasyMock.capture(sinkTaskContext));
PowerMock.expectLastCall();
@@ -483,114 +183,26 @@ public class WorkerSinkTaskTest extends ThreadedTest {
PowerMock.expectLastCall();
}
- private void expectStopTask(final long expectedMessages) throws Exception {
- final long finalOffset = FIRST_OFFSET + expectedMessages - 1;
-
- sinkTask.stop();
- PowerMock.expectLastCall();
-
- // No offset commit since it happens in the mocked worker thread, but the main thread does need to wake up the
- // consumer so it exits quickly
- consumer.wakeup();
- PowerMock.expectLastCall();
-
- consumer.close();
- PowerMock.expectLastCall();
- }
-
- // Note that this can only be called once per test currently
- private Capture<Collection<SinkRecord>> expectPolls(final long pollDelayMs) throws Exception {
- // Stub out all the consumer stream/iterator responses, which we just want to verify occur,
- // but don't care about the exact details here.
- EasyMock.expect(consumer.poll(EasyMock.anyLong())).andStubAnswer(
- new IAnswer<ConsumerRecords<byte[], byte[]>>() {
- @Override
- public ConsumerRecords<byte[], byte[]> answer() throws Throwable {
- // "Sleep" so time will progress
- time.sleep(pollDelayMs);
- ConsumerRecords<byte[], byte[]> records = new ConsumerRecords<>(
- Collections.singletonMap(
- new TopicPartition(TOPIC, PARTITION),
- Arrays.asList(
- new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturned, RAW_KEY, RAW_VALUE)
- )));
- recordsReturned++;
- return records;
- }
- });
- EasyMock.expect(keyConverter.toCopycatData(TOPIC, RAW_KEY)).andReturn(new SchemaAndValue(KEY_SCHEMA, KEY)).anyTimes();
- EasyMock.expect(valueConverter.toCopycatData(TOPIC, RAW_VALUE)).andReturn(new SchemaAndValue(VALUE_SCHEMA, VALUE)).anyTimes();
- Capture<Collection<SinkRecord>> capturedRecords = EasyMock.newCapture(CaptureType.ALL);
- sinkTask.put(EasyMock.capture(capturedRecords));
- EasyMock.expectLastCall().anyTimes();
- return capturedRecords;
- }
-
- private IExpectationSetters<Object> expectOnePoll() {
- // Currently the SinkTask's put() method will not be invoked unless we provide some data, so instead of
- // returning empty data, we return one record. The expectation is that the data will be ignored by the
- // response behavior specified using the return value of this method.
+ private void expectConsumerPoll(final int numMessages) {
EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer(
new IAnswer<ConsumerRecords<byte[], byte[]>>() {
@Override
public ConsumerRecords<byte[], byte[]> answer() throws Throwable {
- // "Sleep" so time will progress
- time.sleep(1L);
- ConsumerRecords<byte[], byte[]> records = new ConsumerRecords<>(
- Collections.singletonMap(
- new TopicPartition(TOPIC, PARTITION),
- Arrays.asList(
- new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturned, RAW_KEY, RAW_VALUE)
- )));
- recordsReturned++;
- return records;
+ List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>();
+ for (int i = 0; i < numMessages; i++)
+ records.add(new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturned + i, RAW_KEY, RAW_VALUE));
+ recordsReturned += numMessages;
+ return new ConsumerRecords<>(
+ numMessages > 0 ?
+ Collections.singletonMap(new TopicPartition(TOPIC, PARTITION), records) :
+ Collections.<TopicPartition, List<ConsumerRecord<byte[], byte[]>>>emptyMap()
+ );
}
});
- EasyMock.expect(keyConverter.toCopycatData(TOPIC, RAW_KEY)).andReturn(new SchemaAndValue(KEY_SCHEMA, KEY));
- EasyMock.expect(valueConverter.toCopycatData(TOPIC, RAW_VALUE)).andReturn(new SchemaAndValue(VALUE_SCHEMA, VALUE));
- sinkTask.put(EasyMock.anyObject(Collection.class));
- return EasyMock.expectLastCall();
}
- private Capture<OffsetCommitCallback> expectOffsetFlush(final long expectedMessages,
- final RuntimeException flushError,
- final Exception consumerCommitError,
- final long consumerCommitDelayMs,
- final boolean invokeCallback)
- throws Exception {
- final long finalOffset = FIRST_OFFSET + expectedMessages - 1;
-
- EasyMock.expect(consumer.assignment()).andReturn(Collections.singleton(TOPIC_PARTITION));
- EasyMock.expect(consumer.position(TOPIC_PARTITION)).andAnswer(
- new IAnswer<Long>() {
- @Override
- public Long answer() throws Throwable {
- return FIRST_OFFSET + recordsReturned - 1;
- }
- }
- );
-
- sinkTask.flush(Collections.singletonMap(TOPIC_PARTITION, new OffsetAndMetadata(finalOffset)));
- IExpectationSetters<Object> flushExpectation = PowerMock.expectLastCall();
- if (flushError != null) {
- flushExpectation.andThrow(flushError).once();
- return null;
- }
-
- final Capture<OffsetCommitCallback> capturedCallback = EasyMock.newCapture();
- final Map<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap(TOPIC_PARTITION, new OffsetAndMetadata(finalOffset));
- consumer.commitAsync(EasyMock.eq(offsets),
- EasyMock.capture(capturedCallback));
- PowerMock.expectLastCall().andAnswer(new IAnswer<Object>() {
- @Override
- public Object answer() throws Throwable {
- time.sleep(consumerCommitDelayMs);
- if (invokeCallback)
- capturedCallback.getValue().onComplete(offsets, consumerCommitError);
- return null;
- }
- });
- return capturedCallback;
+ private void expectConvertMessages(final int numMessages) {
+ EasyMock.expect(keyConverter.toCopycatData(TOPIC, RAW_KEY)).andReturn(new SchemaAndValue(KEY_SCHEMA, KEY)).times(numMessages);
+ EasyMock.expect(valueConverter.toCopycatData(TOPIC, RAW_VALUE)).andReturn(new SchemaAndValue(VALUE_SCHEMA, VALUE)).times(numMessages);
}
-
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f4b87dee/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskThreadedTest.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskThreadedTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskThreadedTest.java
new file mode 100644
index 0000000..ded78a1
--- /dev/null
+++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskThreadedTest.java
@@ -0,0 +1,563 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.runtime;
+
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetCommitCallback;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.copycat.data.Schema;
+import org.apache.kafka.copycat.data.SchemaAndValue;
+import org.apache.kafka.copycat.errors.CopycatException;
+import org.apache.kafka.copycat.runtime.standalone.StandaloneConfig;
+import org.apache.kafka.copycat.sink.SinkConnector;
+import org.apache.kafka.copycat.sink.SinkRecord;
+import org.apache.kafka.copycat.sink.SinkTask;
+import org.apache.kafka.copycat.storage.Converter;
+import org.apache.kafka.copycat.util.ConnectorTaskId;
+import org.apache.kafka.copycat.util.MockTime;
+import org.apache.kafka.copycat.util.ThreadedTest;
+import org.easymock.Capture;
+import org.easymock.CaptureType;
+import org.easymock.EasyMock;
+import org.easymock.IAnswer;
+import org.easymock.IExpectationSetters;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.api.easymock.annotation.Mock;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.powermock.reflect.Whitebox;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(WorkerSinkTask.class)
+@PowerMockIgnore("javax.management.*")
+public class WorkerSinkTaskThreadedTest extends ThreadedTest {
+
+ // These are fixed to keep this code simpler. In this example we assume byte[] raw values
+ // with mix of integer/string in Copycat
+ private static final String TOPIC = "test";
+ private static final int PARTITION = 12;
+ private static final int PARTITION2 = 13;
+ private static final int PARTITION3 = 14;
+ private static final long FIRST_OFFSET = 45;
+ private static final Schema KEY_SCHEMA = Schema.INT32_SCHEMA;
+ private static final int KEY = 12;
+ private static final Schema VALUE_SCHEMA = Schema.STRING_SCHEMA;
+ private static final String VALUE = "VALUE";
+ private static final byte[] RAW_KEY = "key".getBytes();
+ private static final byte[] RAW_VALUE = "value".getBytes();
+
+ private static final TopicPartition TOPIC_PARTITION = new TopicPartition(TOPIC, PARTITION);
+ private static final TopicPartition TOPIC_PARTITION2 = new TopicPartition(TOPIC, PARTITION2);
+ private static final TopicPartition TOPIC_PARTITION3 = new TopicPartition(TOPIC, PARTITION3);
+ private static final TopicPartition UNASSIGNED_TOPIC_PARTITION = new TopicPartition(TOPIC, 200);
+
+ private static final Map<String, String> TASK_PROPS = new HashMap<>();
+ static {
+ TASK_PROPS.put(SinkConnector.TOPICS_CONFIG, TOPIC);
+ }
+
+ private ConnectorTaskId taskId = new ConnectorTaskId("job", 0);
+ private Time time;
+ @Mock private SinkTask sinkTask;
+ private Capture<WorkerSinkTaskContext> sinkTaskContext = EasyMock.newCapture();
+ private WorkerConfig workerConfig;
+ @Mock private Converter keyConverter;
+ @Mock
+ private Converter valueConverter;
+ private WorkerSinkTask workerTask;
+ @Mock private KafkaConsumer<byte[], byte[]> consumer;
+ private WorkerSinkTaskThread workerThread;
+ private Capture<ConsumerRebalanceListener> rebalanceListener = EasyMock.newCapture();
+
+ private long recordsReturned;
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void setup() {
+ super.setup();
+ time = new MockTime();
+ Map<String, String> workerProps = new HashMap<>();
+ workerProps.put("key.converter", "org.apache.kafka.copycat.json.JsonConverter");
+ workerProps.put("value.converter", "org.apache.kafka.copycat.json.JsonConverter");
+ workerProps.put("internal.key.converter", "org.apache.kafka.copycat.json.JsonConverter");
+ workerProps.put("internal.value.converter", "org.apache.kafka.copycat.json.JsonConverter");
+ workerProps.put("internal.key.converter.schemas.enable", "false");
+ workerProps.put("internal.value.converter.schemas.enable", "false");
+ workerConfig = new StandaloneConfig(workerProps);
+ workerTask = PowerMock.createPartialMock(
+ WorkerSinkTask.class, new String[]{"createConsumer", "createWorkerThread"},
+ taskId, sinkTask, workerConfig, keyConverter, valueConverter, time);
+
+ recordsReturned = 0;
+ }
+
+ @Test
+ public void testPollsInBackground() throws Exception {
+ expectInitializeTask();
+ Capture<Collection<SinkRecord>> capturedRecords = expectPolls(1L);
+ expectStopTask(10L);
+ EasyMock.expect(workerThread.awaitShutdown(EasyMock.anyLong(), EasyMock.<TimeUnit>anyObject())).andReturn(true);
+
+ PowerMock.replayAll();
+
+ workerTask.start(TASK_PROPS);
+ workerTask.joinConsumerGroupAndStart();
+ for (int i = 0; i < 10; i++) {
+ workerThread.iteration();
+ }
+ workerTask.stop();
+ workerTask.awaitStop(Long.MAX_VALUE);
+ workerTask.close();
+
+ // Verify contents match expected values, i.e. that they were translated properly. With max
+ // batch size 1 and poll returns 1 message at a time, we should have a matching # of batches
+ assertEquals(10, capturedRecords.getValues().size());
+ int offset = 0;
+ for (Collection<SinkRecord> recs : capturedRecords.getValues()) {
+ assertEquals(1, recs.size());
+ for (SinkRecord rec : recs) {
+ SinkRecord referenceSinkRecord
+ = new SinkRecord(TOPIC, PARTITION, KEY_SCHEMA, KEY, VALUE_SCHEMA, VALUE, FIRST_OFFSET + offset);
+ assertEquals(referenceSinkRecord, rec);
+ offset++;
+ }
+ }
+
+ PowerMock.verifyAll();
+ }
+
+ @Test
+ public void testCommit() throws Exception {
+ expectInitializeTask();
+ // Make each poll() take the offset commit interval
+ Capture<Collection<SinkRecord>> capturedRecords
+ = expectPolls(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_DEFAULT);
+ expectOffsetFlush(1L, null, null, 0, true);
+ expectStopTask(2);
+ EasyMock.expect(workerThread.awaitShutdown(EasyMock.anyLong(), EasyMock.<TimeUnit>anyObject())).andReturn(true);
+
+ PowerMock.replayAll();
+
+ workerTask.start(TASK_PROPS);
+ workerTask.joinConsumerGroupAndStart();
+ // First iteration gets one record
+ workerThread.iteration();
+ // Second triggers commit, gets a second offset
+ workerThread.iteration();
+ // Commit finishes synchronously for testing so we can check this immediately
+ assertEquals(0, workerThread.commitFailures());
+ workerTask.stop();
+ workerTask.awaitStop(Long.MAX_VALUE);
+ workerTask.close();
+
+ assertEquals(2, capturedRecords.getValues().size());
+
+ PowerMock.verifyAll();
+ }
+
+ @Test
+ public void testCommitTaskFlushFailure() throws Exception {
+ expectInitializeTask();
+ Capture<Collection<SinkRecord>> capturedRecords = expectPolls(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_DEFAULT);
+ expectOffsetFlush(1L, new RuntimeException(), null, 0, true);
+ // Should rewind to last known good positions, which in this case will be the offsets loaded during initialization
+ // for all topic partitions
+ consumer.seek(TOPIC_PARTITION, FIRST_OFFSET);
+ PowerMock.expectLastCall();
+ consumer.seek(TOPIC_PARTITION2, FIRST_OFFSET);
+ PowerMock.expectLastCall();
+ consumer.seek(TOPIC_PARTITION3, FIRST_OFFSET);
+ PowerMock.expectLastCall();
+ expectStopTask(2);
+ EasyMock.expect(workerThread.awaitShutdown(EasyMock.anyLong(), EasyMock.<TimeUnit>anyObject())).andReturn(true);
+
+ PowerMock.replayAll();
+
+ workerTask.start(TASK_PROPS);
+ workerTask.joinConsumerGroupAndStart();
+ // Second iteration triggers commit
+ workerThread.iteration();
+ workerThread.iteration();
+ assertEquals(1, workerThread.commitFailures());
+ assertEquals(false, Whitebox.getInternalState(workerThread, "committing"));
+ workerTask.stop();
+ workerTask.awaitStop(Long.MAX_VALUE);
+ workerTask.close();
+
+ PowerMock.verifyAll();
+ }
+
+ @Test
+ public void testCommitTaskSuccessAndFlushFailure() throws Exception {
+ // Validate that we rewind to the correct offsets if a task's flush method throws an exception
+
+ expectInitializeTask();
+ Capture<Collection<SinkRecord>> capturedRecords = expectPolls(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_DEFAULT);
+ expectOffsetFlush(1L, null, null, 0, true);
+ expectOffsetFlush(2L, new RuntimeException(), null, 0, true);
+ // Should rewind to last known committed positions
+ consumer.seek(TOPIC_PARTITION, FIRST_OFFSET + 1);
+ PowerMock.expectLastCall();
+ consumer.seek(TOPIC_PARTITION2, FIRST_OFFSET);
+ PowerMock.expectLastCall();
+ consumer.seek(TOPIC_PARTITION3, FIRST_OFFSET);
+ PowerMock.expectLastCall();
+ expectStopTask(2);
+ EasyMock.expect(workerThread.awaitShutdown(EasyMock.anyLong(), EasyMock.<TimeUnit>anyObject())).andReturn(true);
+
+ PowerMock.replayAll();
+
+ workerTask.start(TASK_PROPS);
+ workerTask.joinConsumerGroupAndStart();
+ // Second iteration triggers first commit, third iteration triggers second (failing) commit
+ workerThread.iteration();
+ workerThread.iteration();
+ workerThread.iteration();
+ assertEquals(1, workerThread.commitFailures());
+ assertEquals(false, Whitebox.getInternalState(workerThread, "committing"));
+ workerTask.stop();
+ workerTask.awaitStop(Long.MAX_VALUE);
+ workerTask.close();
+
+ PowerMock.verifyAll();
+ }
+
+ @Test
+ public void testCommitConsumerFailure() throws Exception {
+ expectInitializeTask();
+ Capture<Collection<SinkRecord>> capturedRecords
+ = expectPolls(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_DEFAULT);
+ expectOffsetFlush(1L, null, new Exception(), 0, true);
+ expectStopTask(2);
+ EasyMock.expect(workerThread.awaitShutdown(EasyMock.anyLong(), EasyMock.<TimeUnit>anyObject())).andReturn(true);
+
+ PowerMock.replayAll();
+
+ workerTask.start(TASK_PROPS);
+ workerTask.joinConsumerGroupAndStart();
+ // Second iteration triggers commit
+ workerThread.iteration();
+ workerThread.iteration();
+ // TODO Response to consistent failures?
+ assertEquals(1, workerThread.commitFailures());
+ assertEquals(false, Whitebox.getInternalState(workerThread, "committing"));
+ workerTask.stop();
+ workerTask.awaitStop(Long.MAX_VALUE);
+ workerTask.close();
+
+ PowerMock.verifyAll();
+ }
+
+ @Test
+ public void testCommitTimeout() throws Exception {
+ expectInitializeTask();
+ // Cut down amount of time to pass in each poll so we trigger exactly 1 offset commit
+ Capture<Collection<SinkRecord>> capturedRecords
+ = expectPolls(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_DEFAULT / 2);
+ expectOffsetFlush(2L, null, null, WorkerConfig.OFFSET_COMMIT_TIMEOUT_MS_DEFAULT, false);
+ expectStopTask(4);
+ EasyMock.expect(workerThread.awaitShutdown(EasyMock.anyLong(), EasyMock.<TimeUnit>anyObject())).andReturn(true);
+
+ PowerMock.replayAll();
+
+ workerTask.start(TASK_PROPS);
+ workerTask.joinConsumerGroupAndStart();
+ // Third iteration triggers commit, fourth gives a chance to trigger the timeout but doesn't
+ // trigger another commit
+ workerThread.iteration();
+ workerThread.iteration();
+ workerThread.iteration();
+ workerThread.iteration();
+ // TODO Response to consistent failures?
+ assertEquals(1, workerThread.commitFailures());
+ assertEquals(false, Whitebox.getInternalState(workerThread, "committing"));
+ workerTask.stop();
+ workerTask.awaitStop(Long.MAX_VALUE);
+ workerTask.close();
+
+ PowerMock.verifyAll();
+ }
+
+ @Test
+ public void testAssignmentPauseResume() throws Exception {
+ // Just validate that the calls are passed through to the consumer, and that where appropriate errors are
+ // converted
+ expectInitializeTask();
+
+ expectOnePoll().andAnswer(new IAnswer<Object>() {
+ @Override
+ public Object answer() throws Throwable {
+ assertEquals(new HashSet<>(Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2, TOPIC_PARTITION3)),
+ sinkTaskContext.getValue().assignment());
+ return null;
+ }
+ });
+ EasyMock.expect(consumer.assignment()).andReturn(new HashSet<>(Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2, TOPIC_PARTITION3)));
+
+ expectOnePoll().andAnswer(new IAnswer<Object>() {
+ @Override
+ public Object answer() throws Throwable {
+ try {
+ sinkTaskContext.getValue().pause(UNASSIGNED_TOPIC_PARTITION);
+ fail("Trying to pause unassigned partition should have thrown an Copycat exception");
+ } catch (CopycatException e) {
+ // expected
+ }
+ sinkTaskContext.getValue().pause(TOPIC_PARTITION, TOPIC_PARTITION2);
+ return null;
+ }
+ });
+ consumer.pause(UNASSIGNED_TOPIC_PARTITION);
+ PowerMock.expectLastCall().andThrow(new IllegalStateException("unassigned topic partition"));
+ consumer.pause(TOPIC_PARTITION, TOPIC_PARTITION2);
+ PowerMock.expectLastCall();
+
+ expectOnePoll().andAnswer(new IAnswer<Object>() {
+ @Override
+ public Object answer() throws Throwable {
+ try {
+ sinkTaskContext.getValue().resume(UNASSIGNED_TOPIC_PARTITION);
+ fail("Trying to resume unassigned partition should have thrown an Copycat exception");
+ } catch (CopycatException e) {
+ // expected
+ }
+
+ sinkTaskContext.getValue().resume(TOPIC_PARTITION, TOPIC_PARTITION2);
+ return null;
+ }
+ });
+ consumer.resume(UNASSIGNED_TOPIC_PARTITION);
+ PowerMock.expectLastCall().andThrow(new IllegalStateException("unassigned topic partition"));
+ consumer.resume(TOPIC_PARTITION, TOPIC_PARTITION2);
+ PowerMock.expectLastCall();
+
+ expectStopTask(0);
+ EasyMock.expect(workerThread.awaitShutdown(EasyMock.anyLong(), EasyMock.<TimeUnit>anyObject())).andReturn(true);
+
+ PowerMock.replayAll();
+
+ workerTask.start(TASK_PROPS);
+ workerTask.joinConsumerGroupAndStart();
+ workerThread.iteration();
+ workerThread.iteration();
+ workerThread.iteration();
+ workerTask.stop();
+ workerTask.awaitStop(Long.MAX_VALUE);
+ workerTask.close();
+
+ PowerMock.verifyAll();
+ }
+
+ @Test
+ public void testRewind() throws Exception {
+ expectInitializeTask();
+ final long startOffset = 40L;
+ final Map<TopicPartition, Long> offsets = new HashMap<>();
+
+ expectOnePoll().andAnswer(new IAnswer<Object>() {
+ @Override
+ public Object answer() throws Throwable {
+ offsets.put(TOPIC_PARTITION, startOffset);
+ sinkTaskContext.getValue().offset(offsets);
+ return null;
+ }
+ });
+
+ consumer.seek(TOPIC_PARTITION, startOffset);
+ EasyMock.expectLastCall();
+
+ expectOnePoll().andAnswer(new IAnswer<Object>() {
+ @Override
+ public Object answer() throws Throwable {
+ Map<TopicPartition, Long> offsets = sinkTaskContext.getValue().offsets();
+ assertEquals(0, offsets.size());
+ return null;
+ }
+ });
+
+ expectStopTask(3);
+ EasyMock.expect(workerThread.awaitShutdown(EasyMock.anyLong(), EasyMock.<TimeUnit>anyObject())).andReturn(true);
+
+ PowerMock.replayAll();
+
+ workerTask.start(TASK_PROPS);
+ workerTask.joinConsumerGroupAndStart();
+ workerThread.iteration();
+ workerThread.iteration();
+ workerTask.stop();
+ workerTask.awaitStop(Long.MAX_VALUE);
+ workerTask.close();
+
+ PowerMock.verifyAll();
+ }
+
+ private void expectInitializeTask() throws Exception {
+ PowerMock.expectPrivate(workerTask, "createConsumer").andReturn(consumer);
+
+ workerThread = PowerMock.createPartialMock(WorkerSinkTaskThread.class, new String[]{"start", "awaitShutdown"},
+ workerTask, "mock-worker-thread", time,
+ workerConfig);
+ PowerMock.expectPrivate(workerTask, "createWorkerThread")
+ .andReturn(workerThread);
+ workerThread.start();
+ PowerMock.expectLastCall();
+
+ consumer.subscribe(EasyMock.eq(Arrays.asList(TOPIC)), EasyMock.capture(rebalanceListener));
+ PowerMock.expectLastCall();
+
+ EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer(new IAnswer<ConsumerRecords<byte[], byte[]>>() {
+ @Override
+ public ConsumerRecords<byte[], byte[]> answer() throws Throwable {
+ rebalanceListener.getValue().onPartitionsAssigned(Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2, TOPIC_PARTITION3));
+ return ConsumerRecords.empty();
+ }
+ });
+ EasyMock.expect(consumer.position(TOPIC_PARTITION)).andReturn(FIRST_OFFSET);
+ EasyMock.expect(consumer.position(TOPIC_PARTITION2)).andReturn(FIRST_OFFSET);
+ EasyMock.expect(consumer.position(TOPIC_PARTITION3)).andReturn(FIRST_OFFSET);
+
+ sinkTask.initialize(EasyMock.capture(sinkTaskContext));
+ PowerMock.expectLastCall();
+ sinkTask.start(TASK_PROPS);
+ PowerMock.expectLastCall();
+ }
+
+ private void expectStopTask(final long expectedMessages) throws Exception {
+ final long finalOffset = FIRST_OFFSET + expectedMessages - 1;
+
+ sinkTask.stop();
+ PowerMock.expectLastCall();
+
+ // No offset commit since it happens in the mocked worker thread, but the main thread does need to wake up the
+ // consumer so it exits quickly
+ consumer.wakeup();
+ PowerMock.expectLastCall();
+
+ consumer.close();
+ PowerMock.expectLastCall();
+ }
+
+ // Note that this can only be called once per test currently
+ private Capture<Collection<SinkRecord>> expectPolls(final long pollDelayMs) throws Exception {
+ // Stub out all the consumer stream/iterator responses, which we just want to verify occur,
+ // but don't care about the exact details here.
+ EasyMock.expect(consumer.poll(EasyMock.anyLong())).andStubAnswer(
+ new IAnswer<ConsumerRecords<byte[], byte[]>>() {
+ @Override
+ public ConsumerRecords<byte[], byte[]> answer() throws Throwable {
+ // "Sleep" so time will progress
+ time.sleep(pollDelayMs);
+ ConsumerRecords<byte[], byte[]> records = new ConsumerRecords<>(
+ Collections.singletonMap(
+ new TopicPartition(TOPIC, PARTITION),
+ Arrays.asList(
+ new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturned, RAW_KEY, RAW_VALUE)
+ )));
+ recordsReturned++;
+ return records;
+ }
+ });
+ EasyMock.expect(keyConverter.toCopycatData(TOPIC, RAW_KEY)).andReturn(new SchemaAndValue(KEY_SCHEMA, KEY)).anyTimes();
+ EasyMock.expect(valueConverter.toCopycatData(TOPIC, RAW_VALUE)).andReturn(new SchemaAndValue(VALUE_SCHEMA, VALUE)).anyTimes();
+ Capture<Collection<SinkRecord>> capturedRecords = EasyMock.newCapture(CaptureType.ALL);
+ sinkTask.put(EasyMock.capture(capturedRecords));
+ EasyMock.expectLastCall().anyTimes();
+ return capturedRecords;
+ }
+
+ private IExpectationSetters<Object> expectOnePoll() {
+ // Currently the SinkTask's put() method will not be invoked unless we provide some data, so instead of
+ // returning empty data, we return one record. The expectation is that the data will be ignored by the
+ // response behavior specified using the return value of this method.
+ EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer(
+ new IAnswer<ConsumerRecords<byte[], byte[]>>() {
+ @Override
+ public ConsumerRecords<byte[], byte[]> answer() throws Throwable {
+ // "Sleep" so time will progress
+ time.sleep(1L);
+ ConsumerRecords<byte[], byte[]> records = new ConsumerRecords<>(
+ Collections.singletonMap(
+ new TopicPartition(TOPIC, PARTITION),
+ Arrays.asList(
+ new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturned, RAW_KEY, RAW_VALUE)
+ )));
+ recordsReturned++;
+ return records;
+ }
+ });
+ EasyMock.expect(keyConverter.toCopycatData(TOPIC, RAW_KEY)).andReturn(new SchemaAndValue(KEY_SCHEMA, KEY));
+ EasyMock.expect(valueConverter.toCopycatData(TOPIC, RAW_VALUE)).andReturn(new SchemaAndValue(VALUE_SCHEMA, VALUE));
+ sinkTask.put(EasyMock.anyObject(Collection.class));
+ return EasyMock.expectLastCall();
+ }
+
+ private Capture<OffsetCommitCallback> expectOffsetFlush(final long expectedMessages,
+ final RuntimeException flushError,
+ final Exception consumerCommitError,
+ final long consumerCommitDelayMs,
+ final boolean invokeCallback)
+ throws Exception {
+ final long finalOffset = FIRST_OFFSET + expectedMessages;
+
+ // All assigned partitions will have offsets committed, but we've only processed messages/updated offsets for one
+ final Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
+ offsetsToCommit.put(TOPIC_PARTITION, new OffsetAndMetadata(finalOffset));
+ offsetsToCommit.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET));
+ offsetsToCommit.put(TOPIC_PARTITION3, new OffsetAndMetadata(FIRST_OFFSET));
+ sinkTask.flush(offsetsToCommit);
+ IExpectationSetters<Object> flushExpectation = PowerMock.expectLastCall();
+ if (flushError != null) {
+ flushExpectation.andThrow(flushError).once();
+ return null;
+ }
+
+ final Capture<OffsetCommitCallback> capturedCallback = EasyMock.newCapture();
+ consumer.commitAsync(EasyMock.eq(offsetsToCommit),
+ EasyMock.capture(capturedCallback));
+ PowerMock.expectLastCall().andAnswer(new IAnswer<Object>() {
+ @Override
+ public Object answer() throws Throwable {
+ time.sleep(consumerCommitDelayMs);
+ if (invokeCallback)
+ capturedCallback.getValue().onComplete(offsetsToCommit, consumerCommitError);
+ return null;
+ }
+ });
+ return capturedCallback;
+ }
+
+}