You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2016/06/21 03:43:43 UTC
[2/2] kafka git commit: KAFKA-3865: Fix transient failure in
WorkerSourceTaskTest.testSlowTaskStart
KAFKA-3865: Fix transient failure in WorkerSourceTaskTest.testSlowTaskStart
Author: Jason Gustafson <ja...@confluent.io>
Reviewers: Liquan Pei <li...@gmail.com>, Ewen Cheslack-Postava <ew...@confluent.io>
Closes #1531 from hachikuji/KAFKA-3865
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/37244881
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/37244881
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/37244881
Branch: refs/heads/0.10.0
Commit: 37244881efefd593169b15f702b74ab17a4394a7
Parents: 280f09b
Author: Jason Gustafson <ja...@confluent.io>
Authored: Mon Jun 20 20:32:09 2016 -0700
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Mon Jun 20 20:40:15 2016 -0700
----------------------------------------------------------------------
.../connect/runtime/WorkerSourceTaskTest.java | 79 +++++++++++++-------
1 file changed, 53 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/37244881/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
index 0761245..ab9863c 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
@@ -21,9 +21,7 @@ import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.utils.SystemTime;
-import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
import org.apache.kafka.connect.source.SourceRecord;
@@ -53,7 +51,6 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -68,7 +65,6 @@ import static org.junit.Assert.assertTrue;
@RunWith(PowerMockRunner.class)
public class WorkerSourceTaskTest extends ThreadedTest {
- private final Random random = new Random();
private static final String TOPIC = "topic";
private static final Map<String, byte[]> PARTITION = Collections.singletonMap("key", "partition".getBytes());
private static final Map<String, Integer> OFFSET = Collections.singletonMap("key", 12);
@@ -163,11 +159,14 @@ public class WorkerSourceTaskTest extends ThreadedTest {
PowerMock.replayAll();
workerTask.initialize(TASK_CONFIG);
- executor.submit(workerTask);
+ Future<?> taskFuture = executor.submit(workerTask);
+
assertTrue(startupLatch.await(5, TimeUnit.SECONDS));
workerTask.stop();
assertTrue(workerTask.awaitStop(1000));
+ taskFuture.get();
+
PowerMock.verifyAll();
}
@@ -199,8 +198,8 @@ public class WorkerSourceTaskTest extends ThreadedTest {
PowerMock.replayAll();
workerTask.initialize(TASK_CONFIG);
- executor.submit(workerTask);
- awaitLatch(pollLatch);
+ Future<?> taskFuture = executor.submit(workerTask);
+ assertTrue(awaitLatch(pollLatch));
workerTask.transitionTo(TargetState.PAUSED);
@@ -213,6 +212,8 @@ public class WorkerSourceTaskTest extends ThreadedTest {
workerTask.stop();
assertTrue(workerTask.awaitStop(1000));
+ taskFuture.get();
+
PowerMock.verifyAll();
}
@@ -240,11 +241,14 @@ public class WorkerSourceTaskTest extends ThreadedTest {
PowerMock.replayAll();
workerTask.initialize(TASK_CONFIG);
- executor.submit(workerTask);
- awaitLatch(pollLatch);
+ Future<?> taskFuture = executor.submit(workerTask);
+
+ assertTrue(awaitLatch(pollLatch));
workerTask.stop();
assertTrue(workerTask.awaitStop(1000));
+ taskFuture.get();
+
PowerMock.verifyAll();
}
@@ -259,9 +263,15 @@ public class WorkerSourceTaskTest extends ThreadedTest {
statusListener.onStartup(taskId);
EasyMock.expectLastCall();
- final CountDownLatch pollLatch = expectPolls(1);
- RuntimeException exception = new RuntimeException();
- EasyMock.expect(sourceTask.poll()).andThrow(exception);
+ final CountDownLatch pollLatch = new CountDownLatch(1);
+ final RuntimeException exception = new RuntimeException();
+ EasyMock.expect(sourceTask.poll()).andAnswer(new IAnswer<List<SourceRecord>>() {
+ @Override
+ public List<SourceRecord> answer() throws Throwable {
+ pollLatch.countDown();
+ throw exception;
+ }
+ });
statusListener.onFailure(taskId, exception);
EasyMock.expectLastCall();
@@ -273,11 +283,14 @@ public class WorkerSourceTaskTest extends ThreadedTest {
PowerMock.replayAll();
workerTask.initialize(TASK_CONFIG);
- executor.submit(workerTask);
- awaitLatch(pollLatch);
+ Future<?> taskFuture = executor.submit(workerTask);
+
+ assertTrue(awaitLatch(pollLatch));
workerTask.stop();
assertTrue(workerTask.awaitStop(1000));
+ taskFuture.get();
+
PowerMock.verifyAll();
}
@@ -308,12 +321,15 @@ public class WorkerSourceTaskTest extends ThreadedTest {
PowerMock.replayAll();
workerTask.initialize(TASK_CONFIG);
- executor.submit(workerTask);
- awaitLatch(pollLatch);
+ Future<?> taskFuture = executor.submit(workerTask);
+
+ assertTrue(awaitLatch(pollLatch));
assertTrue(workerTask.commitOffsets());
workerTask.stop();
assertTrue(workerTask.awaitStop(1000));
+ taskFuture.get();
+
PowerMock.verifyAll();
}
@@ -343,12 +359,15 @@ public class WorkerSourceTaskTest extends ThreadedTest {
PowerMock.replayAll();
workerTask.initialize(TASK_CONFIG);
- executor.submit(workerTask);
- awaitLatch(pollLatch);
+ Future<?> taskFuture = executor.submit(workerTask);
+
+ assertTrue(awaitLatch(pollLatch));
assertTrue(workerTask.commitOffsets());
workerTask.stop();
assertTrue(workerTask.awaitStop(1000));
+ taskFuture.get();
+
PowerMock.verifyAll();
}
@@ -433,39 +452,47 @@ public class WorkerSourceTaskTest extends ThreadedTest {
@Test
public void testSlowTaskStart() throws Exception {
final CountDownLatch startupLatch = new CountDownLatch(1);
+ final CountDownLatch finishStartupLatch = new CountDownLatch(1);
createWorkerTask();
sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class));
EasyMock.expectLastCall();
sourceTask.start(TASK_PROPS);
- EasyMock.expectLastCall();
-
- statusListener.onStartup(taskId);
EasyMock.expectLastCall().andAnswer(new IAnswer<Object>() {
@Override
public Object answer() throws Throwable {
startupLatch.countDown();
- Utils.sleep(100);
+ assertTrue(awaitLatch(finishStartupLatch));
return null;
}
});
+ statusListener.onStartup(taskId);
+ EasyMock.expectLastCall();
+
sourceTask.stop();
EasyMock.expectLastCall();
expectOffsetFlush(true);
+ statusListener.onShutdown(taskId);
+ EasyMock.expectLastCall();
+
PowerMock.replayAll();
workerTask.initialize(TASK_CONFIG);
- executor.submit(workerTask);
+ Future<?> workerTaskFuture = executor.submit(workerTask);
+
// Stopping immediately while the other thread has work to do should result in no polling, no offset commits,
// exiting the work thread immediately, and the stop() method will be invoked in the background thread since it
// cannot be invoked immediately in the thread trying to stop the task.
- awaitLatch(startupLatch);
+ assertTrue(awaitLatch(startupLatch));
workerTask.stop();
+ finishStartupLatch.countDown();
assertTrue(workerTask.awaitStop(1000));
+ workerTaskFuture.get();
+
PowerMock.verifyAll();
}
@@ -580,7 +607,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
sourceTask.commitRecord(EasyMock.anyObject(SourceRecord.class));
IExpectationSetters<Void> expect = EasyMock.expectLastCall();
if (!succeed) {
- expect = expect.andThrow(new InterruptException("Error committing record in source task"));
+ expect = expect.andThrow(new RuntimeException("Error committing record in source task"));
}
if (anyTimes) {
expect.anyTimes();
@@ -589,7 +616,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
private boolean awaitLatch(CountDownLatch latch) {
try {
- return latch.await(1000, TimeUnit.MILLISECONDS);
+ return latch.await(5000, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
// ignore
}