You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "cadonna (via GitHub)" <gi...@apache.org> on 2023/04/04 14:37:46 UTC
[GitHub] [kafka] cadonna commented on a diff in pull request #12607: KAFKA-14133: Replace EasyMock with Mockito in streams tests
cadonna commented on code in PR #12607:
URL: https://github.com/apache/kafka/pull/12607#discussion_r1157144930
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -899,65 +841,59 @@ public void shouldSuspendRevokedTaskRemovedFromStateUpdater() {
when(stateUpdater.hasRemovedTasks()).thenReturn(true);
when(stateUpdater.drainRemovedTasks()).thenReturn(mkSet(statefulTask));
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
- replay(consumer);
taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
- verify(consumer);
- Mockito.verify(statefulTask).suspend();
- Mockito.verify(tasks).addTask(statefulTask);
+ verify(statefulTask).suspend();
+ verify(tasks).addTask(statefulTask);
}
@Test
public void shouldHandleMultipleRemovedTasksFromStateUpdater() {
final StreamTask taskToRecycle0 = statefulTask(taskId00, taskId00ChangelogPartitions)
- .inState(State.RESTORING)
- .withInputPartitions(taskId00Partitions).build();
+ .inState(State.RESTORING)
+ .withInputPartitions(taskId00Partitions).build();
final StandbyTask taskToRecycle1 = standbyTask(taskId01, taskId01ChangelogPartitions)
- .inState(State.RUNNING)
- .withInputPartitions(taskId01Partitions).build();
+ .inState(State.RUNNING)
+ .withInputPartitions(taskId01Partitions).build();
final StandbyTask convertedTask0 = standbyTask(taskId00, taskId00ChangelogPartitions).build();
final StreamTask convertedTask1 = statefulTask(taskId01, taskId01ChangelogPartitions).build();
final StreamTask taskToClose = statefulTask(taskId02, taskId02ChangelogPartitions)
- .inState(State.RESTORING)
- .withInputPartitions(taskId02Partitions).build();
+ .inState(State.RESTORING)
+ .withInputPartitions(taskId02Partitions).build();
final StreamTask taskToUpdateInputPartitions = statefulTask(taskId03, taskId03ChangelogPartitions)
- .inState(State.RESTORING)
- .withInputPartitions(taskId03Partitions).build();
+ .inState(State.RESTORING)
+ .withInputPartitions(taskId03Partitions).build();
when(stateUpdater.hasRemovedTasks()).thenReturn(true);
when(stateUpdater.drainRemovedTasks())
- .thenReturn(mkSet(taskToRecycle0, taskToRecycle1, taskToClose, taskToUpdateInputPartitions));
+ .thenReturn(mkSet(taskToRecycle0, taskToRecycle1, taskToClose, taskToUpdateInputPartitions));
Review Comment:
nit: adding the indentation is not necessary.
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -899,65 +841,59 @@ public void shouldSuspendRevokedTaskRemovedFromStateUpdater() {
when(stateUpdater.hasRemovedTasks()).thenReturn(true);
when(stateUpdater.drainRemovedTasks()).thenReturn(mkSet(statefulTask));
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
- replay(consumer);
taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
- verify(consumer);
- Mockito.verify(statefulTask).suspend();
- Mockito.verify(tasks).addTask(statefulTask);
+ verify(statefulTask).suspend();
+ verify(tasks).addTask(statefulTask);
}
@Test
public void shouldHandleMultipleRemovedTasksFromStateUpdater() {
final StreamTask taskToRecycle0 = statefulTask(taskId00, taskId00ChangelogPartitions)
- .inState(State.RESTORING)
- .withInputPartitions(taskId00Partitions).build();
+ .inState(State.RESTORING)
+ .withInputPartitions(taskId00Partitions).build();
final StandbyTask taskToRecycle1 = standbyTask(taskId01, taskId01ChangelogPartitions)
- .inState(State.RUNNING)
- .withInputPartitions(taskId01Partitions).build();
+ .inState(State.RUNNING)
+ .withInputPartitions(taskId01Partitions).build();
final StandbyTask convertedTask0 = standbyTask(taskId00, taskId00ChangelogPartitions).build();
final StreamTask convertedTask1 = statefulTask(taskId01, taskId01ChangelogPartitions).build();
final StreamTask taskToClose = statefulTask(taskId02, taskId02ChangelogPartitions)
- .inState(State.RESTORING)
- .withInputPartitions(taskId02Partitions).build();
+ .inState(State.RESTORING)
+ .withInputPartitions(taskId02Partitions).build();
final StreamTask taskToUpdateInputPartitions = statefulTask(taskId03, taskId03ChangelogPartitions)
- .inState(State.RESTORING)
- .withInputPartitions(taskId03Partitions).build();
+ .inState(State.RESTORING)
+ .withInputPartitions(taskId03Partitions).build();
when(stateUpdater.hasRemovedTasks()).thenReturn(true);
when(stateUpdater.drainRemovedTasks())
- .thenReturn(mkSet(taskToRecycle0, taskToRecycle1, taskToClose, taskToUpdateInputPartitions));
+ .thenReturn(mkSet(taskToRecycle0, taskToRecycle1, taskToClose, taskToUpdateInputPartitions));
when(stateUpdater.restoresActiveTasks()).thenReturn(true);
- expect(activeTaskCreator.createActiveTaskFromStandby(eq(taskToRecycle1), eq(taskId01Partitions), eq(consumer)))
- .andStubReturn(convertedTask1);
- activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(anyObject());
- expectLastCall().times(2);
- expect(standbyTaskCreator.createStandbyTaskFromActive(eq(taskToRecycle0), eq(taskId00Partitions)))
- .andStubReturn(convertedTask0);
- expect(consumer.assignment()).andReturn(emptySet()).anyTimes();
- consumer.resume(anyObject());
- expectLastCall().anyTimes();
+ when(activeTaskCreator.createActiveTaskFromStandby(taskToRecycle1, taskId01Partitions, consumer))
+ .thenReturn(convertedTask1);
+ when(standbyTaskCreator.createStandbyTaskFromActive(taskToRecycle0, taskId00Partitions))
+ .thenReturn(convertedTask0);
+ when(consumer.assignment()).thenReturn(emptySet());
final TasksRegistry tasks = mock(TasksRegistry.class);
when(tasks.removePendingTaskToCloseClean(taskToClose.id())).thenReturn(true);
+ when(tasks.removePendingTaskToCloseClean(taskToUpdateInputPartitions.id())).thenReturn(false);
when(tasks.removePendingTaskToRecycle(taskToRecycle0.id())).thenReturn(taskId00Partitions);
when(tasks.removePendingTaskToRecycle(taskToRecycle1.id())).thenReturn(taskId01Partitions);
when(tasks.removePendingTaskToRecycle(
- argThat(taskId -> !taskId.equals(taskToRecycle0.id()) && !taskId.equals(taskToRecycle1.id())))
+ argThat(taskId -> !taskId.equals(taskToRecycle0.id()) && !taskId.equals(taskToRecycle1.id())))
Review Comment:
nit: adding indentation not necessary
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -899,65 +841,59 @@ public void shouldSuspendRevokedTaskRemovedFromStateUpdater() {
when(stateUpdater.hasRemovedTasks()).thenReturn(true);
when(stateUpdater.drainRemovedTasks()).thenReturn(mkSet(statefulTask));
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
- replay(consumer);
taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
- verify(consumer);
- Mockito.verify(statefulTask).suspend();
- Mockito.verify(tasks).addTask(statefulTask);
+ verify(statefulTask).suspend();
+ verify(tasks).addTask(statefulTask);
}
@Test
public void shouldHandleMultipleRemovedTasksFromStateUpdater() {
final StreamTask taskToRecycle0 = statefulTask(taskId00, taskId00ChangelogPartitions)
- .inState(State.RESTORING)
- .withInputPartitions(taskId00Partitions).build();
+ .inState(State.RESTORING)
+ .withInputPartitions(taskId00Partitions).build();
final StandbyTask taskToRecycle1 = standbyTask(taskId01, taskId01ChangelogPartitions)
- .inState(State.RUNNING)
- .withInputPartitions(taskId01Partitions).build();
+ .inState(State.RUNNING)
+ .withInputPartitions(taskId01Partitions).build();
final StandbyTask convertedTask0 = standbyTask(taskId00, taskId00ChangelogPartitions).build();
final StreamTask convertedTask1 = statefulTask(taskId01, taskId01ChangelogPartitions).build();
final StreamTask taskToClose = statefulTask(taskId02, taskId02ChangelogPartitions)
- .inState(State.RESTORING)
- .withInputPartitions(taskId02Partitions).build();
+ .inState(State.RESTORING)
+ .withInputPartitions(taskId02Partitions).build();
final StreamTask taskToUpdateInputPartitions = statefulTask(taskId03, taskId03ChangelogPartitions)
- .inState(State.RESTORING)
- .withInputPartitions(taskId03Partitions).build();
+ .inState(State.RESTORING)
+ .withInputPartitions(taskId03Partitions).build();
when(stateUpdater.hasRemovedTasks()).thenReturn(true);
when(stateUpdater.drainRemovedTasks())
- .thenReturn(mkSet(taskToRecycle0, taskToRecycle1, taskToClose, taskToUpdateInputPartitions));
+ .thenReturn(mkSet(taskToRecycle0, taskToRecycle1, taskToClose, taskToUpdateInputPartitions));
when(stateUpdater.restoresActiveTasks()).thenReturn(true);
- expect(activeTaskCreator.createActiveTaskFromStandby(eq(taskToRecycle1), eq(taskId01Partitions), eq(consumer)))
- .andStubReturn(convertedTask1);
- activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(anyObject());
- expectLastCall().times(2);
- expect(standbyTaskCreator.createStandbyTaskFromActive(eq(taskToRecycle0), eq(taskId00Partitions)))
- .andStubReturn(convertedTask0);
- expect(consumer.assignment()).andReturn(emptySet()).anyTimes();
- consumer.resume(anyObject());
- expectLastCall().anyTimes();
+ when(activeTaskCreator.createActiveTaskFromStandby(taskToRecycle1, taskId01Partitions, consumer))
+ .thenReturn(convertedTask1);
+ when(standbyTaskCreator.createStandbyTaskFromActive(taskToRecycle0, taskId00Partitions))
+ .thenReturn(convertedTask0);
Review Comment:
nit: we usually indent 4 spaces.
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -1119,17 +1051,15 @@ public void shouldRecycleRestoredTask() {
.inState(State.CREATED)
.withInputPartitions(taskId00Partitions).build();
final TaskManager taskManager = setUpRecycleRestoredTask(statefulTask);
- expect(standbyTaskCreator.createStandbyTaskFromActive(statefulTask, statefulTask.inputPartitions()))
- .andStubReturn(standbyTask);
+ when(standbyTaskCreator.createStandbyTaskFromActive(statefulTask, statefulTask.inputPartitions()))
+ .thenReturn(standbyTask);
activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(statefulTask.id());
Review Comment:
This needs to become verification
```java
verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(statefulTask.id());
```
in the verification block further down.
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -1193,15 +1119,13 @@ public void shouldCloseCleanRestoredTask() {
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager = setUpCloseCleanRestoredTask(statefulTask, tasks);
activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(statefulTask.id());
Review Comment:
This needs to become verification
```java
verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(statefulTask.id());
```
in the verification block.
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -1060,14 +996,12 @@ public void shouldTransitRestoredTaskToRunning() {
final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager = setUpTransitionToRunningOfRestoredTask(task, tasks);
consumer.resume(task.inputPartitions());
Review Comment:
This needs to become a verification
```java
verify(consumer).resume(task.inputPartitions());
```
further down in the verification block.
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -1292,15 +1210,13 @@ public void shouldUpdateInputPartitionsOfRestoredTask() {
when(stateUpdater.restoresActiveTasks()).thenReturn(true);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
consumer.resume(statefulTask.inputPartitions());
Review Comment:
This needs to become verification
```java
verify(consumer).resume(statefulTask.inputPartitions());
```
in the verification block.
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -1534,66 +1440,61 @@ public void shouldTryToLockValidTaskDirsAtRebalanceStart() throws Exception {
taskId10.toString(),
"dummy"
);
- replay(stateDirectory);
taskManager.handleRebalanceStart(singleton("topic"));
- verify(stateDirectory);
assertThat(taskManager.lockedTaskDirectories(), is(singleton(taskId01)));
}
@Test
public void shouldPauseAllTopicsWithoutStateUpdaterOnRebalanceComplete() {
final Set<TopicPartition> assigned = mkSet(t1p0, t1p1);
- expect(consumer.assignment()).andReturn(assigned);
- consumer.pause(assigned);
- replay(consumer);
+ when(consumer.assignment()).thenReturn(assigned);
taskManager.handleRebalanceComplete();
- verify(consumer);
+ verify(consumer).pause(assigned);
}
@Test
public void shouldNotPauseReadyTasksWithStateUpdaterOnRebalanceComplete() {
final StreamTask statefulTask0 = statefulTask(taskId00, taskId00ChangelogPartitions)
.inState(State.RUNNING)
.withInputPartitions(taskId00Partitions).build();
- final TasksRegistry tasks = Mockito.mock(TasksRegistry.class);
+ final TasksRegistry tasks = mock(TasksRegistry.class);
final TaskManager taskManager = setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks, true);
when(tasks.allTasks()).thenReturn(mkSet(statefulTask0));
final Set<TopicPartition> assigned = mkSet(t1p0, t1p1);
- expect(consumer.assignment()).andReturn(assigned);
- consumer.pause(mkSet(t1p1));
- replay(consumer);
+ when(consumer.assignment()).thenReturn(assigned);
taskManager.handleRebalanceComplete();
- verify(consumer);
+ verify(consumer).pause(mkSet(t1p1));
}
@Test
public void shouldReleaseLockForUnassignedTasksAfterRebalance() throws Exception {
expectLockObtainedFor(taskId00, taskId01, taskId02);
- expectUnlockFor(taskId02);
makeTaskFolders(
taskId00.toString(), // active task
taskId01.toString(), // standby task
taskId02.toString() // unassigned but able to lock
);
- replay(stateDirectory);
taskManager.handleRebalanceStart(singleton("topic"));
assertThat(taskManager.lockedTaskDirectories(), is(mkSet(taskId00, taskId01, taskId02)));
handleAssignment(taskId00Assignment, taskId01Assignment, emptyMap());
- reset(consumer);
- expectConsumerAssignmentPaused(consumer);
- replay(consumer);
+
+ final Set<TopicPartition> assignment = singleton(new TopicPartition("assignment", 0));
+ when(consumer.assignment()).thenReturn(assignment);
taskManager.handleRebalanceComplete();
assertThat(taskManager.lockedTaskDirectories(), is(mkSet(taskId00, taskId01)));
- verify(stateDirectory);
+ verify(consumer).pause(assignment);
+ verify(stateDirectory).unlock(taskId02);
+
+ verifyConsumerResumedWithAssignment(consumer);
Review Comment:
This method should also take the assignment as an argument. Maybe it is even better to call
```java
verify(consumer, atLeastOnce()).resume(assignment);
```
instead of `verifyConsumerResumedWithAssignment(consumer)` here.
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -1667,6 +1566,8 @@ public void shouldComputeOffsetSumForStandbyTask() throws Exception {
restoringTask.setChangelogOffsets(changelogOffsets);
assertThat(taskManager.getTaskOffsetSums(), is(expectedOffsetSums));
+
+ verifyConsumerResumedWithAssignment(consumer);
Review Comment:
Why did you add this verification? It was not there before the migration, right?
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -1213,37 +1137,33 @@ public void shouldHandleExceptionThrownDuringCloseInCloseCleanRestoredTask() {
final TaskManager taskManager = setUpCloseCleanRestoredTask(statefulTask, tasks);
doThrow(RuntimeException.class).when(statefulTask).closeClean();
activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(statefulTask.id());
Review Comment:
This needs to become verification
```java
verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(statefulTask.id());
```
in the verification block.
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -1268,16 +1188,14 @@ public void shouldCloseDirtyRestoredTask() {
when(stateUpdater.restoresActiveTasks()).thenReturn(true);
activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(statefulTask.id());
Review Comment:
This needs to become verification
```java
verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(statefulTask.id());
```
in the verification block.
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -4370,27 +4080,26 @@ private Map<TaskId, StateMachineTask> handleAssignment(final Map<TaskId, Set<Top
private void expectLockObtainedFor(final TaskId... tasks) throws Exception {
for (final TaskId task : tasks) {
- expect(stateDirectory.lock(task)).andReturn(true).once();
+ when(stateDirectory.lock(task)).thenReturn(true);
}
}
private void expectLockFailedFor(final TaskId... tasks) throws Exception {
for (final TaskId task : tasks) {
- expect(stateDirectory.lock(task)).andReturn(false).once();
+ when(stateDirectory.lock(task)).thenReturn(false);
}
}
private void expectUnlockFor(final TaskId... tasks) throws Exception {
for (final TaskId task : tasks) {
- stateDirectory.unlock(task);
- expectLastCall();
+ doNothing().when(stateDirectory).unlock(task);
}
}
private static void expectConsumerAssignmentPaused(final Consumer<byte[], byte[]> consumer) {
final Set<TopicPartition> assignment = singleton(new TopicPartition("assignment", 0));
- expect(consumer.assignment()).andReturn(assignment);
- consumer.pause(assignment);
+ when(consumer.assignment()).thenReturn(assignment);
+ doNothing().when(consumer).pause(assignment);
}
Review Comment:
This method is not needed anymore.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org