You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2018/03/15 20:44:08 UTC

[kafka] branch 1.1 updated: KAFKA-6106: Postpone normal processing of tasks within a thread until restoration of all tasks has completed. (#4651)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.1 by this push:
     new 4cb883e  KAFKA-6106: Postpone normal processing of tasks within a thread until restoration of all tasks has completed. (#4651)
4cb883e is described below

commit 4cb883ea21d511df2c0bf3a2755b85cd6f4aeb26
Author: Kamal C <ka...@gmail.com>
AuthorDate: Fri Mar 16 01:32:28 2018 +0530

    KAFKA-6106: Postpone normal processing of tasks within a thread until restoration of all tasks has completed. (#4651)
    
    Author:  Kamal Chandraprakash <ka...@gmail.com>
    
    Reviewer: Matthias J. Sax <ma...@confluent.io>, Bill Bejeck <bi...@confluent.io>
---
 .../streams/processor/internals/AssignedTasks.java | 35 ++++------------
 .../streams/processor/internals/StreamThread.java  |  4 +-
 .../streams/processor/internals/TaskManager.java   | 17 ++++----
 .../internals/AssignedStreamsTasksTest.java        | 45 +++-----------------
 .../processor/internals/StreamThreadTest.java      | 40 +++++++++++++++---
 .../processor/internals/TaskManagerTest.java       | 48 ++++++++--------------
 6 files changed, 73 insertions(+), 116 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
index 029f745..da402bc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
@@ -73,27 +73,12 @@ abstract class AssignedTasks<T extends Task> {
         created.put(task.id(), task);
     }
 
-    Set<TopicPartition> uninitializedPartitions() {
-        if (created.isEmpty()) {
-            return Collections.emptySet();
-        }
-        final Set<TopicPartition> partitions = new HashSet<>();
-        for (final Map.Entry<TaskId, T> entry : created.entrySet()) {
-            if (entry.getValue().hasStateStores()) {
-                partitions.addAll(entry.getValue().partitions());
-            }
-        }
-        return partitions;
-    }
-
     /**
-     * @return partitions that are ready to be resumed
      * @throws IllegalStateException If store gets registered after initialized is already finished
      * @throws StreamsException if the store's change log does not contain the partition
      * @throws TaskMigratedException if the task producer got fenced (EOS only)
      */
-    Set<TopicPartition> initializeNewTasks() {
-        final Set<TopicPartition> readyPartitions = new HashSet<>();
+    void initializeNewTasks() {
         if (!created.isEmpty()) {
             log.debug("Initializing {}s {}", taskTypeName, created.keySet());
         }
@@ -104,7 +89,7 @@ abstract class AssignedTasks<T extends Task> {
                     log.debug("Transitioning {} {} to restoring", taskTypeName, entry.getKey());
                     addToRestoring(entry.getValue());
                 } else {
-                    transitionToRunning(entry.getValue(), readyPartitions);
+                    transitionToRunning(entry.getValue());
                 }
                 it.remove();
             } catch (final LockException e) {
@@ -112,21 +97,19 @@ abstract class AssignedTasks<T extends Task> {
                 log.trace("Could not create {} {} due to {}; will retry", taskTypeName, entry.getKey(), e.getMessage());
             }
         }
-        return readyPartitions;
     }
 
-    Set<TopicPartition> updateRestored(final Collection<TopicPartition> restored) {
+    void updateRestored(final Collection<TopicPartition> restored) {
         if (restored.isEmpty()) {
-            return Collections.emptySet();
+            return;
         }
         log.trace("{} changelog partitions that have completed restoring so far: {}", taskTypeName, restored);
-        final Set<TopicPartition> resume = new HashSet<>();
         restoredPartitions.addAll(restored);
         for (final Iterator<Map.Entry<TaskId, T>> it = restoring.entrySet().iterator(); it.hasNext(); ) {
             final Map.Entry<TaskId, T> entry = it.next();
             final T task = entry.getValue();
             if (restoredPartitions.containsAll(task.changelogPartitions())) {
-                transitionToRunning(task, resume);
+                transitionToRunning(task);
                 it.remove();
                 log.trace("{} {} completed restoration as all its changelog partitions {} have been applied to restore state",
                         taskTypeName,
@@ -146,7 +129,6 @@ abstract class AssignedTasks<T extends Task> {
         if (allTasksRunning()) {
             restoredPartitions.clear();
         }
-        return resume;
     }
 
     boolean allTasksRunning() {
@@ -242,7 +224,7 @@ abstract class AssignedTasks<T extends Task> {
                 suspended.remove(taskId);
                 task.resume();
                 try {
-                    transitionToRunning(task, new HashSet<TopicPartition>());
+                    transitionToRunning(task);
                 } catch (final TaskMigratedException e) {
                     // we need to catch migration exception internally since this function
                     // is triggered in the rebalance callback
@@ -278,15 +260,12 @@ abstract class AssignedTasks<T extends Task> {
     /**
      * @throws TaskMigratedException if the task producer got fenced (EOS only)
      */
-    private void transitionToRunning(final T task, final Set<TopicPartition> readyPartitions) {
+    private void transitionToRunning(final T task) {
         log.debug("transitioning {} {} to running", taskTypeName, task.id());
         running.put(task.id(), task);
         task.initializeTopology();
         for (TopicPartition topicPartition : task.partitions()) {
             runningByPartition.put(topicPartition, task);
-            if (task.hasStateStores()) {
-                readyPartitions.add(topicPartition);
-            }
         }
         for (TopicPartition topicPartition : task.changelogPartitions()) {
             runningByPartition.put(topicPartition, task);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 2937fc7..fb9b8e0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -391,7 +391,7 @@ public class StreamThread extends Thread {
                   taskCreatedSensor,
                   storeChangelogReader,
                   time,
-                    log);
+                  log);
             this.cache = cache;
             this.clientSupplier = clientSupplier;
             this.threadProducer = threadProducer;
@@ -457,7 +457,7 @@ public class StreamThread extends Thread {
                   taskCreatedSensor,
                   storeChangelogReader,
                   time,
-                    log);
+                  log);
         }
 
         @Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 72d7679..66c1171 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -106,9 +106,9 @@ class TaskManager {
         active.closeNonAssignedSuspendedTasks(assignedActiveTasks);
         addStreamTasks(assignment);
         addStandbyTasks();
-        final Set<TopicPartition> partitions = active.uninitializedPartitions();
-        log.trace("Pausing partitions: {}", partitions);
-        consumer.pause(partitions);
+        // Pause all the partitions until the underlying state store is ready for all the active tasks.
+        log.trace("Pausing partitions: {}", assignment);
+        consumer.pause(assignment);
     }
 
     private void addStreamTasks(final Collection<TopicPartition> assignment) {
@@ -313,18 +313,17 @@ class TaskManager {
      * @throws TaskMigratedException if the task producer got fenced or consumer discovered changelog offset changes (EOS only)
      */
     boolean updateNewAndRestoringTasks() {
-        final Set<TopicPartition> resumed = active.initializeNewTasks();
+        active.initializeNewTasks();
         standby.initializeNewTasks();
 
         final Collection<TopicPartition> restored = changelogReader.restore(active);
 
-        resumed.addAll(active.updateRestored(restored));
+        active.updateRestored(restored);
 
-        if (!resumed.isEmpty()) {
-            log.trace("Resuming partitions {}", resumed);
-            consumer.resume(resumed);
-        }
         if (active.allTasksRunning()) {
+            Set<TopicPartition> assignment = consumer.assignment();
+            log.trace("Resuming partitions {}", assignment);
+            consumer.resume(assignment);
             assignStandbyPartitions();
             return true;
         }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
index 246d047..fcd2322 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
@@ -58,36 +58,6 @@ public class AssignedStreamsTasksTest {
     }
 
     @Test
-    public void shouldGetPartitionsFromNewTasksThatHaveStateStores() {
-        EasyMock.expect(t1.hasStateStores()).andReturn(true);
-        EasyMock.expect(t2.hasStateStores()).andReturn(true);
-        EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1));
-        EasyMock.expect(t2.partitions()).andReturn(Collections.singleton(tp2));
-        EasyMock.replay(t1, t2);
-
-        assignedTasks.addNewTask(t1);
-        assignedTasks.addNewTask(t2);
-
-        final Set<TopicPartition> partitions = assignedTasks.uninitializedPartitions();
-        assertThat(partitions, equalTo(Utils.mkSet(tp1, tp2)));
-        EasyMock.verify(t1, t2);
-    }
-
-    @Test
-    public void shouldNotGetPartitionsFromNewTasksWithoutStateStores() {
-        EasyMock.expect(t1.hasStateStores()).andReturn(false);
-        EasyMock.expect(t2.hasStateStores()).andReturn(false);
-        EasyMock.replay(t1, t2);
-
-        assignedTasks.addNewTask(t1);
-        assignedTasks.addNewTask(t2);
-
-        final Set<TopicPartition> partitions = assignedTasks.uninitializedPartitions();
-        assertTrue(partitions.isEmpty());
-        EasyMock.verify(t1, t2);
-    }
-
-    @Test
     public void shouldInitializeNewTasks() {
         EasyMock.expect(t1.initializeStateStores()).andReturn(false);
         EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1));
@@ -112,19 +82,17 @@ public class AssignedStreamsTasksTest {
         final Set<TopicPartition> t2partitions = Collections.singleton(tp2);
         EasyMock.expect(t2.partitions()).andReturn(t2partitions);
         EasyMock.expect(t2.changelogPartitions()).andReturn(Collections.<TopicPartition>emptyList());
-        EasyMock.expect(t2.hasStateStores()).andReturn(true);
 
         EasyMock.replay(t1, t2);
 
         assignedTasks.addNewTask(t1);
         assignedTasks.addNewTask(t2);
 
-        final Set<TopicPartition> readyPartitions = assignedTasks.initializeNewTasks();
+        assignedTasks.initializeNewTasks();
 
         Collection<StreamTask> restoring = assignedTasks.restoringTasks();
         assertThat(restoring.size(), equalTo(1));
         assertSame(restoring.iterator().next(), t1);
-        assertThat(readyPartitions, equalTo(t2partitions));
     }
 
     @Test
@@ -134,15 +102,13 @@ public class AssignedStreamsTasksTest {
         EasyMock.expectLastCall().once();
         EasyMock.expect(t2.partitions()).andReturn(Collections.singleton(tp2));
         EasyMock.expect(t2.changelogPartitions()).andReturn(Collections.<TopicPartition>emptyList());
-        EasyMock.expect(t2.hasStateStores()).andReturn(false);
 
         EasyMock.replay(t2);
 
         assignedTasks.addNewTask(t2);
-        final Set<TopicPartition> toResume = assignedTasks.initializeNewTasks();
+        assignedTasks.initializeNewTasks();
 
         assertThat(assignedTasks.runningTaskIds(), equalTo(Collections.singleton(taskId2)));
-        assertThat(toResume, equalTo(Collections.<TopicPartition>emptySet()));
     }
 
     @Test
@@ -158,9 +124,9 @@ public class AssignedStreamsTasksTest {
 
         addAndInitTask();
 
-        assertTrue(assignedTasks.updateRestored(Utils.mkSet(changeLog1)).isEmpty());
-        Set<TopicPartition> partitions = assignedTasks.updateRestored(Utils.mkSet(changeLog2));
-        assertThat(partitions, equalTo(task1Partitions));
+        assignedTasks.updateRestored(Utils.mkSet(changeLog1));
+        assertThat(assignedTasks.runningTaskIds(), equalTo(Collections.<TaskId>emptySet()));
+        assignedTasks.updateRestored(Utils.mkSet(changeLog2));
         assertThat(assignedTasks.runningTaskIds(), equalTo(Collections.singleton(taskId1)));
     }
 
@@ -282,7 +248,6 @@ public class AssignedStreamsTasksTest {
         EasyMock.expectLastCall().once();
         EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1));
         EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.<TopicPartition>emptyList());
-        EasyMock.expect(t1.hasStateStores()).andReturn(false);
     }
 
     @Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 6529be7..17159d9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -148,6 +148,10 @@ public class StreamThreadTest {
         // assign single partition
         assignedPartitions = Collections.singletonList(t1p1);
         thread.taskManager().setAssignmentMetadata(Collections.<TaskId, Set<TopicPartition>>emptyMap(), Collections.<TaskId, Set<TopicPartition>>emptyMap());
+
+        final MockConsumer<byte[], byte[]> mockConsumer = (MockConsumer<byte[], byte[]>) thread.consumer;
+        mockConsumer.assign(assignedPartitions);
+        mockConsumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L));
         rebalanceListener.onPartitionsAssigned(assignedPartitions);
         thread.runOnce(-1);
         assertEquals(thread.state(), StreamThread.State.RUNNING);
@@ -368,8 +372,13 @@ public class StreamThreadTest {
         activeTasks.put(task2, Collections.singleton(t1p2));
 
         thread.taskManager().setAssignmentMetadata(activeTasks, Collections.<TaskId, Set<TopicPartition>>emptyMap());
-        thread.taskManager().createTasks(assignedPartitions);
 
+        final MockConsumer<byte[], byte[]> mockConsumer = (MockConsumer<byte[], byte[]>) thread.consumer;
+        mockConsumer.assign(assignedPartitions);
+        Map<TopicPartition, Long> beginOffsets = new HashMap<>();
+        beginOffsets.put(t1p1, 0L);
+        beginOffsets.put(t1p2, 0L);
+        mockConsumer.updateBeginningOffsets(beginOffsets);
         thread.rebalanceListener.onPartitionsAssigned(new HashSet<>(assignedPartitions));
 
         assertEquals(1, clientSupplier.producers.size());
@@ -401,6 +410,12 @@ public class StreamThreadTest {
 
         thread.taskManager().setAssignmentMetadata(activeTasks, Collections.<TaskId, Set<TopicPartition>>emptyMap());
 
+        final MockConsumer<byte[], byte[]> mockConsumer = (MockConsumer<byte[], byte[]>) thread.consumer;
+        mockConsumer.assign(assignedPartitions);
+        Map<TopicPartition, Long> beginOffsets = new HashMap<>();
+        beginOffsets.put(t1p1, 0L);
+        beginOffsets.put(t1p2, 0L);
+        mockConsumer.updateBeginningOffsets(beginOffsets);
         thread.rebalanceListener.onPartitionsAssigned(new HashSet<>(assignedPartitions));
 
         thread.runOnce(-1);
@@ -429,7 +444,12 @@ public class StreamThreadTest {
         activeTasks.put(task2, Collections.singleton(t1p2));
 
         thread.taskManager().setAssignmentMetadata(activeTasks, Collections.<TaskId, Set<TopicPartition>>emptyMap());
-        thread.taskManager().createTasks(assignedPartitions);
+        final MockConsumer<byte[], byte[]> mockConsumer = (MockConsumer<byte[], byte[]>) thread.consumer;
+        mockConsumer.assign(assignedPartitions);
+        Map<TopicPartition, Long> beginOffsets = new HashMap<>();
+        beginOffsets.put(t1p1, 0L);
+        beginOffsets.put(t1p2, 0L);
+        mockConsumer.updateBeginningOffsets(beginOffsets);
 
         thread.rebalanceListener.onPartitionsAssigned(assignedPartitions);
 
@@ -585,6 +605,9 @@ public class StreamThreadTest {
 
         thread.taskManager().setAssignmentMetadata(activeTasks, Collections.<TaskId, Set<TopicPartition>>emptyMap());
 
+        final MockConsumer<byte[], byte[]> mockConsumer = (MockConsumer<byte[], byte[]>) thread.consumer;
+        mockConsumer.assign(assignedPartitions);
+        mockConsumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L));
         thread.rebalanceListener.onPartitionsAssigned(assignedPartitions);
 
         thread.runOnce(-1);
@@ -649,6 +672,10 @@ public class StreamThreadTest {
         activeTasks.put(task1, Collections.singleton(t1p1));
 
         thread.taskManager().setAssignmentMetadata(activeTasks, Collections.<TaskId, Set<TopicPartition>>emptyMap());
+
+        final MockConsumer<byte[], byte[]> mockConsumer = (MockConsumer<byte[], byte[]>) thread.consumer;
+        mockConsumer.assign(assignedPartitions);
+        mockConsumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L));
         thread.rebalanceListener.onPartitionsAssigned(assignedPartitions);
 
         thread.runOnce(-1);
@@ -708,8 +735,10 @@ public class StreamThreadTest {
         activeTasks.put(task1, Collections.singleton(t1p1));
 
         thread.taskManager().setAssignmentMetadata(activeTasks, Collections.<TaskId, Set<TopicPartition>>emptyMap());
-        thread.taskManager().createTasks(assignedPartitions);
 
+        final MockConsumer<byte[], byte[]> mockConsumer = (MockConsumer<byte[], byte[]>) thread.consumer;
+        mockConsumer.assign(assignedPartitions);
+        mockConsumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L));
         thread.rebalanceListener.onPartitionsAssigned(assignedPartitions);
 
         thread.runOnce(-1);
@@ -923,17 +952,18 @@ public class StreamThreadTest {
         thread.setState(StreamThread.State.RUNNING);
         thread.setState(StreamThread.State.PARTITIONS_REVOKED);
 
-        final Set<TopicPartition> assignedPartitions = Collections.singleton(new TopicPartition(t1p1.topic(), t1p1.partition()));
+        final Set<TopicPartition> assignedPartitions = Collections.singleton(t1p1);
         thread.taskManager().setAssignmentMetadata(
             Collections.singletonMap(
                 new TaskId(0, t1p1.partition()),
                 assignedPartitions),
             Collections.<TaskId, Set<TopicPartition>>emptyMap());
-        thread.rebalanceListener.onPartitionsAssigned(assignedPartitions);
 
         final MockConsumer<byte[], byte[]> mockConsumer = (MockConsumer<byte[], byte[]>) thread.consumer;
         mockConsumer.assign(Collections.singleton(t1p1));
         mockConsumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L));
+        thread.rebalanceListener.onPartitionsAssigned(assignedPartitions);
+        thread.runOnce(-1);
 
         final MetricName skippedTotalMetric = metrics.metricName("skipped-records-total", "stream-metrics", Collections.singletonMap("client-id", thread.getName()));
         assertEquals(0.0, metrics.metric(skippedTotalMetric).metricValue());
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index 648e9b0..e8ef7ea 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -117,7 +117,7 @@ public class TaskManagerTest {
     public final TemporaryFolder testFolder = new TemporaryFolder();
 
     @Before
-    public void setUp() throws Exception {
+    public void setUp() {
         taskManager = new TaskManager(changeLogReader,
                                       UUID.randomUUID(),
                                       "",
@@ -324,11 +324,9 @@ public class TaskManagerTest {
         verify(standby, standbyTaskCreator);
     }
 
-
     @Test
-    public void shouldPauseActiveUninitializedPartitions() {
+    public void shouldPauseActivePartitions() {
         mockSingleActiveTask();
-        EasyMock.expect(active.uninitializedPartitions()).andReturn(taskId0Partitions);
         consumer.pause(taskId0Partitions);
         EasyMock.expectLastCall();
         replay();
@@ -415,21 +413,17 @@ public class TaskManagerTest {
 
     @Test
     public void shouldInitializeNewActiveTasks() {
-        EasyMock.expect(active.initializeNewTasks()).andReturn(new HashSet<TopicPartition>());
-        EasyMock.expect(active.updateRestored(EasyMock.<Collection<TopicPartition>>anyObject())).
-                andReturn(Collections.<TopicPartition>emptySet());
+        active.updateRestored(EasyMock.<Collection<TopicPartition>>anyObject());
         EasyMock.expectLastCall();
         replay();
+
         taskManager.updateNewAndRestoringTasks();
         verify(active);
     }
 
     @Test
     public void shouldInitializeNewStandbyTasks() {
-        EasyMock.expect(standby.initializeNewTasks()).andReturn(new HashSet<TopicPartition>());
-        EasyMock.expect(active.initializeNewTasks()).andReturn(new HashSet<TopicPartition>());
-        EasyMock.expect(active.updateRestored(EasyMock.<Collection<TopicPartition>>anyObject())).
-                andReturn(Collections.<TopicPartition>emptySet());
+        active.updateRestored(EasyMock.<Collection<TopicPartition>>anyObject());
         EasyMock.expectLastCall();
         replay();
 
@@ -439,22 +433,21 @@ public class TaskManagerTest {
 
     @Test
     public void shouldRestoreStateFromChangeLogReader() {
-        EasyMock.expect(active.initializeNewTasks()).andReturn(new HashSet<TopicPartition>());
         EasyMock.expect(changeLogReader.restore(active)).andReturn(taskId0Partitions);
-        EasyMock.expect(active.updateRestored(taskId0Partitions)).
-                andReturn(Collections.<TopicPartition>emptySet());
-
+        active.updateRestored(taskId0Partitions);
+        EasyMock.expectLastCall();
         replay();
+
         taskManager.updateNewAndRestoringTasks();
         verify(changeLogReader, active);
     }
 
     @Test
     public void shouldResumeRestoredPartitions() {
-        EasyMock.expect(active.initializeNewTasks()).andReturn(new HashSet<TopicPartition>());
         EasyMock.expect(changeLogReader.restore(active)).andReturn(taskId0Partitions);
-        EasyMock.expect(active.updateRestored(taskId0Partitions)).
-                andReturn(taskId0Partitions);
+        EasyMock.expect(active.allTasksRunning()).andReturn(true);
+        EasyMock.expect(consumer.assignment()).andReturn(taskId0Partitions);
+        EasyMock.expect(standby.running()).andReturn(Collections.<StandbyTask>emptySet());
 
         consumer.resume(taskId0Partitions);
         EasyMock.expectLastCall();
@@ -475,10 +468,7 @@ public class TaskManagerTest {
 
     @Test
     public void shouldReturnFalseWhenThereAreStillNonRunningTasks() {
-        EasyMock.expect(active.initializeNewTasks()).andReturn(new HashSet<TopicPartition>());
         EasyMock.expect(active.allTasksRunning()).andReturn(false);
-        EasyMock.expect(active.updateRestored(EasyMock.<Collection<TopicPartition>>anyObject())).
-                andReturn(Collections.<TopicPartition>emptySet());
         replay();
 
         assertFalse(taskManager.updateNewAndRestoringTasks());
@@ -626,16 +616,13 @@ public class TaskManagerTest {
     }
 
     @Test
-    public void shouldResumeConsumptionOfInitializedPartitions() {
-        final Set<TopicPartition> resumed = Collections.singleton(new TopicPartition("topic", 0));
-        EasyMock.expect(active.initializeNewTasks()).andReturn(resumed);
-        EasyMock.expect(active.updateRestored(EasyMock.<Collection<TopicPartition>>anyObject())).
-                andReturn(Collections.<TopicPartition>emptySet());
-        consumer.resume(resumed);
-        EasyMock.expectLastCall();
-
+    public void shouldNotResumeConsumptionUntilAllStoresRestored() {
+        EasyMock.expect(active.allTasksRunning()).andReturn(false);
+        Consumer<byte[], byte[]> consumer = (Consumer<byte[], byte[]>) EasyMock.createStrictMock(Consumer.class);
+        taskManager.setConsumer(consumer);
         EasyMock.replay(active, consumer);
 
+        // shouldn't invoke `resume` method in consumer
         taskManager.updateNewAndRestoringTasks();
         EasyMock.verify(consumer);
     }
@@ -662,10 +649,7 @@ public class TaskManagerTest {
 
     private void mockAssignStandbyPartitions(final long offset) {
         final StandbyTask task = EasyMock.createNiceMock(StandbyTask.class);
-        EasyMock.expect(active.initializeNewTasks()).andReturn(new HashSet<TopicPartition>());
         EasyMock.expect(active.allTasksRunning()).andReturn(true);
-        EasyMock.expect(active.updateRestored(EasyMock.<Collection<TopicPartition>>anyObject())).
-                andReturn(Collections.<TopicPartition>emptySet());
         EasyMock.expect(standby.running()).andReturn(Collections.singletonList(task));
         EasyMock.expect(task.checkpointedOffsets()).andReturn(Collections.singletonMap(t1p0, offset));
         restoreConsumer.assign(taskId0Partitions);

-- 
To stop receiving notification emails like this one, please contact
mjsax@apache.org.