You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2018/03/15 20:02:31 UTC
[kafka] branch trunk updated: KAFKA-6106: Postpone normal
processing of tasks within a thread until restoration of all tasks have
completed. (#4651)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new a6fad27 KAFKA-6106: Postpone normal processing of tasks within a thread until restoration of all tasks have completed. (#4651)
a6fad27 is described below
commit a6fad27372f9a931540c2e4345e428b643535d46
Author: Kamal C <ka...@gmail.com>
AuthorDate: Fri Mar 16 01:32:28 2018 +0530
KAFKA-6106: Postpone normal processing of tasks within a thread until restoration of all tasks have completed. (#4651)
Author: Kamal Chandraprakash <ka...@gmail.com>
Reviewer: Matthias J. Sax <ma...@confluent.io>, Bill Bejeck <bi...@confluent.io>
---
.../streams/processor/internals/AssignedTasks.java | 35 ++++------------
.../streams/processor/internals/StreamThread.java | 4 +-
.../streams/processor/internals/TaskManager.java | 17 ++++----
.../internals/AssignedStreamsTasksTest.java | 45 +++-----------------
.../processor/internals/StreamThreadTest.java | 42 ++++++++++++++++---
.../processor/internals/TaskManagerTest.java | 48 ++++++++--------------
6 files changed, 74 insertions(+), 117 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
index c806bfd..9204571 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
@@ -73,27 +73,12 @@ abstract class AssignedTasks<T extends Task> {
created.put(task.id(), task);
}
- Set<TopicPartition> uninitializedPartitions() {
- if (created.isEmpty()) {
- return Collections.emptySet();
- }
- final Set<TopicPartition> partitions = new HashSet<>();
- for (final Map.Entry<TaskId, T> entry : created.entrySet()) {
- if (entry.getValue().hasStateStores()) {
- partitions.addAll(entry.getValue().partitions());
- }
- }
- return partitions;
- }
-
/**
- * @return partitions that are ready to be resumed
* @throws IllegalStateException If store gets registered after initialized is already finished
* @throws StreamsException if the store's change log does not contain the partition
* @throws TaskMigratedException if the task producer got fenced (EOS only)
*/
- Set<TopicPartition> initializeNewTasks() {
- final Set<TopicPartition> readyPartitions = new HashSet<>();
+ void initializeNewTasks() {
if (!created.isEmpty()) {
log.debug("Initializing {}s {}", taskTypeName, created.keySet());
}
@@ -104,7 +89,7 @@ abstract class AssignedTasks<T extends Task> {
log.debug("Transitioning {} {} to restoring", taskTypeName, entry.getKey());
addToRestoring(entry.getValue());
} else {
- transitionToRunning(entry.getValue(), readyPartitions);
+ transitionToRunning(entry.getValue());
}
it.remove();
} catch (final LockException e) {
@@ -112,21 +97,19 @@ abstract class AssignedTasks<T extends Task> {
log.trace("Could not create {} {} due to {}; will retry", taskTypeName, entry.getKey(), e.getMessage());
}
}
- return readyPartitions;
}
- Set<TopicPartition> updateRestored(final Collection<TopicPartition> restored) {
+ void updateRestored(final Collection<TopicPartition> restored) {
if (restored.isEmpty()) {
- return Collections.emptySet();
+ return;
}
log.trace("{} changelog partitions that have completed restoring so far: {}", taskTypeName, restored);
- final Set<TopicPartition> resume = new HashSet<>();
restoredPartitions.addAll(restored);
for (final Iterator<Map.Entry<TaskId, T>> it = restoring.entrySet().iterator(); it.hasNext(); ) {
final Map.Entry<TaskId, T> entry = it.next();
final T task = entry.getValue();
if (restoredPartitions.containsAll(task.changelogPartitions())) {
- transitionToRunning(task, resume);
+ transitionToRunning(task);
it.remove();
log.trace("{} {} completed restoration as all its changelog partitions {} have been applied to restore state",
taskTypeName,
@@ -146,7 +129,6 @@ abstract class AssignedTasks<T extends Task> {
if (allTasksRunning()) {
restoredPartitions.clear();
}
- return resume;
}
boolean allTasksRunning() {
@@ -243,7 +225,7 @@ abstract class AssignedTasks<T extends Task> {
suspended.remove(taskId);
task.resume();
try {
- transitionToRunning(task, new HashSet<TopicPartition>());
+ transitionToRunning(task);
} catch (final TaskMigratedException e) {
// we need to catch migration exception internally since this function
// is triggered in the rebalance callback
@@ -278,15 +260,12 @@ abstract class AssignedTasks<T extends Task> {
/**
* @throws TaskMigratedException if the task producer got fenced (EOS only)
*/
- private void transitionToRunning(final T task, final Set<TopicPartition> readyPartitions) {
+ private void transitionToRunning(final T task) {
log.debug("transitioning {} {} to running", taskTypeName, task.id());
running.put(task.id(), task);
task.initializeTopology();
for (TopicPartition topicPartition : task.partitions()) {
runningByPartition.put(topicPartition, task);
- if (task.hasStateStores()) {
- readyPartitions.add(topicPartition);
- }
}
for (TopicPartition topicPartition : task.changelogPartitions()) {
runningByPartition.put(topicPartition, task);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 9bbc0da..02a4bb9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -899,7 +899,7 @@ public class StreamThread extends Thread {
final StreamTask task = taskManager.activeTask(partition);
if (task.isClosed()) {
- log.warn("Stream task {} is already closed, probably because it got unexpectly migrated to another thread already. " +
+ log.warn("Stream task {} is already closed, probably because it got unexpectedly migrated to another thread already. " +
"Notifying the thread to trigger a new rebalance immediately.", task.id());
throw new TaskMigratedException(task);
}
@@ -1065,7 +1065,7 @@ public class StreamThread extends Thread {
}
if (task.isClosed()) {
- log.warn("Standby task {} is already closed, probably because it got unexpectly migrated to another thread already. " +
+ log.warn("Standby task {} is already closed, probably because it got unexpectedly migrated to another thread already. " +
"Notifying the thread to trigger a new rebalance immediately.", task.id());
throw new TaskMigratedException(task);
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 80df517..6308ca7 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -106,9 +106,9 @@ class TaskManager {
active.closeNonAssignedSuspendedTasks(assignedActiveTasks);
addStreamTasks(assignment);
addStandbyTasks();
- final Set<TopicPartition> partitions = active.uninitializedPartitions();
- log.trace("Pausing partitions: {}", partitions);
- consumer.pause(partitions);
+ // Pause all the partitions until the underlying state store is ready for all the active tasks.
+ log.trace("Pausing partitions: {}", assignment);
+ consumer.pause(assignment);
}
private void addStreamTasks(final Collection<TopicPartition> assignment) {
@@ -312,18 +312,17 @@ class TaskManager {
* @throws TaskMigratedException if the task producer got fenced or consumer discovered changelog offset changes (EOS only)
*/
boolean updateNewAndRestoringTasks() {
- final Set<TopicPartition> resumed = active.initializeNewTasks();
+ active.initializeNewTasks();
standby.initializeNewTasks();
final Collection<TopicPartition> restored = changelogReader.restore(active);
- resumed.addAll(active.updateRestored(restored));
+ active.updateRestored(restored);
- if (!resumed.isEmpty()) {
- log.trace("Resuming partitions {}", resumed);
- consumer.resume(resumed);
- }
if (active.allTasksRunning()) {
+ Set<TopicPartition> assignment = consumer.assignment();
+ log.trace("Resuming partitions {}", assignment);
+ consumer.resume(assignment);
assignStandbyPartitions();
return true;
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
index 246d047..fcd2322 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasksTest.java
@@ -58,36 +58,6 @@ public class AssignedStreamsTasksTest {
}
@Test
- public void shouldGetPartitionsFromNewTasksThatHaveStateStores() {
- EasyMock.expect(t1.hasStateStores()).andReturn(true);
- EasyMock.expect(t2.hasStateStores()).andReturn(true);
- EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1));
- EasyMock.expect(t2.partitions()).andReturn(Collections.singleton(tp2));
- EasyMock.replay(t1, t2);
-
- assignedTasks.addNewTask(t1);
- assignedTasks.addNewTask(t2);
-
- final Set<TopicPartition> partitions = assignedTasks.uninitializedPartitions();
- assertThat(partitions, equalTo(Utils.mkSet(tp1, tp2)));
- EasyMock.verify(t1, t2);
- }
-
- @Test
- public void shouldNotGetPartitionsFromNewTasksWithoutStateStores() {
- EasyMock.expect(t1.hasStateStores()).andReturn(false);
- EasyMock.expect(t2.hasStateStores()).andReturn(false);
- EasyMock.replay(t1, t2);
-
- assignedTasks.addNewTask(t1);
- assignedTasks.addNewTask(t2);
-
- final Set<TopicPartition> partitions = assignedTasks.uninitializedPartitions();
- assertTrue(partitions.isEmpty());
- EasyMock.verify(t1, t2);
- }
-
- @Test
public void shouldInitializeNewTasks() {
EasyMock.expect(t1.initializeStateStores()).andReturn(false);
EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1));
@@ -112,19 +82,17 @@ public class AssignedStreamsTasksTest {
final Set<TopicPartition> t2partitions = Collections.singleton(tp2);
EasyMock.expect(t2.partitions()).andReturn(t2partitions);
EasyMock.expect(t2.changelogPartitions()).andReturn(Collections.<TopicPartition>emptyList());
- EasyMock.expect(t2.hasStateStores()).andReturn(true);
EasyMock.replay(t1, t2);
assignedTasks.addNewTask(t1);
assignedTasks.addNewTask(t2);
- final Set<TopicPartition> readyPartitions = assignedTasks.initializeNewTasks();
+ assignedTasks.initializeNewTasks();
Collection<StreamTask> restoring = assignedTasks.restoringTasks();
assertThat(restoring.size(), equalTo(1));
assertSame(restoring.iterator().next(), t1);
- assertThat(readyPartitions, equalTo(t2partitions));
}
@Test
@@ -134,15 +102,13 @@ public class AssignedStreamsTasksTest {
EasyMock.expectLastCall().once();
EasyMock.expect(t2.partitions()).andReturn(Collections.singleton(tp2));
EasyMock.expect(t2.changelogPartitions()).andReturn(Collections.<TopicPartition>emptyList());
- EasyMock.expect(t2.hasStateStores()).andReturn(false);
EasyMock.replay(t2);
assignedTasks.addNewTask(t2);
- final Set<TopicPartition> toResume = assignedTasks.initializeNewTasks();
+ assignedTasks.initializeNewTasks();
assertThat(assignedTasks.runningTaskIds(), equalTo(Collections.singleton(taskId2)));
- assertThat(toResume, equalTo(Collections.<TopicPartition>emptySet()));
}
@Test
@@ -158,9 +124,9 @@ public class AssignedStreamsTasksTest {
addAndInitTask();
- assertTrue(assignedTasks.updateRestored(Utils.mkSet(changeLog1)).isEmpty());
- Set<TopicPartition> partitions = assignedTasks.updateRestored(Utils.mkSet(changeLog2));
- assertThat(partitions, equalTo(task1Partitions));
+ assignedTasks.updateRestored(Utils.mkSet(changeLog1));
+ assertThat(assignedTasks.runningTaskIds(), equalTo(Collections.<TaskId>emptySet()));
+ assignedTasks.updateRestored(Utils.mkSet(changeLog2));
assertThat(assignedTasks.runningTaskIds(), equalTo(Collections.singleton(taskId1)));
}
@@ -282,7 +248,6 @@ public class AssignedStreamsTasksTest {
EasyMock.expectLastCall().once();
EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1));
EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.<TopicPartition>emptyList());
- EasyMock.expect(t1.hasStateStores()).andReturn(false);
}
@Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 482b764..b22d98e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -158,6 +158,10 @@ public class StreamThreadTest {
// assign single partition
assignedPartitions = Collections.singletonList(t1p1);
thread.taskManager().setAssignmentMetadata(Collections.<TaskId, Set<TopicPartition>>emptyMap(), Collections.<TaskId, Set<TopicPartition>>emptyMap());
+
+ final MockConsumer<byte[], byte[]> mockConsumer = (MockConsumer<byte[], byte[]>) thread.consumer;
+ mockConsumer.assign(assignedPartitions);
+ mockConsumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L));
rebalanceListener.onPartitionsAssigned(assignedPartitions);
thread.runOnce(-1);
assertEquals(thread.state(), StreamThread.State.RUNNING);
@@ -378,8 +382,13 @@ public class StreamThreadTest {
activeTasks.put(task2, Collections.singleton(t1p2));
thread.taskManager().setAssignmentMetadata(activeTasks, Collections.<TaskId, Set<TopicPartition>>emptyMap());
- thread.taskManager().createTasks(assignedPartitions);
+ final MockConsumer<byte[], byte[]> mockConsumer = (MockConsumer<byte[], byte[]>) thread.consumer;
+ mockConsumer.assign(assignedPartitions);
+ Map<TopicPartition, Long> beginOffsets = new HashMap<>();
+ beginOffsets.put(t1p1, 0L);
+ beginOffsets.put(t1p2, 0L);
+ mockConsumer.updateBeginningOffsets(beginOffsets);
thread.rebalanceListener.onPartitionsAssigned(new HashSet<>(assignedPartitions));
assertEquals(1, clientSupplier.producers.size());
@@ -411,6 +420,12 @@ public class StreamThreadTest {
thread.taskManager().setAssignmentMetadata(activeTasks, Collections.<TaskId, Set<TopicPartition>>emptyMap());
+ final MockConsumer<byte[], byte[]> mockConsumer = (MockConsumer<byte[], byte[]>) thread.consumer;
+ mockConsumer.assign(assignedPartitions);
+ Map<TopicPartition, Long> beginOffsets = new HashMap<>();
+ beginOffsets.put(t1p1, 0L);
+ beginOffsets.put(t1p2, 0L);
+ mockConsumer.updateBeginningOffsets(beginOffsets);
thread.rebalanceListener.onPartitionsAssigned(new HashSet<>(assignedPartitions));
thread.runOnce(-1);
@@ -439,7 +454,12 @@ public class StreamThreadTest {
activeTasks.put(task2, Collections.singleton(t1p2));
thread.taskManager().setAssignmentMetadata(activeTasks, Collections.<TaskId, Set<TopicPartition>>emptyMap());
- thread.taskManager().createTasks(assignedPartitions);
+ final MockConsumer<byte[], byte[]> mockConsumer = (MockConsumer<byte[], byte[]>) thread.consumer;
+ mockConsumer.assign(assignedPartitions);
+ Map<TopicPartition, Long> beginOffsets = new HashMap<>();
+ beginOffsets.put(t1p1, 0L);
+ beginOffsets.put(t1p2, 0L);
+ mockConsumer.updateBeginningOffsets(beginOffsets);
thread.rebalanceListener.onPartitionsAssigned(assignedPartitions);
@@ -595,6 +615,9 @@ public class StreamThreadTest {
thread.taskManager().setAssignmentMetadata(activeTasks, Collections.<TaskId, Set<TopicPartition>>emptyMap());
+ final MockConsumer<byte[], byte[]> mockConsumer = (MockConsumer<byte[], byte[]>) thread.consumer;
+ mockConsumer.assign(assignedPartitions);
+ mockConsumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L));
thread.rebalanceListener.onPartitionsAssigned(assignedPartitions);
thread.runOnce(-1);
@@ -659,6 +682,10 @@ public class StreamThreadTest {
activeTasks.put(task1, Collections.singleton(t1p1));
thread.taskManager().setAssignmentMetadata(activeTasks, Collections.<TaskId, Set<TopicPartition>>emptyMap());
+
+ final MockConsumer<byte[], byte[]> mockConsumer = (MockConsumer<byte[], byte[]>) thread.consumer;
+ mockConsumer.assign(assignedPartitions);
+ mockConsumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L));
thread.rebalanceListener.onPartitionsAssigned(assignedPartitions);
thread.runOnce(-1);
@@ -714,8 +741,10 @@ public class StreamThreadTest {
activeTasks.put(task1, Collections.singleton(t1p1));
thread.taskManager().setAssignmentMetadata(activeTasks, Collections.<TaskId, Set<TopicPartition>>emptyMap());
- thread.taskManager().createTasks(assignedPartitions);
+ final MockConsumer<byte[], byte[]> mockConsumer = (MockConsumer<byte[], byte[]>) thread.consumer;
+ mockConsumer.assign(assignedPartitions);
+ mockConsumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L));
thread.rebalanceListener.onPartitionsAssigned(assignedPartitions);
thread.runOnce(-1);
@@ -883,9 +912,9 @@ public class StreamThreadTest {
thread.taskManager().setAssignmentMetadata(activeTasks, Collections.<TaskId, Set<TopicPartition>>emptyMap());
- thread.rebalanceListener.onPartitionsAssigned(assignedPartitions);
clientSupplier.consumer.assign(assignedPartitions);
clientSupplier.consumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L));
+ thread.rebalanceListener.onPartitionsAssigned(assignedPartitions);
thread.runOnce(-1);
@@ -1074,17 +1103,18 @@ public class StreamThreadTest {
thread.setState(StreamThread.State.RUNNING);
thread.setState(StreamThread.State.PARTITIONS_REVOKED);
- final Set<TopicPartition> assignedPartitions = Collections.singleton(new TopicPartition(t1p1.topic(), t1p1.partition()));
+ final Set<TopicPartition> assignedPartitions = Collections.singleton(t1p1);
thread.taskManager().setAssignmentMetadata(
Collections.singletonMap(
new TaskId(0, t1p1.partition()),
assignedPartitions),
Collections.<TaskId, Set<TopicPartition>>emptyMap());
- thread.rebalanceListener.onPartitionsAssigned(assignedPartitions);
final MockConsumer<byte[], byte[]> mockConsumer = (MockConsumer<byte[], byte[]>) thread.consumer;
mockConsumer.assign(Collections.singleton(t1p1));
mockConsumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L));
+ thread.rebalanceListener.onPartitionsAssigned(assignedPartitions);
+ thread.runOnce(-1);
final MetricName skippedTotalMetric = metrics.metricName("skipped-records-total", "stream-metrics", Collections.singletonMap("client-id", thread.getName()));
assertEquals(0.0, metrics.metric(skippedTotalMetric).metricValue());
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index 648e9b0..e8ef7ea 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -117,7 +117,7 @@ public class TaskManagerTest {
public final TemporaryFolder testFolder = new TemporaryFolder();
@Before
- public void setUp() throws Exception {
+ public void setUp() {
taskManager = new TaskManager(changeLogReader,
UUID.randomUUID(),
"",
@@ -324,11 +324,9 @@ public class TaskManagerTest {
verify(standby, standbyTaskCreator);
}
-
@Test
- public void shouldPauseActiveUninitializedPartitions() {
+ public void shouldPauseActivePartitions() {
mockSingleActiveTask();
- EasyMock.expect(active.uninitializedPartitions()).andReturn(taskId0Partitions);
consumer.pause(taskId0Partitions);
EasyMock.expectLastCall();
replay();
@@ -415,21 +413,17 @@ public class TaskManagerTest {
@Test
public void shouldInitializeNewActiveTasks() {
- EasyMock.expect(active.initializeNewTasks()).andReturn(new HashSet<TopicPartition>());
- EasyMock.expect(active.updateRestored(EasyMock.<Collection<TopicPartition>>anyObject())).
- andReturn(Collections.<TopicPartition>emptySet());
+ active.updateRestored(EasyMock.<Collection<TopicPartition>>anyObject());
EasyMock.expectLastCall();
replay();
+
taskManager.updateNewAndRestoringTasks();
verify(active);
}
@Test
public void shouldInitializeNewStandbyTasks() {
- EasyMock.expect(standby.initializeNewTasks()).andReturn(new HashSet<TopicPartition>());
- EasyMock.expect(active.initializeNewTasks()).andReturn(new HashSet<TopicPartition>());
- EasyMock.expect(active.updateRestored(EasyMock.<Collection<TopicPartition>>anyObject())).
- andReturn(Collections.<TopicPartition>emptySet());
+ active.updateRestored(EasyMock.<Collection<TopicPartition>>anyObject());
EasyMock.expectLastCall();
replay();
@@ -439,22 +433,21 @@ public class TaskManagerTest {
@Test
public void shouldRestoreStateFromChangeLogReader() {
- EasyMock.expect(active.initializeNewTasks()).andReturn(new HashSet<TopicPartition>());
EasyMock.expect(changeLogReader.restore(active)).andReturn(taskId0Partitions);
- EasyMock.expect(active.updateRestored(taskId0Partitions)).
- andReturn(Collections.<TopicPartition>emptySet());
-
+ active.updateRestored(taskId0Partitions);
+ EasyMock.expectLastCall();
replay();
+
taskManager.updateNewAndRestoringTasks();
verify(changeLogReader, active);
}
@Test
public void shouldResumeRestoredPartitions() {
- EasyMock.expect(active.initializeNewTasks()).andReturn(new HashSet<TopicPartition>());
EasyMock.expect(changeLogReader.restore(active)).andReturn(taskId0Partitions);
- EasyMock.expect(active.updateRestored(taskId0Partitions)).
- andReturn(taskId0Partitions);
+ EasyMock.expect(active.allTasksRunning()).andReturn(true);
+ EasyMock.expect(consumer.assignment()).andReturn(taskId0Partitions);
+ EasyMock.expect(standby.running()).andReturn(Collections.<StandbyTask>emptySet());
consumer.resume(taskId0Partitions);
EasyMock.expectLastCall();
@@ -475,10 +468,7 @@ public class TaskManagerTest {
@Test
public void shouldReturnFalseWhenThereAreStillNonRunningTasks() {
- EasyMock.expect(active.initializeNewTasks()).andReturn(new HashSet<TopicPartition>());
EasyMock.expect(active.allTasksRunning()).andReturn(false);
- EasyMock.expect(active.updateRestored(EasyMock.<Collection<TopicPartition>>anyObject())).
- andReturn(Collections.<TopicPartition>emptySet());
replay();
assertFalse(taskManager.updateNewAndRestoringTasks());
@@ -626,16 +616,13 @@ public class TaskManagerTest {
}
@Test
- public void shouldResumeConsumptionOfInitializedPartitions() {
- final Set<TopicPartition> resumed = Collections.singleton(new TopicPartition("topic", 0));
- EasyMock.expect(active.initializeNewTasks()).andReturn(resumed);
- EasyMock.expect(active.updateRestored(EasyMock.<Collection<TopicPartition>>anyObject())).
- andReturn(Collections.<TopicPartition>emptySet());
- consumer.resume(resumed);
- EasyMock.expectLastCall();
-
+ public void shouldNotResumeConsumptionUntilAllStoresRestored() {
+ EasyMock.expect(active.allTasksRunning()).andReturn(false);
+ Consumer<byte[], byte[]> consumer = (Consumer<byte[], byte[]>) EasyMock.createStrictMock(Consumer.class);
+ taskManager.setConsumer(consumer);
EasyMock.replay(active, consumer);
+ // shouldn't invoke `resume` method in consumer
taskManager.updateNewAndRestoringTasks();
EasyMock.verify(consumer);
}
@@ -662,10 +649,7 @@ public class TaskManagerTest {
private void mockAssignStandbyPartitions(final long offset) {
final StandbyTask task = EasyMock.createNiceMock(StandbyTask.class);
- EasyMock.expect(active.initializeNewTasks()).andReturn(new HashSet<TopicPartition>());
EasyMock.expect(active.allTasksRunning()).andReturn(true);
- EasyMock.expect(active.updateRestored(EasyMock.<Collection<TopicPartition>>anyObject())).
- andReturn(Collections.<TopicPartition>emptySet());
EasyMock.expect(standby.running()).andReturn(Collections.singletonList(task));
EasyMock.expect(task.checkpointedOffsets()).andReturn(Collections.singletonMap(t1p0, offset));
restoreConsumer.assign(taskId0Partitions);
--
To stop receiving notification emails like this one, please contact
mjsax@apache.org.