You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2016/06/21 03:43:43 UTC

[2/2] kafka git commit: KAFKA-3865: Fix transient failure in WorkerSourceTaskTest.testSlowTaskStart

KAFKA-3865: Fix transient failure in WorkerSourceTaskTest.testSlowTaskStart

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Liquan Pei <li...@gmail.com>, Ewen Cheslack-Postava <ew...@confluent.io>

Closes #1531 from hachikuji/KAFKA-3865


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/37244881
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/37244881
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/37244881

Branch: refs/heads/0.10.0
Commit: 37244881efefd593169b15f702b74ab17a4394a7
Parents: 280f09b
Author: Jason Gustafson <ja...@confluent.io>
Authored: Mon Jun 20 20:32:09 2016 -0700
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Mon Jun 20 20:40:15 2016 -0700

----------------------------------------------------------------------
 .../connect/runtime/WorkerSourceTaskTest.java   | 79 +++++++++++++-------
 1 file changed, 53 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/37244881/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
index 0761245..ab9863c 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
@@ -21,9 +21,7 @@ import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.InterruptException;
 import org.apache.kafka.common.utils.SystemTime;
-import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
 import org.apache.kafka.connect.source.SourceRecord;
@@ -53,7 +51,6 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Random;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -68,7 +65,6 @@ import static org.junit.Assert.assertTrue;
 
 @RunWith(PowerMockRunner.class)
 public class WorkerSourceTaskTest extends ThreadedTest {
-    private final Random random = new Random();
     private static final String TOPIC = "topic";
     private static final Map<String, byte[]> PARTITION = Collections.singletonMap("key", "partition".getBytes());
     private static final Map<String, Integer> OFFSET = Collections.singletonMap("key", 12);
@@ -163,11 +159,14 @@ public class WorkerSourceTaskTest extends ThreadedTest {
         PowerMock.replayAll();
 
         workerTask.initialize(TASK_CONFIG);
-        executor.submit(workerTask);
+        Future<?> taskFuture = executor.submit(workerTask);
+
         assertTrue(startupLatch.await(5, TimeUnit.SECONDS));
         workerTask.stop();
         assertTrue(workerTask.awaitStop(1000));
 
+        taskFuture.get();
+
         PowerMock.verifyAll();
     }
 
@@ -199,8 +198,8 @@ public class WorkerSourceTaskTest extends ThreadedTest {
         PowerMock.replayAll();
 
         workerTask.initialize(TASK_CONFIG);
-        executor.submit(workerTask);
-        awaitLatch(pollLatch);
+        Future<?> taskFuture = executor.submit(workerTask);
+        assertTrue(awaitLatch(pollLatch));
 
         workerTask.transitionTo(TargetState.PAUSED);
 
@@ -213,6 +212,8 @@ public class WorkerSourceTaskTest extends ThreadedTest {
         workerTask.stop();
         assertTrue(workerTask.awaitStop(1000));
 
+        taskFuture.get();
+
         PowerMock.verifyAll();
     }
 
@@ -240,11 +241,14 @@ public class WorkerSourceTaskTest extends ThreadedTest {
         PowerMock.replayAll();
 
         workerTask.initialize(TASK_CONFIG);
-        executor.submit(workerTask);
-        awaitLatch(pollLatch);
+        Future<?> taskFuture = executor.submit(workerTask);
+
+        assertTrue(awaitLatch(pollLatch));
         workerTask.stop();
         assertTrue(workerTask.awaitStop(1000));
 
+        taskFuture.get();
+
         PowerMock.verifyAll();
     }
 
@@ -259,9 +263,15 @@ public class WorkerSourceTaskTest extends ThreadedTest {
         statusListener.onStartup(taskId);
         EasyMock.expectLastCall();
 
-        final CountDownLatch pollLatch = expectPolls(1);
-        RuntimeException exception = new RuntimeException();
-        EasyMock.expect(sourceTask.poll()).andThrow(exception);
+        final CountDownLatch pollLatch = new CountDownLatch(1);
+        final RuntimeException exception = new RuntimeException();
+        EasyMock.expect(sourceTask.poll()).andAnswer(new IAnswer<List<SourceRecord>>() {
+            @Override
+            public List<SourceRecord> answer() throws Throwable {
+                pollLatch.countDown();
+                throw exception;
+            }
+        });
 
         statusListener.onFailure(taskId, exception);
         EasyMock.expectLastCall();
@@ -273,11 +283,14 @@ public class WorkerSourceTaskTest extends ThreadedTest {
         PowerMock.replayAll();
 
         workerTask.initialize(TASK_CONFIG);
-        executor.submit(workerTask);
-        awaitLatch(pollLatch);
+        Future<?> taskFuture = executor.submit(workerTask);
+
+        assertTrue(awaitLatch(pollLatch));
         workerTask.stop();
         assertTrue(workerTask.awaitStop(1000));
 
+        taskFuture.get();
+
         PowerMock.verifyAll();
     }
 
@@ -308,12 +321,15 @@ public class WorkerSourceTaskTest extends ThreadedTest {
         PowerMock.replayAll();
 
         workerTask.initialize(TASK_CONFIG);
-        executor.submit(workerTask);
-        awaitLatch(pollLatch);
+        Future<?> taskFuture = executor.submit(workerTask);
+
+        assertTrue(awaitLatch(pollLatch));
         assertTrue(workerTask.commitOffsets());
         workerTask.stop();
         assertTrue(workerTask.awaitStop(1000));
 
+        taskFuture.get();
+
         PowerMock.verifyAll();
     }
 
@@ -343,12 +359,15 @@ public class WorkerSourceTaskTest extends ThreadedTest {
         PowerMock.replayAll();
 
         workerTask.initialize(TASK_CONFIG);
-        executor.submit(workerTask);
-        awaitLatch(pollLatch);
+        Future<?> taskFuture = executor.submit(workerTask);
+
+        assertTrue(awaitLatch(pollLatch));
         assertTrue(workerTask.commitOffsets());
         workerTask.stop();
         assertTrue(workerTask.awaitStop(1000));
 
+        taskFuture.get();
+
         PowerMock.verifyAll();
     }
     
@@ -433,39 +452,47 @@ public class WorkerSourceTaskTest extends ThreadedTest {
     @Test
     public void testSlowTaskStart() throws Exception {
         final CountDownLatch startupLatch = new CountDownLatch(1);
+        final CountDownLatch finishStartupLatch = new CountDownLatch(1);
 
         createWorkerTask();
 
         sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class));
         EasyMock.expectLastCall();
         sourceTask.start(TASK_PROPS);
-        EasyMock.expectLastCall();
-
-        statusListener.onStartup(taskId);
         EasyMock.expectLastCall().andAnswer(new IAnswer<Object>() {
             @Override
             public Object answer() throws Throwable {
                 startupLatch.countDown();
-                Utils.sleep(100);
+                assertTrue(awaitLatch(finishStartupLatch));
                 return null;
             }
         });
 
+        statusListener.onStartup(taskId);
+        EasyMock.expectLastCall();
+
         sourceTask.stop();
         EasyMock.expectLastCall();
         expectOffsetFlush(true);
 
+        statusListener.onShutdown(taskId);
+        EasyMock.expectLastCall();
+
         PowerMock.replayAll();
 
         workerTask.initialize(TASK_CONFIG);
-        executor.submit(workerTask);
+        Future<?> workerTaskFuture = executor.submit(workerTask);
+
         // Stopping immediately while the other thread has work to do should result in no polling, no offset commits,
         // exiting the work thread immediately, and the stop() method will be invoked in the background thread since it
         // cannot be invoked immediately in the thread trying to stop the task.
-        awaitLatch(startupLatch);
+        assertTrue(awaitLatch(startupLatch));
         workerTask.stop();
+        finishStartupLatch.countDown();
         assertTrue(workerTask.awaitStop(1000));
 
+        workerTaskFuture.get();
+
         PowerMock.verifyAll();
     }
 
@@ -580,7 +607,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
         sourceTask.commitRecord(EasyMock.anyObject(SourceRecord.class));
         IExpectationSetters<Void> expect = EasyMock.expectLastCall();
         if (!succeed) {
-            expect = expect.andThrow(new InterruptException("Error committing record in source task"));
+            expect = expect.andThrow(new RuntimeException("Error committing record in source task"));
         }
         if (anyTimes) {
             expect.anyTimes();
@@ -589,7 +616,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
 
     private boolean awaitLatch(CountDownLatch latch) {
         try {
-            return latch.await(1000, TimeUnit.MILLISECONDS);
+            return latch.await(5000, TimeUnit.MILLISECONDS);
         } catch (InterruptedException e) {
             // ignore
         }