You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by xi...@apache.org on 2018/04/11 22:34:50 UTC

samza git commit: SAMZA-1183: Fix TestAsyncRunLoop flaky tests

Repository: samza
Updated Branches:
  refs/heads/master 262941516 -> bbcf14eba


SAMZA-1183: Fix TestAsyncRunLoop flaky tests

A. Fix TestProcessInOrder and TestProcessOutOfOrder failures.

Both tests have two tasks (T1, T2) and the following task-to-message assignments:
- Task T1: [M1, M2] (Task T1 is expected to process the messages M1 and M2).
- Task T2: [M3] (Task T2 is expected to process the message M3 and stop the runloop).

In some cases, before task T1 finishes processing all its messages, T2 gets scheduled and stops the run loop (stopping all tasks).

Both tests wait on a CountDownLatch for the tasks to process all the messages; in the above scenario the latch never reaches zero (thereby causing an indefinite wait in the tests).

Fix: Remove the manual shutdown invocation from Task T2 and use EOS messages to stop the runloop.

B. Fix TestCommitSingleTask and TestCommitAllTasks failures.

Both tests have two tasks (T1, T2).

Task T2 is expected to stop the runloop and task T1 marks the commit flag through taskCoordinator.commit(requestScope).

The failure occurs in some scenarios when task T2 stops the run loop before the run loop has committed the tasks.

Fix: Trigger the runloop shutdown sequence after we’ve committed the tasks.

C. Combine the duplicate tests and unify their assertions.

D. Verification: Ran the following script to verify the fixes.

`root88cf6be1c11a:~/samza# i=0`
`root88cf6be1c11a:~/samza# while [ $i -lt 100 ]; do`
       ` i=$(expr $i + 1)`
       ` ./gradlew clean :samza-core:test -Dtest.single="TestAsyncRunLoop" --debug --stacktrace >> test-logs`
  ` done;`

There were no failures.

Author: Shanthoosh Venkataraman <sv...@linkedin.com>

Reviewers: Xinyu Liu <xi...@gmail.com>

Closes #463 from shanthoosh/re_enable_async_run_loop_tests_2


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/bbcf14eb
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/bbcf14eb
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/bbcf14eb

Branch: refs/heads/master
Commit: bbcf14ebac47e4dd43e2a190f33f9810482034cf
Parents: 2629415
Author: Shanthoosh Venkataraman <sv...@linkedin.com>
Authored: Wed Apr 11 15:34:43 2018 -0700
Committer: xiliu <xi...@linkedin.com>
Committed: Wed Apr 11 15:34:43 2018 -0700

----------------------------------------------------------------------
 .../org/apache/samza/task/TestAsyncRunLoop.java | 324 +++++++++----------
 1 file changed, 146 insertions(+), 178 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/bbcf14eb/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java b/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
index d7132f3..88767f5 100644
--- a/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
+++ b/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java
@@ -22,16 +22,15 @@ package org.apache.samza.task;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.samza.Partition;
+import org.apache.samza.checkpoint.Checkpoint;
 import org.apache.samza.checkpoint.OffsetManager;
 import org.apache.samza.config.Config;
 import org.apache.samza.container.SamzaContainerContext;
@@ -46,25 +45,20 @@ import org.apache.samza.system.SystemConsumer;
 import org.apache.samza.system.SystemConsumers;
 import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.system.TestSystemConsumers;
-import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
 
+import org.junit.rules.Timeout;
 import scala.Option;
 import scala.collection.JavaConverters;
 
 import static org.junit.Assert.assertEquals;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Matchers.anyObject;
 import static org.mockito.Mockito.*;
 
-// TODO(spvenkat) SAMZA-1183: Fix all commented out tests.
 public class TestAsyncRunLoop {
   // Immutable objects shared by all test methods.
   private final ExecutorService executor = null;
-  private final SystemConsumers consumerMultiplexer = mock(SystemConsumers.class);
   private final SamzaContainerMetrics containerMetrics = new SamzaContainerMetrics("container", new MetricsRegistryMap());
-  private final OffsetManager offsetManager = mock(OffsetManager.class);
   private final long windowMs = -1;
   private final long commitMs = -1;
   private final long callbackTimeoutMs = 0;
@@ -91,10 +85,6 @@ public class TestAsyncRunLoop {
         new scala.collection.immutable.HashSet<String>()), null, null, null);
   }
 
-  TaskInstance createTaskInstance(AsyncStreamTask task, TaskName taskName, SystemStreamPartition ssp) {
-    return createTaskInstance(task, taskName, ssp, offsetManager, consumerMultiplexer);
-  }
-
   interface TestCode {
     void run(TaskCallback callback);
   }
@@ -105,8 +95,6 @@ public class TestAsyncRunLoop {
     private final boolean success;
     private final ExecutorService callbackExecutor = Executors.newFixedThreadPool(4);
 
-    private int processed = 0;
-    private int committed = 0;
     private AtomicInteger completed = new AtomicInteger(0);
     private TestCode callbackHandler = null;
     private TestCode commitHandler = null;
@@ -116,6 +104,8 @@ public class TestAsyncRunLoop {
     private CountDownLatch processedMessagesLatch = null;
 
     private volatile int windowCount = 0;
+    private volatile int processed = 0;
+    private volatile int committed = 0;
 
     private int maxMessagesInFlight;
 
@@ -133,8 +123,7 @@ public class TestAsyncRunLoop {
     }
 
     @Override
-    public void processAsync(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator,
-        TaskCallback callback) {
+    public void processAsync(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator, TaskCallback callback) {
 
       if (maxMessagesInFlight == 1) {
         assertEquals(processed, completed.get());
@@ -198,59 +187,21 @@ public class TestAsyncRunLoop {
     }
   }
 
-  @Before
-  public void setup() {
-    when(consumerMultiplexer.pollIntervalMs()).thenReturn(10);
-  }
+  @Rule
+  public Timeout maxTestDurationInSeconds = Timeout.seconds(120);
 
   @Test
-  public void testMetrics() throws Exception {
-    CountDownLatch task0ProcessedMessages = new CountDownLatch(2);
-    CountDownLatch task1ProcessedMessages = new CountDownLatch(1);
-
-    TestTask task0 = new TestTask(true, true, false, task0ProcessedMessages);
-    TestTask task1 = new TestTask(true, true, false, task1ProcessedMessages);
-    TaskInstance t0 = createTaskInstance(task0, taskName0, ssp0);
-    TaskInstance t1 = createTaskInstance(task1, taskName1, ssp1);
-
-    Map<TaskName, TaskInstance> tasks = new HashMap<>();
-    tasks.put(taskName0, t0);
-    tasks.put(taskName1, t1);
-    //task0.callbackHandler = buildOutofOrderCallback(task0);
-
-    int maxMessagesInFlight = 1;
-    AsyncRunLoop runLoop = new AsyncRunLoop(tasks, executor, consumerMultiplexer, maxMessagesInFlight, windowMs, commitMs,
-        callbackTimeoutMs, maxThrottlingDelayMs, maxIdleMs, containerMetrics, () -> 0L, false);
-
-    when(consumerMultiplexer.choose(false))
-        .thenReturn(envelope0)
-        .thenReturn(envelope3)
-        .thenReturn(envelope1)
-        .thenReturn(null)
-        .thenReturn(ssp0EndOfStream)
-        .thenReturn(ssp1EndOfStream)
-        .thenReturn(null);
-
-    runLoop.run();
-
-    task0ProcessedMessages.await();
-    task1ProcessedMessages.await();
-
-    assertEquals(2L, t0.metrics().asyncCallbackCompleted().getCount());
-    assertEquals(1L, t1.metrics().asyncCallbackCompleted().getCount());
-    assertEquals(5L, containerMetrics.envelopes().getCount());
-    assertEquals(3L, containerMetrics.processes().getCount());
-  }
-
-  //@Test
   public void testProcessMultipleTasks() throws Exception {
     CountDownLatch task0ProcessedMessages = new CountDownLatch(1);
     CountDownLatch task1ProcessedMessages = new CountDownLatch(1);
+    SystemConsumers consumerMultiplexer = mock(SystemConsumers.class);
+    when(consumerMultiplexer.pollIntervalMs()).thenReturn(10);
+    OffsetManager offsetManager = mock(OffsetManager.class);
 
     TestTask task0 = new TestTask(true, true, false, task0ProcessedMessages);
     TestTask task1 = new TestTask(true, false, true, task1ProcessedMessages);
-    TaskInstance t0 = createTaskInstance(task0, taskName0, ssp0);
-    TaskInstance t1 = createTaskInstance(task1, taskName1, ssp1);
+    TaskInstance t0 = createTaskInstance(task0, taskName0, ssp0, offsetManager, consumerMultiplexer);
+    TaskInstance t1 = createTaskInstance(task1, taskName1, ssp1, offsetManager, consumerMultiplexer);
 
     Map<TaskName, TaskInstance> tasks = new HashMap<>();
     tasks.put(taskName0, t0);
@@ -274,15 +225,18 @@ public class TestAsyncRunLoop {
     assertEquals(2L, containerMetrics.processes().getCount());
   }
 
-  //@Test
+  @Test
   public void testProcessInOrder() throws Exception {
     CountDownLatch task0ProcessedMessages = new CountDownLatch(2);
     CountDownLatch task1ProcessedMessages = new CountDownLatch(1);
+    SystemConsumers consumerMultiplexer = mock(SystemConsumers.class);
+    when(consumerMultiplexer.pollIntervalMs()).thenReturn(10);
+    OffsetManager offsetManager = mock(OffsetManager.class);
 
     TestTask task0 = new TestTask(true, true, false, task0ProcessedMessages);
-    TestTask task1 = new TestTask(true, false, true, task1ProcessedMessages);
-    TaskInstance t0 = createTaskInstance(task0, taskName0, ssp0);
-    TaskInstance t1 = createTaskInstance(task1, taskName1, ssp1);
+    TestTask task1 = new TestTask(true, false, false, task1ProcessedMessages);
+    TaskInstance t0 = createTaskInstance(task0, taskName0, ssp0, offsetManager, consumerMultiplexer);
+    TaskInstance t1 = createTaskInstance(task1, taskName1, ssp1, offsetManager, consumerMultiplexer);
 
     Map<TaskName, TaskInstance> tasks = new HashMap<>();
     tasks.put(taskName0, t0);
@@ -290,9 +244,8 @@ public class TestAsyncRunLoop {
 
     int maxMessagesInFlight = 1;
     AsyncRunLoop runLoop = new AsyncRunLoop(tasks, executor, consumerMultiplexer, maxMessagesInFlight, windowMs, commitMs,
-                                            callbackTimeoutMs, maxThrottlingDelayMs, maxIdleMs, containerMetrics,
-                                            () -> 0L, false);
-    when(consumerMultiplexer.choose(false)).thenReturn(envelope0).thenReturn(envelope3).thenReturn(envelope1).thenReturn(null);
+                                            callbackTimeoutMs, maxThrottlingDelayMs, maxIdleMs, containerMetrics, () -> 0L, false);
+    when(consumerMultiplexer.choose(false)).thenReturn(envelope0).thenReturn(envelope3).thenReturn(envelope1).thenReturn(ssp0EndOfStream).thenReturn(ssp1EndOfStream).thenReturn(null);
     runLoop.run();
 
     // Wait till the tasks completes processing all the messages.
@@ -303,8 +256,10 @@ public class TestAsyncRunLoop {
     assertEquals(2, task0.completed.get());
     assertEquals(1, task1.processed);
     assertEquals(1, task1.completed.get());
-    assertEquals(3L, containerMetrics.envelopes().getCount());
+    assertEquals(5L, containerMetrics.envelopes().getCount());
     assertEquals(3L, containerMetrics.processes().getCount());
+    assertEquals(2L, t0.metrics().asyncCallbackCompleted().getCount());
+    assertEquals(1L, t1.metrics().asyncCallbackCompleted().getCount());
   }
 
   private TestCode buildOutofOrderCallback(final TestTask task) {
@@ -313,7 +268,7 @@ public class TestAsyncRunLoop {
       @Override
       public void run(TaskCallback callback) {
         IncomingMessageEnvelope envelope = ((TaskCallbackImpl) callback).envelope;
-        if (envelope == envelope0) {
+        if (envelope.equals(envelope0)) {
           // process first message will wait till the second one is processed
           try {
             latch.await();
@@ -329,28 +284,30 @@ public class TestAsyncRunLoop {
     };
   }
 
-  //@Test
+  @Test
   public void testProcessOutOfOrder() throws Exception {
     int maxMessagesInFlight = 2;
 
     CountDownLatch task0ProcessedMessagesLatch = new CountDownLatch(2);
     CountDownLatch task1ProcessedMessagesLatch = new CountDownLatch(1);
 
-    TestTask task0 = new TestTask(true, true, false, task0ProcessedMessagesLatch,  maxMessagesInFlight);
-    TestTask task1 = new TestTask(true, false, true, task1ProcessedMessagesLatch, maxMessagesInFlight);
-    TaskInstance t0 = createTaskInstance(task0, taskName0, ssp0);
-    TaskInstance t1 = createTaskInstance(task1, taskName1, ssp1);
+    SystemConsumers consumerMultiplexer = mock(SystemConsumers.class);
+    when(consumerMultiplexer.pollIntervalMs()).thenReturn(10);
+    OffsetManager offsetManager = mock(OffsetManager.class);
+
+    TestTask task0 = new TestTask(true, true, false, task0ProcessedMessagesLatch, maxMessagesInFlight);
+    TestTask task1 = new TestTask(true, false, false, task1ProcessedMessagesLatch, maxMessagesInFlight);
+    TaskInstance t0 = createTaskInstance(task0, taskName0, ssp0, offsetManager, consumerMultiplexer);
+    TaskInstance t1 = createTaskInstance(task1, taskName1, ssp1, offsetManager, consumerMultiplexer);
 
     Map<TaskName, TaskInstance> tasks = new HashMap<>();
     tasks.put(taskName0, t0);
     tasks.put(taskName1, t1);
 
     task0.callbackHandler = buildOutofOrderCallback(task0);
-
     AsyncRunLoop runLoop = new AsyncRunLoop(tasks, executor, consumerMultiplexer, maxMessagesInFlight, windowMs, commitMs,
-                                            callbackTimeoutMs, maxThrottlingDelayMs, maxIdleMs, containerMetrics,
-                                            () -> 0L, false);
-    when(consumerMultiplexer.choose(false)).thenReturn(envelope0).thenReturn(envelope3).thenReturn(envelope1).thenReturn(null);
+                                            callbackTimeoutMs, maxThrottlingDelayMs, maxIdleMs, containerMetrics, () -> 0L, false);
+    when(consumerMultiplexer.choose(false)).thenReturn(envelope0).thenReturn(envelope3).thenReturn(envelope1).thenReturn(ssp0EndOfStream).thenReturn(ssp1EndOfStream).thenReturn(null);
     runLoop.run();
 
     task0ProcessedMessagesLatch.await();
@@ -360,16 +317,21 @@ public class TestAsyncRunLoop {
     assertEquals(2, task0.completed.get());
     assertEquals(1, task1.processed);
     assertEquals(1, task1.completed.get());
-    assertEquals(3L, containerMetrics.envelopes().getCount());
+    assertEquals(5L, containerMetrics.envelopes().getCount());
     assertEquals(3L, containerMetrics.processes().getCount());
   }
 
-  //@Test
+  @Test
   public void testWindow() throws Exception {
     TestTask task0 = new TestTask(true, true, false, null);
     TestTask task1 = new TestTask(true, false, true, null);
-    TaskInstance t0 = createTaskInstance(task0, taskName0, ssp0);
-    TaskInstance t1 = createTaskInstance(task1, taskName1, ssp1);
+
+    SystemConsumers consumerMultiplexer = mock(SystemConsumers.class);
+    when(consumerMultiplexer.pollIntervalMs()).thenReturn(10);
+    OffsetManager offsetManager = mock(OffsetManager.class);
+
+    TaskInstance t0 = createTaskInstance(task0, taskName0, ssp0, offsetManager, consumerMultiplexer);
+    TaskInstance t1 = createTaskInstance(task1, taskName1, ssp1, offsetManager, consumerMultiplexer);
 
     Map<TaskName, TaskInstance> tasks = new HashMap<>();
     tasks.put(taskName0, t0);
@@ -386,18 +348,20 @@ public class TestAsyncRunLoop {
     assertEquals(4, task1.windowCount);
   }
 
-  // TODO fix in SAMZA-1183
-  // @Test
+  @Test
   public void testCommitSingleTask() throws Exception {
     CountDownLatch task0ProcessedMessagesLatch = new CountDownLatch(1);
     CountDownLatch task1ProcessedMessagesLatch = new CountDownLatch(1);
 
+    SystemConsumers consumerMultiplexer = mock(SystemConsumers.class);
+    when(consumerMultiplexer.pollIntervalMs()).thenReturn(10);
+    OffsetManager offsetManager = mock(OffsetManager.class);
+
     TestTask task0 = new TestTask(true, true, false, task0ProcessedMessagesLatch);
     task0.setCommitRequest(TaskCoordinator.RequestScope.CURRENT_TASK);
     TestTask task1 = new TestTask(true, false, true, task1ProcessedMessagesLatch);
-    task1.setCommitRequest(TaskCoordinator.RequestScope.CURRENT_TASK);
-    TaskInstance t0 = createTaskInstance(task0, taskName0, ssp0);
-    TaskInstance t1 = createTaskInstance(task1, taskName1, ssp1);
+    TaskInstance t0 = createTaskInstance(task0, taskName0, ssp0, offsetManager, consumerMultiplexer);
+    TaskInstance t1 = createTaskInstance(task1, taskName1, ssp1, offsetManager, consumerMultiplexer);
 
     Map<TaskName, TaskInstance> tasks = new HashMap<>();
     tasks.put(taskName0, t0);
@@ -405,72 +369,87 @@ public class TestAsyncRunLoop {
 
     int maxMessagesInFlight = 1;
     AsyncRunLoop runLoop = new AsyncRunLoop(tasks, executor, consumerMultiplexer, maxMessagesInFlight, windowMs, commitMs,
-                                            callbackTimeoutMs, maxThrottlingDelayMs, maxIdleMs, containerMetrics,
-                                            () -> 0L, false);
+                                            callbackTimeoutMs, maxThrottlingDelayMs, maxIdleMs, containerMetrics, () -> 0L, false);
     //have a null message in between to make sure task0 finishes processing and invoke the commit
-    when(consumerMultiplexer.choose(false)).thenReturn(envelope0).thenReturn(null).thenReturn(envelope1).thenReturn(null);
+    when(consumerMultiplexer.choose(false)).thenReturn(envelope0)
+        .thenAnswer(x -> {
+            task0ProcessedMessagesLatch.await();
+            return null;
+          }).thenReturn(envelope1).thenReturn(null);
+
     runLoop.run();
 
     task0ProcessedMessagesLatch.await();
     task1ProcessedMessagesLatch.await();
 
-    verify(offsetManager).buildCheckpoint(taskName0);
-    verify(offsetManager).writeCheckpoint(taskName0, any());
-    verify(offsetManager, never()).buildCheckpoint(taskName1);
-    verify(offsetManager, never()).writeCheckpoint(taskName1, any());
+    verify(offsetManager).buildCheckpoint(eq(taskName0));
+    verify(offsetManager).writeCheckpoint(eq(taskName0), any(Checkpoint.class));
+    verify(offsetManager, never()).buildCheckpoint(eq(taskName1));
+    verify(offsetManager, never()).writeCheckpoint(eq(taskName1), any(Checkpoint.class));
   }
 
-  //@Test
+  @Test
   public void testCommitAllTasks() throws Exception {
     CountDownLatch task0ProcessedMessagesLatch = new CountDownLatch(1);
     CountDownLatch task1ProcessedMessagesLatch = new CountDownLatch(1);
 
+    SystemConsumers consumerMultiplexer = mock(SystemConsumers.class);
+    when(consumerMultiplexer.pollIntervalMs()).thenReturn(10);
+    OffsetManager offsetManager = mock(OffsetManager.class);
+
     TestTask task0 = new TestTask(true, true, false, task0ProcessedMessagesLatch);
     task0.setCommitRequest(TaskCoordinator.RequestScope.ALL_TASKS_IN_CONTAINER);
     TestTask task1 = new TestTask(true, false, true, task1ProcessedMessagesLatch);
-    task1.setCommitRequest(TaskCoordinator.RequestScope.ALL_TASKS_IN_CONTAINER);
-    TaskInstance t0 = createTaskInstance(task0, taskName0, ssp0);
-    TaskInstance t1 = createTaskInstance(task1, taskName1, ssp1);
+
+    TaskInstance t0 = createTaskInstance(task0, taskName0, ssp0, offsetManager, consumerMultiplexer);
+    TaskInstance t1 = createTaskInstance(task1, taskName1, ssp1, offsetManager, consumerMultiplexer);
 
     Map<TaskName, TaskInstance> tasks = new HashMap<>();
     tasks.put(taskName0, t0);
     tasks.put(taskName1, t1);
     int maxMessagesInFlight = 1;
-
     AsyncRunLoop runLoop = new AsyncRunLoop(tasks, executor, consumerMultiplexer, maxMessagesInFlight, windowMs, commitMs,
-                                            callbackTimeoutMs, maxThrottlingDelayMs, maxIdleMs, containerMetrics,
-                                            () -> 0L, false);
+                                            callbackTimeoutMs, maxThrottlingDelayMs, maxIdleMs, containerMetrics, () -> 0L, false);
     //have a null message in between to make sure task0 finishes processing and invoke the commit
-    when(consumerMultiplexer.choose(false)).thenReturn(envelope0).thenReturn(null).thenReturn(envelope1).thenReturn(null);
+    when(consumerMultiplexer.choose(false)).thenReturn(envelope0)
+        .thenAnswer(x -> {
+            task0ProcessedMessagesLatch.await();
+            return null;
+          }).thenReturn(envelope1).thenReturn(null);
     runLoop.run();
 
     task0ProcessedMessagesLatch.await();
     task1ProcessedMessagesLatch.await();
 
-    verify(offsetManager).buildCheckpoint(taskName0);
-    verify(offsetManager).writeCheckpoint(taskName0, any());
-    verify(offsetManager).buildCheckpoint(taskName1);
-    verify(offsetManager).writeCheckpoint(taskName1, any());
+    verify(offsetManager).buildCheckpoint(eq(taskName0));
+    verify(offsetManager).writeCheckpoint(eq(taskName0), any(Checkpoint.class));
+    verify(offsetManager).buildCheckpoint(eq(taskName1));
+    verify(offsetManager).writeCheckpoint(eq(taskName1), any(Checkpoint.class));
   }
 
-  //@Test
+  @Test
   public void testShutdownOnConsensus() throws Exception {
     CountDownLatch task0ProcessedMessagesLatch = new CountDownLatch(1);
     CountDownLatch task1ProcessedMessagesLatch = new CountDownLatch(1);
 
+    SystemConsumers consumerMultiplexer = mock(SystemConsumers.class);
+    when(consumerMultiplexer.pollIntervalMs()).thenReturn(10);
+    OffsetManager offsetManager = mock(OffsetManager.class);
+
     TestTask task0 = new TestTask(true, true, true, task0ProcessedMessagesLatch);
     task0.setShutdownRequest(TaskCoordinator.RequestScope.CURRENT_TASK);
     TestTask task1 = new TestTask(true, false, true, task1ProcessedMessagesLatch);
     task1.setShutdownRequest(TaskCoordinator.RequestScope.CURRENT_TASK);
-    TaskInstance t0 = createTaskInstance(task0, taskName0, ssp0);
-    TaskInstance t1 = createTaskInstance(task1, taskName1, ssp1);
+    TaskInstance t0 = createTaskInstance(task0, taskName0, ssp0, offsetManager, consumerMultiplexer);
+    TaskInstance t1 = createTaskInstance(task1, taskName1, ssp1, offsetManager, consumerMultiplexer);
 
     Map<TaskName, TaskInstance> tasks = new HashMap<>();
     tasks.put(taskName0, t0);
     tasks.put(taskName1, t1);
 
-    tasks.put(taskName0, createTaskInstance(task0, taskName0, ssp0));
-    tasks.put(taskName1, createTaskInstance(task1, taskName1, ssp1));
+    tasks.put(taskName0, createTaskInstance(task0, taskName0, ssp0, offsetManager, consumerMultiplexer));
+    tasks.put(taskName1, createTaskInstance(task1, taskName1, ssp1, offsetManager, consumerMultiplexer));
+
     int maxMessagesInFlight = 1;
 
     AsyncRunLoop runLoop = new AsyncRunLoop(tasks, executor, consumerMultiplexer, maxMessagesInFlight, windowMs, commitMs,
@@ -491,15 +470,19 @@ public class TestAsyncRunLoop {
     assertEquals(2L, containerMetrics.processes().getCount());
   }
 
-  //@Test
+  @Test
   public void testEndOfStreamWithMultipleTasks() throws Exception {
     CountDownLatch task0ProcessedMessagesLatch = new CountDownLatch(1);
     CountDownLatch task1ProcessedMessagesLatch = new CountDownLatch(1);
 
+    SystemConsumers consumerMultiplexer = mock(SystemConsumers.class);
+    when(consumerMultiplexer.pollIntervalMs()).thenReturn(10);
+    OffsetManager offsetManager = mock(OffsetManager.class);
+
     TestTask task0 = new TestTask(true, true, false, task0ProcessedMessagesLatch);
     TestTask task1 = new TestTask(true, true, false, task1ProcessedMessagesLatch);
-    TaskInstance t0 = createTaskInstance(task0, taskName0, ssp0);
-    TaskInstance t1 = createTaskInstance(task1, taskName1, ssp1);
+    TaskInstance t0 = createTaskInstance(task0, taskName0, ssp0, offsetManager, consumerMultiplexer);
+    TaskInstance t1 = createTaskInstance(task1, taskName1, ssp1, offsetManager, consumerMultiplexer);
 
     Map<TaskName, TaskInstance> tasks = new HashMap<>();
 
@@ -530,17 +513,21 @@ public class TestAsyncRunLoop {
     assertEquals(2L, containerMetrics.processes().getCount());
   }
 
-  //@Test
+  @Test
   public void testEndOfStreamWithOutOfOrderProcess() throws Exception {
     int maxMessagesInFlight = 2;
 
     CountDownLatch task0ProcessedMessagesLatch = new CountDownLatch(2);
     CountDownLatch task1ProcessedMessagesLatch = new CountDownLatch(1);
 
+    SystemConsumers consumerMultiplexer = mock(SystemConsumers.class);
+    when(consumerMultiplexer.pollIntervalMs()).thenReturn(10);
+    OffsetManager offsetManager = mock(OffsetManager.class);
+
     TestTask task0 = new TestTask(true, true, false, task0ProcessedMessagesLatch, maxMessagesInFlight);
     TestTask task1 = new TestTask(true, true, false, task1ProcessedMessagesLatch, maxMessagesInFlight);
-    TaskInstance t0 = createTaskInstance(task0, taskName0, ssp0);
-    TaskInstance t1 = createTaskInstance(task1, taskName1, ssp1);
+    TaskInstance t0 = createTaskInstance(task0, taskName0, ssp0, offsetManager, consumerMultiplexer);
+    TaskInstance t1 = createTaskInstance(task1, taskName1, ssp1, offsetManager, consumerMultiplexer);
 
     Map<TaskName, TaskInstance> tasks = new HashMap<>();
 
@@ -549,16 +536,8 @@ public class TestAsyncRunLoop {
 
     task0.callbackHandler = buildOutofOrderCallback(task0);
     AsyncRunLoop runLoop = new AsyncRunLoop(tasks, executor, consumerMultiplexer, maxMessagesInFlight, windowMs, commitMs,
-                                            callbackTimeoutMs, maxThrottlingDelayMs, maxIdleMs, containerMetrics,
-                                            () -> 0L, false);
-    when(consumerMultiplexer.choose(false))
-        .thenReturn(envelope0)
-        .thenReturn(envelope3)
-        .thenReturn(envelope1)
-        .thenReturn(null)
-        .thenReturn(ssp0EndOfStream)
-        .thenReturn(ssp1EndOfStream)
-        .thenReturn(null);
+                                            callbackTimeoutMs, maxThrottlingDelayMs, maxIdleMs, containerMetrics, () -> 0L, false);
+    when(consumerMultiplexer.choose(false)).thenReturn(envelope0).thenReturn(envelope3).thenReturn(envelope1).thenReturn(null).thenReturn(ssp0EndOfStream).thenReturn(ssp1EndOfStream).thenReturn(null);
 
     runLoop.run();
 
@@ -573,48 +552,43 @@ public class TestAsyncRunLoop {
     assertEquals(3L, containerMetrics.processes().getCount());
   }
 
-  //@Test
+  @Test
   public void testEndOfStreamCommitBehavior() throws Exception {
     CountDownLatch task0ProcessedMessagesLatch = new CountDownLatch(1);
     CountDownLatch task1ProcessedMessagesLatch = new CountDownLatch(1);
 
+    SystemConsumers consumerMultiplexer = mock(SystemConsumers.class);
+    when(consumerMultiplexer.pollIntervalMs()).thenReturn(10);
+    OffsetManager offsetManager = mock(OffsetManager.class);
+
     //explicitly configure to disable commits inside process or window calls and invoke commit from end of stream
     TestTask task0 = new TestTask(true, false, false, task0ProcessedMessagesLatch);
     TestTask task1 = new TestTask(true, false, false, task1ProcessedMessagesLatch);
 
-    TaskInstance t0 = createTaskInstance(task0, taskName0, ssp0);
-    TaskInstance t1 = createTaskInstance(task1, taskName1, ssp1);
+    TaskInstance t0 = createTaskInstance(task0, taskName0, ssp0, offsetManager, consumerMultiplexer);
+    TaskInstance t1 = createTaskInstance(task1, taskName1, ssp1, offsetManager, consumerMultiplexer);
 
     Map<TaskName, TaskInstance> tasks = new HashMap<>();
 
     tasks.put(taskName0, t0);
     tasks.put(taskName1, t1);
     int maxMessagesInFlight = 1;
-
-    AsyncRunLoop runLoop = new AsyncRunLoop(tasks, executor, consumerMultiplexer, maxMessagesInFlight, windowMs, commitMs,
-                                            callbackTimeoutMs, maxThrottlingDelayMs, maxIdleMs, containerMetrics,
-                                            () -> 0L, false);
-
-    when(consumerMultiplexer.choose(false)).thenReturn(envelope0)
-        .thenReturn(envelope1)
-        .thenReturn(null)
-        .thenReturn(ssp0EndOfStream)
-        .thenReturn(ssp1EndOfStream)
-        .thenReturn(null);
+    AsyncRunLoop runLoop = new AsyncRunLoop(tasks, executor, consumerMultiplexer, maxMessagesInFlight , windowMs, commitMs,
+                                            callbackTimeoutMs, maxThrottlingDelayMs, maxIdleMs, containerMetrics, () -> 0L, false);
+    when(consumerMultiplexer.choose(false)).thenReturn(envelope0).thenReturn(envelope1).thenReturn(null).thenReturn(ssp0EndOfStream).thenReturn(ssp1EndOfStream).thenReturn(null);
 
     runLoop.run();
 
     task0ProcessedMessagesLatch.await();
     task1ProcessedMessagesLatch.await();
 
-    verify(offsetManager).buildCheckpoint(taskName0);
-    verify(offsetManager).writeCheckpoint(taskName0, any());
-    verify(offsetManager).buildCheckpoint(taskName1);
-    verify(offsetManager).writeCheckpoint(taskName1, any());
+    verify(offsetManager).buildCheckpoint(eq(taskName0));
+    verify(offsetManager).writeCheckpoint(eq(taskName0), any(Checkpoint.class));
+    verify(offsetManager).buildCheckpoint(eq(taskName1));
+    verify(offsetManager).writeCheckpoint(eq(taskName1), any(Checkpoint.class));
   }
 
-  // TODO: Add assertions.
-  //@Test
+  @Test
   public void testEndOfStreamOffsetManagement() throws Exception {
     //explicitly configure to disable commits inside process or window calls and invoke commit from end of stream
     TestTask mockStreamTask1 = new TestTask(true, false, false, null);
@@ -644,9 +618,6 @@ public class TestAsyncRunLoop {
 
     TaskName taskName1 = new TaskName("task1");
     TaskName taskName2 = new TaskName("task2");
-    Set<TaskName> taskNames = new HashSet<>();
-    taskNames.add(taskName1);
-    taskNames.add(taskName2);
 
     OffsetManager offsetManager = mock(OffsetManager.class);
 
@@ -667,20 +638,22 @@ public class TestAsyncRunLoop {
 
     int maxMessagesInFlight = 1;
     AsyncRunLoop runLoop = new AsyncRunLoop(tasks, executor, consumers, maxMessagesInFlight, windowMs, commitMs,
-                                            callbackTimeoutMs, maxThrottlingDelayMs, maxIdleMs, containerMetrics,
-                                            () -> 0L, false);
+                                            callbackTimeoutMs, maxThrottlingDelayMs, maxIdleMs, containerMetrics, () -> 0L, false);
 
     runLoop.run();
   }
 
   //@Test
   public void testCommitBehaviourWhenAsyncCommitIsEnabled() throws InterruptedException {
+    SystemConsumers consumerMultiplexer = mock(SystemConsumers.class);
+    when(consumerMultiplexer.pollIntervalMs()).thenReturn(10);
+    OffsetManager offsetManager = mock(OffsetManager.class);
+
     int maxMessagesInFlight = 3;
-    TestTask task0 = new TestTask(true, true, false, null,
-                                  maxMessagesInFlight);
+    TestTask task0 = new TestTask(true, true, false, null, maxMessagesInFlight);
     task0.setCommitRequest(TaskCoordinator.RequestScope.CURRENT_TASK);
-    TestTask task1 = new TestTask(true, false, true, null);
-    task1.setCommitRequest(TaskCoordinator.RequestScope.CURRENT_TASK);
+    TestTask task1 = new TestTask(true, false, false, null, maxMessagesInFlight);
+
     IncomingMessageEnvelope firstMsg = new IncomingMessageEnvelope(ssp0, "0", "key0", "value0");
     IncomingMessageEnvelope secondMsg = new IncomingMessageEnvelope(ssp0, "1", "key1", "value1");
     IncomingMessageEnvelope thirdMsg = new IncomingMessageEnvelope(ssp0, "2", "key0", "value0");
@@ -698,9 +671,7 @@ public class TestAsyncRunLoop {
         } else if (envelope.equals(thirdMsg)) {
           secondMsgCompletionLatch.countDown();
           // OffsetManager.update was called with firstMsg's offset, i.e. task.commit happened while the second message's callback had not yet completed.
-          verify(offsetManager).update(taskName0, firstMsg.getSystemStreamPartition(), firstMsg.getOffset());
-          verify(offsetManager, atLeastOnce()).buildCheckpoint(taskName0);
-          verify(offsetManager, atLeastOnce()).writeCheckpoint(taskName0, any());
+          verify(offsetManager).update(eq(taskName0), eq(firstMsg.getSystemStreamPartition()), eq(firstMsg.getOffset()));
         }
       } catch (Exception e) {
         e.printStackTrace();
@@ -709,34 +680,35 @@ public class TestAsyncRunLoop {
 
     Map<TaskName, TaskInstance> tasks = new HashMap<>();
 
-    tasks.put(taskName0, createTaskInstance(task0, taskName0, ssp0));
-    tasks.put(taskName1, createTaskInstance(task1, taskName1, ssp1));
-    when(consumerMultiplexer.choose(false)).thenReturn(firstMsg)
-                                           .thenReturn(secondMsg)
-                                           .thenReturn(thirdMsg)
-                                           .thenReturn(envelope1)
-                                           .thenReturn(null);
+    tasks.put(taskName0, createTaskInstance(task0, taskName0, ssp0, offsetManager, consumerMultiplexer));
+    tasks.put(taskName1, createTaskInstance(task1, taskName1, ssp1, offsetManager, consumerMultiplexer));
+    when(consumerMultiplexer.choose(false)).thenReturn(firstMsg).thenReturn(secondMsg).thenReturn(thirdMsg).thenReturn(envelope1).thenReturn(ssp0EndOfStream).thenReturn(ssp1EndOfStream).thenReturn(null);
 
     AsyncRunLoop runLoop = new AsyncRunLoop(tasks, executor, consumerMultiplexer, maxMessagesInFlight, windowMs, commitMs,
-                                            callbackTimeoutMs, maxThrottlingDelayMs, maxIdleMs, containerMetrics,
-                                            () -> 0L, false);
+                                            callbackTimeoutMs, maxThrottlingDelayMs, maxIdleMs, containerMetrics, () -> 0L, false);
 
     runLoop.run();
 
     firstMsgCompletionLatch.await();
     secondMsgCompletionLatch.await();
 
+    verify(offsetManager, atLeastOnce()).buildCheckpoint(eq(taskName0));
+    verify(offsetManager, atLeastOnce()).writeCheckpoint(eq(taskName0), any(Checkpoint.class));
     assertEquals(3, task0.processed);
     assertEquals(3, task0.committed);
     assertEquals(1, task1.processed);
     assertEquals(0, task1.committed);
   }
 
-  //@Test
+  @Test
   public void testProcessBehaviourWhenAsyncCommitIsEnabled() throws InterruptedException {
     int maxMessagesInFlight = 2;
+
+    SystemConsumers consumerMultiplexer = mock(SystemConsumers.class);
+    when(consumerMultiplexer.pollIntervalMs()).thenReturn(10);
+    OffsetManager offsetManager = mock(OffsetManager.class);
+
     TestTask task0 = new TestTask(true, true, false, null, maxMessagesInFlight);
-    TestTask task1 = new TestTask(true, false, true, null);
     CountDownLatch commitLatch = new CountDownLatch(1);
     task0.commitHandler = callback -> {
       TaskCallbackImpl taskCallback = (TaskCallbackImpl) callback;
@@ -761,12 +733,8 @@ public class TestAsyncRunLoop {
 
     Map<TaskName, TaskInstance> tasks = new HashMap<>();
 
-    tasks.put(taskName0, createTaskInstance(task0, taskName0, ssp0));
-    tasks.put(taskName1, createTaskInstance(task1, taskName1, ssp1));
-    when(consumerMultiplexer.choose(false)).thenReturn(envelope3)
-                                           .thenReturn(envelope0)
-                                           .thenReturn(envelope1)
-                                           .thenReturn(null);
+    tasks.put(taskName0, createTaskInstance(task0, taskName0, ssp0, offsetManager, consumerMultiplexer));
+    when(consumerMultiplexer.choose(false)).thenReturn(envelope3).thenReturn(envelope0).thenReturn(ssp0EndOfStream).thenReturn(null);
     AsyncRunLoop runLoop = new AsyncRunLoop(tasks, executor, consumerMultiplexer, maxMessagesInFlight, windowMs, commitMs,
                                             callbackTimeoutMs, maxThrottlingDelayMs, maxIdleMs, containerMetrics,
                                             () -> 0L, true);