You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/11/04 12:00:50 UTC
[GitHub] [kafka] clolov opened a new pull request, #12821: Replace PowerMock and EasyMock with Mockito in streams tests
clolov opened a new pull request, #12821:
URL: https://github.com/apache/kafka/pull/12821
Batch 1 of the tests detailed in https://issues.apache.org/jira/browse/KAFKA-14132 which use PowerMock/EasyMock and need to be moved to Mockito.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [kafka] clolov commented on pull request #12821: KAFKA-14132: Replace PowerMock and EasyMock with Mockito in streams tests
Posted by GitBox <gi...@apache.org>.
clolov commented on PR #12821:
URL: https://github.com/apache/kafka/pull/12821#issuecomment-1398172190
Thank you very much for the review and merge!
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [kafka] cadonna commented on a diff in pull request #12821: KAFKA-14132: Replace PowerMock and EasyMock with Mockito in streams tests
Posted by GitBox <gi...@apache.org>.
cadonna commented on code in PR #12821:
URL: https://github.com/apache/kafka/pull/12821#discussion_r1046120121
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -42,77 +40,56 @@
import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
-import static org.easymock.EasyMock.createStrictControl;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.expectLastCall;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;
-import static org.powermock.api.easymock.PowerMock.mockStatic;
-import static org.powermock.api.easymock.PowerMock.replayAll;
-
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(Utils.class)
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
public class StateManagerUtilTest {
- @Mock(type = MockType.NICE)
+ @Mock
private ProcessorStateManager stateManager;
- @Mock(type = MockType.NICE)
+ @Mock
private StateDirectory stateDirectory;
- @Mock(type = MockType.NICE)
+ @Mock
private ProcessorTopology topology;
- @Mock(type = MockType.NICE)
+ @Mock
private InternalProcessorContext processorContext;
- private IMocksControl ctrl;
-
private Logger logger = new LogContext("test").logger(AbstractTask.class);
private final TaskId taskId = new TaskId(0, 0);
- @Before
- public void setup() {
- ctrl = createStrictControl();
- topology = ctrl.createMock(ProcessorTopology.class);
- processorContext = ctrl.createMock(InternalProcessorContext.class);
-
- stateManager = ctrl.createMock(ProcessorStateManager.class);
- stateDirectory = ctrl.createMock(StateDirectory.class);
- }
-
@Test
public void testRegisterStateStoreWhenTopologyEmpty() {
- expect(topology.stateStores()).andReturn(emptyList());
-
- ctrl.checkOrder(true);
- ctrl.replay();
+ when(topology.stateStores()).thenReturn(emptyList());
StateManagerUtil.registerStateStores(logger,
"logPrefix:", topology, stateManager, stateDirectory, processorContext);
-
- ctrl.verify();
}
@Test
public void testRegisterStateStoreFailToLockStateDirectory() {
- expect(topology.stateStores()).andReturn(singletonList(new MockKeyValueStore("store", false)));
-
- expect(stateManager.taskId()).andReturn(taskId);
+ when(topology.stateStores()).thenReturn(singletonList(new MockKeyValueStore("store", false)));
Review Comment:
```suggestion
```
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -42,77 +40,56 @@
import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
-import static org.easymock.EasyMock.createStrictControl;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.expectLastCall;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;
-import static org.powermock.api.easymock.PowerMock.mockStatic;
-import static org.powermock.api.easymock.PowerMock.replayAll;
-
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(Utils.class)
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
public class StateManagerUtilTest {
- @Mock(type = MockType.NICE)
+ @Mock
private ProcessorStateManager stateManager;
- @Mock(type = MockType.NICE)
+ @Mock
private StateDirectory stateDirectory;
- @Mock(type = MockType.NICE)
+ @Mock
private ProcessorTopology topology;
- @Mock(type = MockType.NICE)
+ @Mock
private InternalProcessorContext processorContext;
- private IMocksControl ctrl;
-
private Logger logger = new LogContext("test").logger(AbstractTask.class);
private final TaskId taskId = new TaskId(0, 0);
- @Before
- public void setup() {
- ctrl = createStrictControl();
- topology = ctrl.createMock(ProcessorTopology.class);
- processorContext = ctrl.createMock(InternalProcessorContext.class);
-
- stateManager = ctrl.createMock(ProcessorStateManager.class);
- stateDirectory = ctrl.createMock(StateDirectory.class);
- }
-
@Test
public void testRegisterStateStoreWhenTopologyEmpty() {
- expect(topology.stateStores()).andReturn(emptyList());
-
- ctrl.checkOrder(true);
- ctrl.replay();
+ when(topology.stateStores()).thenReturn(emptyList());
StateManagerUtil.registerStateStores(logger,
"logPrefix:", topology, stateManager, stateDirectory, processorContext);
-
- ctrl.verify();
}
@Test
public void testRegisterStateStoreFailToLockStateDirectory() {
- expect(topology.stateStores()).andReturn(singletonList(new MockKeyValueStore("store", false)));
-
- expect(stateManager.taskId()).andReturn(taskId);
+ when(topology.stateStores()).thenReturn(singletonList(new MockKeyValueStore("store", false)));
- expect(stateDirectory.lock(taskId)).andReturn(false);
+ when(stateManager.taskId()).thenReturn(taskId);
Review Comment:
```suggestion
```
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -121,64 +98,48 @@ public void testRegisterStateStores() {
final MockKeyValueStore store2 = new MockKeyValueStore("store2", false);
final List<StateStore> stateStores = Arrays.asList(store1, store2);
- expect(topology.stateStores()).andReturn(stateStores);
-
- expect(stateManager.taskId()).andReturn(taskId);
-
- expect(stateDirectory.lock(taskId)).andReturn(true);
- expect(stateDirectory.directoryForTaskIsEmpty(taskId)).andReturn(true);
+ final InOrder inOrder = inOrder(stateManager);
Review Comment:
```suggestion
```
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -121,64 +98,48 @@ public void testRegisterStateStores() {
final MockKeyValueStore store2 = new MockKeyValueStore("store2", false);
final List<StateStore> stateStores = Arrays.asList(store1, store2);
Review Comment:
```suggestion
```
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -121,64 +98,48 @@ public void testRegisterStateStores() {
final MockKeyValueStore store2 = new MockKeyValueStore("store2", false);
final List<StateStore> stateStores = Arrays.asList(store1, store2);
- expect(topology.stateStores()).andReturn(stateStores);
-
- expect(stateManager.taskId()).andReturn(taskId);
-
- expect(stateDirectory.lock(taskId)).andReturn(true);
- expect(stateDirectory.directoryForTaskIsEmpty(taskId)).andReturn(true);
+ final InOrder inOrder = inOrder(stateManager);
- expect(topology.stateStores()).andReturn(stateStores);
+ when(topology.stateStores()).thenReturn(stateStores);
- stateManager.registerStateStores(stateStores, processorContext);
+ when(stateManager.taskId()).thenReturn(taskId);
Review Comment:
```suggestion
```
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -121,64 +98,48 @@ public void testRegisterStateStores() {
final MockKeyValueStore store2 = new MockKeyValueStore("store2", false);
final List<StateStore> stateStores = Arrays.asList(store1, store2);
- expect(topology.stateStores()).andReturn(stateStores);
-
- expect(stateManager.taskId()).andReturn(taskId);
-
- expect(stateDirectory.lock(taskId)).andReturn(true);
- expect(stateDirectory.directoryForTaskIsEmpty(taskId)).andReturn(true);
+ final InOrder inOrder = inOrder(stateManager);
- expect(topology.stateStores()).andReturn(stateStores);
+ when(topology.stateStores()).thenReturn(stateStores);
- stateManager.registerStateStores(stateStores, processorContext);
+ when(stateManager.taskId()).thenReturn(taskId);
- stateManager.initializeStoreOffsetsFromCheckpoint(true);
- expectLastCall();
+ when(stateDirectory.lock(taskId)).thenReturn(true);
+ when(stateDirectory.directoryForTaskIsEmpty(taskId)).thenReturn(true);
- ctrl.checkOrder(true);
- ctrl.replay();
+ when(topology.stateStores()).thenReturn(stateStores);
StateManagerUtil.registerStateStores(logger, "logPrefix:",
topology, stateManager, stateDirectory, processorContext);
- ctrl.verify();
+ inOrder.verify(stateManager).registerStateStores(stateStores, processorContext);
+ inOrder.verify(stateManager).initializeStoreOffsetsFromCheckpoint(true);
+ verifyNoMoreInteractions(stateManager);
}
@Test
public void testCloseStateManagerClean() {
- expect(stateManager.taskId()).andReturn(taskId);
+ final InOrder inOrder = inOrder(stateManager, stateDirectory);
- expect(stateDirectory.lock(taskId)).andReturn(true);
+ when(stateManager.taskId()).thenReturn(taskId);
- stateManager.close();
- expectLastCall();
-
- stateDirectory.unlock(taskId);
- expectLastCall();
-
- ctrl.checkOrder(true);
- ctrl.replay();
+ when(stateDirectory.lock(taskId)).thenReturn(true);
StateManagerUtil.closeStateManager(logger,
"logPrefix:", true, false, stateManager, stateDirectory, TaskType.ACTIVE);
- ctrl.verify();
+ inOrder.verify(stateManager).close();
+ inOrder.verify(stateDirectory).unlock(taskId);
+ verifyNoMoreInteractions(stateManager, stateDirectory);
}
@Test
public void testCloseStateManagerThrowsExceptionWhenClean() {
- expect(stateManager.taskId()).andReturn(taskId);
-
- expect(stateDirectory.lock(taskId)).andReturn(true);
+ when(stateManager.taskId()).thenReturn(taskId);
Review Comment:
```suggestion
```
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -121,64 +98,48 @@ public void testRegisterStateStores() {
final MockKeyValueStore store2 = new MockKeyValueStore("store2", false);
final List<StateStore> stateStores = Arrays.asList(store1, store2);
- expect(topology.stateStores()).andReturn(stateStores);
-
- expect(stateManager.taskId()).andReturn(taskId);
-
- expect(stateDirectory.lock(taskId)).andReturn(true);
- expect(stateDirectory.directoryForTaskIsEmpty(taskId)).andReturn(true);
+ final InOrder inOrder = inOrder(stateManager);
- expect(topology.stateStores()).andReturn(stateStores);
+ when(topology.stateStores()).thenReturn(stateStores);
- stateManager.registerStateStores(stateStores, processorContext);
+ when(stateManager.taskId()).thenReturn(taskId);
- stateManager.initializeStoreOffsetsFromCheckpoint(true);
- expectLastCall();
+ when(stateDirectory.lock(taskId)).thenReturn(true);
+ when(stateDirectory.directoryForTaskIsEmpty(taskId)).thenReturn(true);
- ctrl.checkOrder(true);
- ctrl.replay();
+ when(topology.stateStores()).thenReturn(stateStores);
StateManagerUtil.registerStateStores(logger, "logPrefix:",
topology, stateManager, stateDirectory, processorContext);
- ctrl.verify();
+ inOrder.verify(stateManager).registerStateStores(stateStores, processorContext);
+ inOrder.verify(stateManager).initializeStoreOffsetsFromCheckpoint(true);
+ verifyNoMoreInteractions(stateManager);
}
@Test
public void testCloseStateManagerClean() {
- expect(stateManager.taskId()).andReturn(taskId);
+ final InOrder inOrder = inOrder(stateManager, stateDirectory);
- expect(stateDirectory.lock(taskId)).andReturn(true);
+ when(stateManager.taskId()).thenReturn(taskId);
- stateManager.close();
- expectLastCall();
-
- stateDirectory.unlock(taskId);
- expectLastCall();
-
- ctrl.checkOrder(true);
- ctrl.replay();
+ when(stateDirectory.lock(taskId)).thenReturn(true);
StateManagerUtil.closeStateManager(logger,
"logPrefix:", true, false, stateManager, stateDirectory, TaskType.ACTIVE);
- ctrl.verify();
+ inOrder.verify(stateManager).close();
+ inOrder.verify(stateDirectory).unlock(taskId);
+ verifyNoMoreInteractions(stateManager, stateDirectory);
}
@Test
public void testCloseStateManagerThrowsExceptionWhenClean() {
- expect(stateManager.taskId()).andReturn(taskId);
-
- expect(stateDirectory.lock(taskId)).andReturn(true);
+ when(stateManager.taskId()).thenReturn(taskId);
- stateManager.close();
- expectLastCall().andThrow(new ProcessorStateException("state manager failed to close"));
+ when(stateDirectory.lock(taskId)).thenReturn(true);
Review Comment:
```suggestion
```
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -187,151 +148,101 @@ public void testCloseStateManagerThrowsExceptionWhenClean() {
// Thrown stateMgr exception will not be wrapped.
assertEquals("state manager failed to close", thrown.getMessage());
- ctrl.verify();
+ // The unlock logic should still be executed.
+ verify(stateDirectory).unlock(taskId);
}
@Test
public void testCloseStateManagerThrowsExceptionWhenDirty() {
- expect(stateManager.taskId()).andReturn(taskId);
+ when(stateManager.taskId()).thenReturn(taskId);
Review Comment:
```suggestion
```
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -187,151 +148,101 @@ public void testCloseStateManagerThrowsExceptionWhenClean() {
// Thrown stateMgr exception will not be wrapped.
assertEquals("state manager failed to close", thrown.getMessage());
- ctrl.verify();
+ // The unlock logic should still be executed.
+ verify(stateDirectory).unlock(taskId);
}
@Test
public void testCloseStateManagerThrowsExceptionWhenDirty() {
- expect(stateManager.taskId()).andReturn(taskId);
+ when(stateManager.taskId()).thenReturn(taskId);
- expect(stateDirectory.lock(taskId)).andReturn(true);
+ when(stateDirectory.lock(taskId)).thenReturn(true);
- stateManager.close();
- expectLastCall().andThrow(new ProcessorStateException("state manager failed to close"));
-
- stateDirectory.unlock(taskId);
-
- ctrl.checkOrder(true);
- ctrl.replay();
+ doThrow(new ProcessorStateException("state manager failed to close")).when(stateManager).close();
assertThrows(
ProcessorStateException.class,
() -> StateManagerUtil.closeStateManager(
logger, "logPrefix:", false, false, stateManager, stateDirectory, TaskType.ACTIVE));
- ctrl.verify();
+ verify(stateDirectory).unlock(taskId);
}
@Test
public void testCloseStateManagerWithStateStoreWipeOut() {
- expect(stateManager.taskId()).andReturn(taskId);
- expect(stateDirectory.lock(taskId)).andReturn(true);
+ final InOrder inOrder = inOrder(stateManager, stateDirectory);
Review Comment:
```suggestion
```
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -187,151 +148,101 @@ public void testCloseStateManagerThrowsExceptionWhenClean() {
// Thrown stateMgr exception will not be wrapped.
assertEquals("state manager failed to close", thrown.getMessage());
- ctrl.verify();
+ // The unlock logic should still be executed.
+ verify(stateDirectory).unlock(taskId);
}
@Test
public void testCloseStateManagerThrowsExceptionWhenDirty() {
- expect(stateManager.taskId()).andReturn(taskId);
+ when(stateManager.taskId()).thenReturn(taskId);
- expect(stateDirectory.lock(taskId)).andReturn(true);
+ when(stateDirectory.lock(taskId)).thenReturn(true);
- stateManager.close();
- expectLastCall().andThrow(new ProcessorStateException("state manager failed to close"));
-
- stateDirectory.unlock(taskId);
-
- ctrl.checkOrder(true);
- ctrl.replay();
+ doThrow(new ProcessorStateException("state manager failed to close")).when(stateManager).close();
assertThrows(
ProcessorStateException.class,
() -> StateManagerUtil.closeStateManager(
logger, "logPrefix:", false, false, stateManager, stateDirectory, TaskType.ACTIVE));
- ctrl.verify();
+ verify(stateDirectory).unlock(taskId);
}
@Test
public void testCloseStateManagerWithStateStoreWipeOut() {
- expect(stateManager.taskId()).andReturn(taskId);
- expect(stateDirectory.lock(taskId)).andReturn(true);
+ final InOrder inOrder = inOrder(stateManager, stateDirectory);
- stateManager.close();
- expectLastCall();
+ when(stateManager.taskId()).thenReturn(taskId);
+ when(stateDirectory.lock(taskId)).thenReturn(true);
// The `baseDir` will be accessed when attempting to delete the state store.
- expect(stateManager.baseDir()).andReturn(TestUtils.tempDirectory("state_store"));
-
- stateDirectory.unlock(taskId);
- expectLastCall();
-
- ctrl.checkOrder(true);
- ctrl.replay();
+ when(stateManager.baseDir()).thenReturn(TestUtils.tempDirectory("state_store"));
StateManagerUtil.closeStateManager(logger,
"logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE);
- ctrl.verify();
+ inOrder.verify(stateManager).close();
+ inOrder.verify(stateDirectory).unlock(taskId);
+ verifyNoMoreInteractions(stateManager, stateDirectory);
}
@Test
- public void shouldStillWipeStateStoresIfCloseThrowsException() throws IOException {
+ public void shouldStillWipeStateStoresIfCloseThrowsException() {
final File randomFile = new File("/random/path");
- mockStatic(Utils.class);
-
- expect(stateManager.taskId()).andReturn(taskId);
- expect(stateDirectory.lock(taskId)).andReturn(true);
- stateManager.close();
- expectLastCall().andThrow(new ProcessorStateException("Close failed"));
+ when(stateManager.taskId()).thenReturn(taskId);
+ when(stateDirectory.lock(taskId)).thenReturn(true);
- expect(stateManager.baseDir()).andReturn(randomFile);
+ doThrow(new ProcessorStateException("Close failed")).when(stateManager).close();
- Utils.delete(randomFile);
+ when(stateManager.baseDir()).thenReturn(randomFile);
- stateDirectory.unlock(taskId);
- expectLastCall();
+ try (MockedStatic<Utils> utils = mockStatic(Utils.class)) {
+ assertThrows(ProcessorStateException.class, () ->
+ StateManagerUtil.closeStateManager(logger, "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
+ }
- ctrl.checkOrder(true);
- ctrl.replay();
-
- replayAll();
-
- assertThrows(ProcessorStateException.class, () ->
- StateManagerUtil.closeStateManager(logger, "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
-
- ctrl.verify();
+ verify(stateDirectory).unlock(taskId);
}
@Test
- public void testCloseStateManagerWithStateStoreWipeOutRethrowWrappedIOException() throws IOException {
+ public void testCloseStateManagerWithStateStoreWipeOutRethrowWrappedIOException() {
final File unknownFile = new File("/unknown/path");
- mockStatic(Utils.class);
Review Comment:
```suggestion
```
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -187,151 +148,101 @@ public void testCloseStateManagerThrowsExceptionWhenClean() {
// Thrown stateMgr exception will not be wrapped.
assertEquals("state manager failed to close", thrown.getMessage());
- ctrl.verify();
+ // The unlock logic should still be executed.
+ verify(stateDirectory).unlock(taskId);
}
@Test
public void testCloseStateManagerThrowsExceptionWhenDirty() {
- expect(stateManager.taskId()).andReturn(taskId);
+ when(stateManager.taskId()).thenReturn(taskId);
- expect(stateDirectory.lock(taskId)).andReturn(true);
+ when(stateDirectory.lock(taskId)).thenReturn(true);
- stateManager.close();
- expectLastCall().andThrow(new ProcessorStateException("state manager failed to close"));
-
- stateDirectory.unlock(taskId);
-
- ctrl.checkOrder(true);
- ctrl.replay();
+ doThrow(new ProcessorStateException("state manager failed to close")).when(stateManager).close();
assertThrows(
ProcessorStateException.class,
() -> StateManagerUtil.closeStateManager(
logger, "logPrefix:", false, false, stateManager, stateDirectory, TaskType.ACTIVE));
- ctrl.verify();
+ verify(stateDirectory).unlock(taskId);
}
@Test
public void testCloseStateManagerWithStateStoreWipeOut() {
- expect(stateManager.taskId()).andReturn(taskId);
- expect(stateDirectory.lock(taskId)).andReturn(true);
+ final InOrder inOrder = inOrder(stateManager, stateDirectory);
- stateManager.close();
- expectLastCall();
+ when(stateManager.taskId()).thenReturn(taskId);
+ when(stateDirectory.lock(taskId)).thenReturn(true);
// The `baseDir` will be accessed when attempting to delete the state store.
- expect(stateManager.baseDir()).andReturn(TestUtils.tempDirectory("state_store"));
-
- stateDirectory.unlock(taskId);
- expectLastCall();
-
- ctrl.checkOrder(true);
- ctrl.replay();
+ when(stateManager.baseDir()).thenReturn(TestUtils.tempDirectory("state_store"));
StateManagerUtil.closeStateManager(logger,
"logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE);
- ctrl.verify();
+ inOrder.verify(stateManager).close();
+ inOrder.verify(stateDirectory).unlock(taskId);
+ verifyNoMoreInteractions(stateManager, stateDirectory);
}
@Test
- public void shouldStillWipeStateStoresIfCloseThrowsException() throws IOException {
+ public void shouldStillWipeStateStoresIfCloseThrowsException() {
final File randomFile = new File("/random/path");
- mockStatic(Utils.class);
-
- expect(stateManager.taskId()).andReturn(taskId);
- expect(stateDirectory.lock(taskId)).andReturn(true);
- stateManager.close();
- expectLastCall().andThrow(new ProcessorStateException("Close failed"));
+ when(stateManager.taskId()).thenReturn(taskId);
+ when(stateDirectory.lock(taskId)).thenReturn(true);
- expect(stateManager.baseDir()).andReturn(randomFile);
+ doThrow(new ProcessorStateException("Close failed")).when(stateManager).close();
Review Comment:
```suggestion
```
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -187,151 +148,101 @@ public void testCloseStateManagerThrowsExceptionWhenClean() {
// Thrown stateMgr exception will not be wrapped.
assertEquals("state manager failed to close", thrown.getMessage());
- ctrl.verify();
+ // The unlock logic should still be executed.
+ verify(stateDirectory).unlock(taskId);
}
@Test
public void testCloseStateManagerThrowsExceptionWhenDirty() {
- expect(stateManager.taskId()).andReturn(taskId);
+ when(stateManager.taskId()).thenReturn(taskId);
- expect(stateDirectory.lock(taskId)).andReturn(true);
+ when(stateDirectory.lock(taskId)).thenReturn(true);
- stateManager.close();
- expectLastCall().andThrow(new ProcessorStateException("state manager failed to close"));
-
- stateDirectory.unlock(taskId);
-
- ctrl.checkOrder(true);
- ctrl.replay();
+ doThrow(new ProcessorStateException("state manager failed to close")).when(stateManager).close();
assertThrows(
ProcessorStateException.class,
() -> StateManagerUtil.closeStateManager(
logger, "logPrefix:", false, false, stateManager, stateDirectory, TaskType.ACTIVE));
- ctrl.verify();
+ verify(stateDirectory).unlock(taskId);
}
@Test
public void testCloseStateManagerWithStateStoreWipeOut() {
- expect(stateManager.taskId()).andReturn(taskId);
- expect(stateDirectory.lock(taskId)).andReturn(true);
+ final InOrder inOrder = inOrder(stateManager, stateDirectory);
- stateManager.close();
- expectLastCall();
+ when(stateManager.taskId()).thenReturn(taskId);
+ when(stateDirectory.lock(taskId)).thenReturn(true);
// The `baseDir` will be accessed when attempting to delete the state store.
- expect(stateManager.baseDir()).andReturn(TestUtils.tempDirectory("state_store"));
-
- stateDirectory.unlock(taskId);
- expectLastCall();
-
- ctrl.checkOrder(true);
- ctrl.replay();
+ when(stateManager.baseDir()).thenReturn(TestUtils.tempDirectory("state_store"));
StateManagerUtil.closeStateManager(logger,
"logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE);
- ctrl.verify();
+ inOrder.verify(stateManager).close();
+ inOrder.verify(stateDirectory).unlock(taskId);
+ verifyNoMoreInteractions(stateManager, stateDirectory);
}
@Test
- public void shouldStillWipeStateStoresIfCloseThrowsException() throws IOException {
+ public void shouldStillWipeStateStoresIfCloseThrowsException() {
final File randomFile = new File("/random/path");
- mockStatic(Utils.class);
-
- expect(stateManager.taskId()).andReturn(taskId);
- expect(stateDirectory.lock(taskId)).andReturn(true);
- stateManager.close();
- expectLastCall().andThrow(new ProcessorStateException("Close failed"));
+ when(stateManager.taskId()).thenReturn(taskId);
+ when(stateDirectory.lock(taskId)).thenReturn(true);
- expect(stateManager.baseDir()).andReturn(randomFile);
+ doThrow(new ProcessorStateException("Close failed")).when(stateManager).close();
- Utils.delete(randomFile);
+ when(stateManager.baseDir()).thenReturn(randomFile);
- stateDirectory.unlock(taskId);
- expectLastCall();
+ try (MockedStatic<Utils> utils = mockStatic(Utils.class)) {
+ assertThrows(ProcessorStateException.class, () ->
+ StateManagerUtil.closeStateManager(logger, "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
+ }
- ctrl.checkOrder(true);
- ctrl.replay();
-
- replayAll();
-
- assertThrows(ProcessorStateException.class, () ->
- StateManagerUtil.closeStateManager(logger, "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
-
- ctrl.verify();
+ verify(stateDirectory).unlock(taskId);
}
@Test
- public void testCloseStateManagerWithStateStoreWipeOutRethrowWrappedIOException() throws IOException {
+ public void testCloseStateManagerWithStateStoreWipeOutRethrowWrappedIOException() {
final File unknownFile = new File("/unknown/path");
- mockStatic(Utils.class);
- expect(stateManager.taskId()).andReturn(taskId);
- expect(stateDirectory.lock(taskId)).andReturn(true);
+ final InOrder inOrder = inOrder(stateManager, stateDirectory);
- stateManager.close();
- expectLastCall();
+ when(stateManager.taskId()).thenReturn(taskId);
+ when(stateDirectory.lock(taskId)).thenReturn(true);
+ when(stateManager.baseDir()).thenReturn(unknownFile);
- expect(stateManager.baseDir()).andReturn(unknownFile);
+ try (MockedStatic<Utils> utils = mockStatic(Utils.class)) {
+ utils.when(() -> Utils.delete(unknownFile)).thenThrow(new IOException("Deletion failed"));
- Utils.delete(unknownFile);
- expectLastCall().andThrow(new IOException("Deletion failed"));
+ final ProcessorStateException thrown = assertThrows(
+ ProcessorStateException.class, () -> StateManagerUtil.closeStateManager(logger,
+ "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
- stateDirectory.unlock(taskId);
- expectLastCall();
+ assertEquals(IOException.class, thrown.getCause().getClass());
+ }
- ctrl.checkOrder(true);
- ctrl.replay();
-
- replayAll();
-
- final ProcessorStateException thrown = assertThrows(
- ProcessorStateException.class, () -> StateManagerUtil.closeStateManager(logger,
- "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
-
- assertEquals(IOException.class, thrown.getCause().getClass());
-
- ctrl.verify();
+ inOrder.verify(stateManager).close();
+ inOrder.verify(stateDirectory).unlock(taskId);
+ verifyNoMoreInteractions(stateManager, stateDirectory);
}
@Test
- public void shouldNotCloseStateManagerIfUnableToLockTaskDirectory() {
- expect(stateManager.taskId()).andReturn(taskId);
-
- expect(stateDirectory.lock(taskId)).andReturn(false);
-
- stateManager.close();
- expectLastCall().andThrow(new AssertionError("Should not be trying to close state you don't own!"));
+ public void shouldNotWipeStateStoresIfUnableToLockTaskDirectory() {
+ final InOrder inOrder = inOrder(stateManager, stateDirectory);
Review Comment:
```suggestion
```
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -187,151 +139,92 @@ public void testCloseStateManagerThrowsExceptionWhenClean() {
// Thrown stateMgr exception will not be wrapped.
assertEquals("state manager failed to close", thrown.getMessage());
- ctrl.verify();
+ // The unlock logic should still be executed.
+ verify(stateDirectory).unlock(taskId);
}
@Test
public void testCloseStateManagerThrowsExceptionWhenDirty() {
- expect(stateManager.taskId()).andReturn(taskId);
+ when(stateManager.taskId()).thenReturn(taskId);
- expect(stateDirectory.lock(taskId)).andReturn(true);
+ when(stateDirectory.lock(taskId)).thenReturn(true);
- stateManager.close();
- expectLastCall().andThrow(new ProcessorStateException("state manager failed to close"));
-
- stateDirectory.unlock(taskId);
-
- ctrl.checkOrder(true);
- ctrl.replay();
+ doThrow(new ProcessorStateException("state manager failed to close")).when(stateManager).close();
assertThrows(
ProcessorStateException.class,
() -> StateManagerUtil.closeStateManager(
logger, "logPrefix:", false, false, stateManager, stateDirectory, TaskType.ACTIVE));
- ctrl.verify();
+ verify(stateDirectory).unlock(taskId);
}
@Test
public void testCloseStateManagerWithStateStoreWipeOut() {
- expect(stateManager.taskId()).andReturn(taskId);
- expect(stateDirectory.lock(taskId)).andReturn(true);
-
- stateManager.close();
- expectLastCall();
+ when(stateManager.taskId()).thenReturn(taskId);
+ when(stateDirectory.lock(taskId)).thenReturn(true);
// The `baseDir` will be accessed when attempting to delete the state store.
- expect(stateManager.baseDir()).andReturn(TestUtils.tempDirectory("state_store"));
-
- stateDirectory.unlock(taskId);
- expectLastCall();
-
- ctrl.checkOrder(true);
- ctrl.replay();
+ when(stateManager.baseDir()).thenReturn(TestUtils.tempDirectory("state_store"));
StateManagerUtil.closeStateManager(logger,
"logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE);
- ctrl.verify();
+ verify(stateManager).close();
+ verify(stateDirectory).unlock(taskId);
}
@Test
public void shouldStillWipeStateStoresIfCloseThrowsException() throws IOException {
final File randomFile = new File("/random/path");
- mockStatic(Utils.class);
-
- expect(stateManager.taskId()).andReturn(taskId);
- expect(stateDirectory.lock(taskId)).andReturn(true);
- stateManager.close();
- expectLastCall().andThrow(new ProcessorStateException("Close failed"));
+ when(stateManager.taskId()).thenReturn(taskId);
+ when(stateDirectory.lock(taskId)).thenReturn(true);
- expect(stateManager.baseDir()).andReturn(randomFile);
+ doThrow(new ProcessorStateException("Close failed")).when(stateManager).close();
- Utils.delete(randomFile);
+ when(stateManager.baseDir()).thenReturn(randomFile);
- stateDirectory.unlock(taskId);
- expectLastCall();
+ try (MockedStatic<Utils> utils = mockStatic(Utils.class)) {
+ assertThrows(ProcessorStateException.class, () ->
+ StateManagerUtil.closeStateManager(logger, "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
+ }
- ctrl.checkOrder(true);
- ctrl.replay();
-
- replayAll();
-
- assertThrows(ProcessorStateException.class, () ->
- StateManagerUtil.closeStateManager(logger, "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
-
- ctrl.verify();
+ verify(stateDirectory).unlock(taskId);
}
@Test
public void testCloseStateManagerWithStateStoreWipeOutRethrowWrappedIOException() throws IOException {
final File unknownFile = new File("/unknown/path");
- mockStatic(Utils.class);
-
- expect(stateManager.taskId()).andReturn(taskId);
- expect(stateDirectory.lock(taskId)).andReturn(true);
-
- stateManager.close();
- expectLastCall();
-
- expect(stateManager.baseDir()).andReturn(unknownFile);
- Utils.delete(unknownFile);
- expectLastCall().andThrow(new IOException("Deletion failed"));
+ when(stateManager.taskId()).thenReturn(taskId);
+ when(stateDirectory.lock(taskId)).thenReturn(true);
+ when(stateManager.baseDir()).thenReturn(unknownFile);
- stateDirectory.unlock(taskId);
- expectLastCall();
+ try (MockedStatic<Utils> utils = mockStatic(Utils.class)) {
+ utils.when(() -> Utils.delete(unknownFile)).thenThrow(new IOException("Deletion failed"));
- ctrl.checkOrder(true);
- ctrl.replay();
+ final ProcessorStateException thrown = assertThrows(
+ ProcessorStateException.class, () -> StateManagerUtil.closeStateManager(logger,
+ "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
- replayAll();
+ assertEquals(IOException.class, thrown.getCause().getClass());
+ }
- final ProcessorStateException thrown = assertThrows(
- ProcessorStateException.class, () -> StateManagerUtil.closeStateManager(logger,
- "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
-
- assertEquals(IOException.class, thrown.getCause().getClass());
-
- ctrl.verify();
+ verify(stateManager).close();
+ verify(stateDirectory).unlock(taskId);
}
@Test
- public void shouldNotCloseStateManagerIfUnableToLockTaskDirectory() {
- expect(stateManager.taskId()).andReturn(taskId);
-
- expect(stateDirectory.lock(taskId)).andReturn(false);
-
- stateManager.close();
- expectLastCall().andThrow(new AssertionError("Should not be trying to close state you don't own!"));
-
- ctrl.checkOrder(true);
- ctrl.replay();
-
- replayAll();
+ public void shouldNotWipeStateStoresIfUnableToLockTaskDirectory() {
+ when(stateManager.taskId()).thenReturn(taskId);
+ when(stateDirectory.lock(taskId)).thenReturn(false);
StateManagerUtil.closeStateManager(
- logger, "logPrefix:", true, false, stateManager, stateDirectory, TaskType.ACTIVE);
- }
-
- @Test
- public void shouldNotWipeStateStoresIfUnableToLockTaskDirectory() throws IOException {
Review Comment:
I see what you mean, but unit tests should also document the behavior of production code and be robust against code changes. In this case, we have indeed to separate cases:
1. When exactly-once is enabled and the state manager is closed dirty and the lock is not owned then the state manager should not be closed, the directory should not be wiped and the directory should not be unlocked.
2. When exactly-once is disabled and the lock is not owned then the state manager should not be closed and the directory should not be unlocked.
So, I think what you did is fine, although would not verify that `stateManager.baseDir()` is never called but that `Utils.delete()` is never called. However, you need to run that test once with:
```java
StateManagerUtil.closeStateManager(
logger, "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE);
```
and once with
```java
StateManagerUtil.closeStateManager(
logger, "logPrefix:", true, false, stateManager, stateDirectory, TaskType.ACTIVE);
```
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -121,64 +98,48 @@ public void testRegisterStateStores() {
final MockKeyValueStore store2 = new MockKeyValueStore("store2", false);
final List<StateStore> stateStores = Arrays.asList(store1, store2);
- expect(topology.stateStores()).andReturn(stateStores);
-
- expect(stateManager.taskId()).andReturn(taskId);
-
- expect(stateDirectory.lock(taskId)).andReturn(true);
- expect(stateDirectory.directoryForTaskIsEmpty(taskId)).andReturn(true);
+ final InOrder inOrder = inOrder(stateManager);
- expect(topology.stateStores()).andReturn(stateStores);
+ when(topology.stateStores()).thenReturn(stateStores);
Review Comment:
```suggestion
```
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -121,64 +98,48 @@ public void testRegisterStateStores() {
final MockKeyValueStore store2 = new MockKeyValueStore("store2", false);
final List<StateStore> stateStores = Arrays.asList(store1, store2);
- expect(topology.stateStores()).andReturn(stateStores);
-
- expect(stateManager.taskId()).andReturn(taskId);
-
- expect(stateDirectory.lock(taskId)).andReturn(true);
- expect(stateDirectory.directoryForTaskIsEmpty(taskId)).andReturn(true);
+ final InOrder inOrder = inOrder(stateManager);
- expect(topology.stateStores()).andReturn(stateStores);
+ when(topology.stateStores()).thenReturn(stateStores);
- stateManager.registerStateStores(stateStores, processorContext);
+ when(stateManager.taskId()).thenReturn(taskId);
- stateManager.initializeStoreOffsetsFromCheckpoint(true);
- expectLastCall();
+ when(stateDirectory.lock(taskId)).thenReturn(true);
+ when(stateDirectory.directoryForTaskIsEmpty(taskId)).thenReturn(true);
Review Comment:
```suggestion
```
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -121,64 +98,48 @@ public void testRegisterStateStores() {
final MockKeyValueStore store2 = new MockKeyValueStore("store2", false);
final List<StateStore> stateStores = Arrays.asList(store1, store2);
- expect(topology.stateStores()).andReturn(stateStores);
-
- expect(stateManager.taskId()).andReturn(taskId);
-
- expect(stateDirectory.lock(taskId)).andReturn(true);
- expect(stateDirectory.directoryForTaskIsEmpty(taskId)).andReturn(true);
+ final InOrder inOrder = inOrder(stateManager);
- expect(topology.stateStores()).andReturn(stateStores);
+ when(topology.stateStores()).thenReturn(stateStores);
- stateManager.registerStateStores(stateStores, processorContext);
+ when(stateManager.taskId()).thenReturn(taskId);
- stateManager.initializeStoreOffsetsFromCheckpoint(true);
- expectLastCall();
+ when(stateDirectory.lock(taskId)).thenReturn(true);
+ when(stateDirectory.directoryForTaskIsEmpty(taskId)).thenReturn(true);
- ctrl.checkOrder(true);
- ctrl.replay();
+ when(topology.stateStores()).thenReturn(stateStores);
StateManagerUtil.registerStateStores(logger, "logPrefix:",
topology, stateManager, stateDirectory, processorContext);
- ctrl.verify();
+ inOrder.verify(stateManager).registerStateStores(stateStores, processorContext);
+ inOrder.verify(stateManager).initializeStoreOffsetsFromCheckpoint(true);
+ verifyNoMoreInteractions(stateManager);
}
@Test
public void testCloseStateManagerClean() {
- expect(stateManager.taskId()).andReturn(taskId);
+ final InOrder inOrder = inOrder(stateManager, stateDirectory);
Review Comment:
```suggestion
```
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -187,151 +148,101 @@ public void testCloseStateManagerThrowsExceptionWhenClean() {
// Thrown stateMgr exception will not be wrapped.
assertEquals("state manager failed to close", thrown.getMessage());
- ctrl.verify();
+ // The unlock logic should still be executed.
+ verify(stateDirectory).unlock(taskId);
}
@Test
public void testCloseStateManagerThrowsExceptionWhenDirty() {
- expect(stateManager.taskId()).andReturn(taskId);
+ when(stateManager.taskId()).thenReturn(taskId);
- expect(stateDirectory.lock(taskId)).andReturn(true);
+ when(stateDirectory.lock(taskId)).thenReturn(true);
- stateManager.close();
- expectLastCall().andThrow(new ProcessorStateException("state manager failed to close"));
-
- stateDirectory.unlock(taskId);
-
- ctrl.checkOrder(true);
- ctrl.replay();
+ doThrow(new ProcessorStateException("state manager failed to close")).when(stateManager).close();
assertThrows(
ProcessorStateException.class,
() -> StateManagerUtil.closeStateManager(
logger, "logPrefix:", false, false, stateManager, stateDirectory, TaskType.ACTIVE));
- ctrl.verify();
+ verify(stateDirectory).unlock(taskId);
}
@Test
public void testCloseStateManagerWithStateStoreWipeOut() {
- expect(stateManager.taskId()).andReturn(taskId);
- expect(stateDirectory.lock(taskId)).andReturn(true);
+ final InOrder inOrder = inOrder(stateManager, stateDirectory);
- stateManager.close();
- expectLastCall();
+ when(stateManager.taskId()).thenReturn(taskId);
+ when(stateDirectory.lock(taskId)).thenReturn(true);
// The `baseDir` will be accessed when attempting to delete the state store.
- expect(stateManager.baseDir()).andReturn(TestUtils.tempDirectory("state_store"));
-
- stateDirectory.unlock(taskId);
- expectLastCall();
-
- ctrl.checkOrder(true);
- ctrl.replay();
+ when(stateManager.baseDir()).thenReturn(TestUtils.tempDirectory("state_store"));
StateManagerUtil.closeStateManager(logger,
"logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE);
- ctrl.verify();
+ inOrder.verify(stateManager).close();
+ inOrder.verify(stateDirectory).unlock(taskId);
+ verifyNoMoreInteractions(stateManager, stateDirectory);
}
@Test
- public void shouldStillWipeStateStoresIfCloseThrowsException() throws IOException {
+ public void shouldStillWipeStateStoresIfCloseThrowsException() {
final File randomFile = new File("/random/path");
- mockStatic(Utils.class);
-
- expect(stateManager.taskId()).andReturn(taskId);
- expect(stateDirectory.lock(taskId)).andReturn(true);
- stateManager.close();
- expectLastCall().andThrow(new ProcessorStateException("Close failed"));
+ when(stateManager.taskId()).thenReturn(taskId);
+ when(stateDirectory.lock(taskId)).thenReturn(true);
Review Comment:
```suggestion
```
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -187,151 +148,101 @@ public void testCloseStateManagerThrowsExceptionWhenClean() {
// Thrown stateMgr exception will not be wrapped.
assertEquals("state manager failed to close", thrown.getMessage());
- ctrl.verify();
+ // The unlock logic should still be executed.
+ verify(stateDirectory).unlock(taskId);
}
@Test
public void testCloseStateManagerThrowsExceptionWhenDirty() {
- expect(stateManager.taskId()).andReturn(taskId);
+ when(stateManager.taskId()).thenReturn(taskId);
- expect(stateDirectory.lock(taskId)).andReturn(true);
+ when(stateDirectory.lock(taskId)).thenReturn(true);
- stateManager.close();
- expectLastCall().andThrow(new ProcessorStateException("state manager failed to close"));
-
- stateDirectory.unlock(taskId);
-
- ctrl.checkOrder(true);
- ctrl.replay();
+ doThrow(new ProcessorStateException("state manager failed to close")).when(stateManager).close();
assertThrows(
ProcessorStateException.class,
() -> StateManagerUtil.closeStateManager(
logger, "logPrefix:", false, false, stateManager, stateDirectory, TaskType.ACTIVE));
- ctrl.verify();
+ verify(stateDirectory).unlock(taskId);
}
@Test
public void testCloseStateManagerWithStateStoreWipeOut() {
- expect(stateManager.taskId()).andReturn(taskId);
- expect(stateDirectory.lock(taskId)).andReturn(true);
+ final InOrder inOrder = inOrder(stateManager, stateDirectory);
- stateManager.close();
- expectLastCall();
+ when(stateManager.taskId()).thenReturn(taskId);
+ when(stateDirectory.lock(taskId)).thenReturn(true);
Review Comment:
```suggestion
```
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -121,64 +98,48 @@ public void testRegisterStateStores() {
final MockKeyValueStore store2 = new MockKeyValueStore("store2", false);
final List<StateStore> stateStores = Arrays.asList(store1, store2);
- expect(topology.stateStores()).andReturn(stateStores);
-
- expect(stateManager.taskId()).andReturn(taskId);
-
- expect(stateDirectory.lock(taskId)).andReturn(true);
- expect(stateDirectory.directoryForTaskIsEmpty(taskId)).andReturn(true);
+ final InOrder inOrder = inOrder(stateManager);
- expect(topology.stateStores()).andReturn(stateStores);
+ when(topology.stateStores()).thenReturn(stateStores);
- stateManager.registerStateStores(stateStores, processorContext);
+ when(stateManager.taskId()).thenReturn(taskId);
- stateManager.initializeStoreOffsetsFromCheckpoint(true);
- expectLastCall();
+ when(stateDirectory.lock(taskId)).thenReturn(true);
+ when(stateDirectory.directoryForTaskIsEmpty(taskId)).thenReturn(true);
- ctrl.checkOrder(true);
- ctrl.replay();
+ when(topology.stateStores()).thenReturn(stateStores);
StateManagerUtil.registerStateStores(logger, "logPrefix:",
topology, stateManager, stateDirectory, processorContext);
- ctrl.verify();
+ inOrder.verify(stateManager).registerStateStores(stateStores, processorContext);
+ inOrder.verify(stateManager).initializeStoreOffsetsFromCheckpoint(true);
+ verifyNoMoreInteractions(stateManager);
}
@Test
public void testCloseStateManagerClean() {
- expect(stateManager.taskId()).andReturn(taskId);
+ final InOrder inOrder = inOrder(stateManager, stateDirectory);
- expect(stateDirectory.lock(taskId)).andReturn(true);
+ when(stateManager.taskId()).thenReturn(taskId);
Review Comment:
```suggestion
```
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -187,151 +148,101 @@ public void testCloseStateManagerThrowsExceptionWhenClean() {
// Thrown stateMgr exception will not be wrapped.
assertEquals("state manager failed to close", thrown.getMessage());
- ctrl.verify();
+ // The unlock logic should still be executed.
+ verify(stateDirectory).unlock(taskId);
}
@Test
public void testCloseStateManagerThrowsExceptionWhenDirty() {
- expect(stateManager.taskId()).andReturn(taskId);
+ when(stateManager.taskId()).thenReturn(taskId);
- expect(stateDirectory.lock(taskId)).andReturn(true);
+ when(stateDirectory.lock(taskId)).thenReturn(true);
- stateManager.close();
- expectLastCall().andThrow(new ProcessorStateException("state manager failed to close"));
-
- stateDirectory.unlock(taskId);
-
- ctrl.checkOrder(true);
- ctrl.replay();
+ doThrow(new ProcessorStateException("state manager failed to close")).when(stateManager).close();
assertThrows(
ProcessorStateException.class,
() -> StateManagerUtil.closeStateManager(
logger, "logPrefix:", false, false, stateManager, stateDirectory, TaskType.ACTIVE));
- ctrl.verify();
+ verify(stateDirectory).unlock(taskId);
}
@Test
public void testCloseStateManagerWithStateStoreWipeOut() {
- expect(stateManager.taskId()).andReturn(taskId);
- expect(stateDirectory.lock(taskId)).andReturn(true);
+ final InOrder inOrder = inOrder(stateManager, stateDirectory);
- stateManager.close();
- expectLastCall();
+ when(stateManager.taskId()).thenReturn(taskId);
+ when(stateDirectory.lock(taskId)).thenReturn(true);
// The `baseDir` will be accessed when attempting to delete the state store.
- expect(stateManager.baseDir()).andReturn(TestUtils.tempDirectory("state_store"));
-
- stateDirectory.unlock(taskId);
- expectLastCall();
-
- ctrl.checkOrder(true);
- ctrl.replay();
+ when(stateManager.baseDir()).thenReturn(TestUtils.tempDirectory("state_store"));
StateManagerUtil.closeStateManager(logger,
"logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE);
- ctrl.verify();
+ inOrder.verify(stateManager).close();
+ inOrder.verify(stateDirectory).unlock(taskId);
+ verifyNoMoreInteractions(stateManager, stateDirectory);
}
@Test
- public void shouldStillWipeStateStoresIfCloseThrowsException() throws IOException {
+ public void shouldStillWipeStateStoresIfCloseThrowsException() {
final File randomFile = new File("/random/path");
- mockStatic(Utils.class);
-
- expect(stateManager.taskId()).andReturn(taskId);
- expect(stateDirectory.lock(taskId)).andReturn(true);
- stateManager.close();
- expectLastCall().andThrow(new ProcessorStateException("Close failed"));
+ when(stateManager.taskId()).thenReturn(taskId);
+ when(stateDirectory.lock(taskId)).thenReturn(true);
- expect(stateManager.baseDir()).andReturn(randomFile);
+ doThrow(new ProcessorStateException("Close failed")).when(stateManager).close();
- Utils.delete(randomFile);
+ when(stateManager.baseDir()).thenReturn(randomFile);
- stateDirectory.unlock(taskId);
- expectLastCall();
+ try (MockedStatic<Utils> utils = mockStatic(Utils.class)) {
+ assertThrows(ProcessorStateException.class, () ->
+ StateManagerUtil.closeStateManager(logger, "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
+ }
- ctrl.checkOrder(true);
- ctrl.replay();
-
- replayAll();
-
- assertThrows(ProcessorStateException.class, () ->
- StateManagerUtil.closeStateManager(logger, "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
-
- ctrl.verify();
+ verify(stateDirectory).unlock(taskId);
}
@Test
- public void testCloseStateManagerWithStateStoreWipeOutRethrowWrappedIOException() throws IOException {
+ public void testCloseStateManagerWithStateStoreWipeOutRethrowWrappedIOException() {
final File unknownFile = new File("/unknown/path");
- mockStatic(Utils.class);
- expect(stateManager.taskId()).andReturn(taskId);
- expect(stateDirectory.lock(taskId)).andReturn(true);
+ final InOrder inOrder = inOrder(stateManager, stateDirectory);
Review Comment:
```suggestion
```
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -187,151 +148,101 @@ public void testCloseStateManagerThrowsExceptionWhenClean() {
// Thrown stateMgr exception will not be wrapped.
assertEquals("state manager failed to close", thrown.getMessage());
- ctrl.verify();
+ // The unlock logic should still be executed.
+ verify(stateDirectory).unlock(taskId);
}
@Test
public void testCloseStateManagerThrowsExceptionWhenDirty() {
- expect(stateManager.taskId()).andReturn(taskId);
+ when(stateManager.taskId()).thenReturn(taskId);
- expect(stateDirectory.lock(taskId)).andReturn(true);
+ when(stateDirectory.lock(taskId)).thenReturn(true);
Review Comment:
```suggestion
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [kafka] cadonna merged pull request #12821: KAFKA-14132: Replace PowerMock and EasyMock with Mockito in streams tests
Posted by GitBox <gi...@apache.org>.
cadonna merged PR #12821:
URL: https://github.com/apache/kafka/pull/12821
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [kafka] clolov commented on a diff in pull request #12821: KAFKA-14132: Replace PowerMock and EasyMock with Mockito in streams tests
Posted by GitBox <gi...@apache.org>.
clolov commented on code in PR #12821:
URL: https://github.com/apache/kafka/pull/12821#discussion_r1014082612
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -42,77 +39,54 @@
import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
-import static org.easymock.EasyMock.createStrictControl;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.expectLastCall;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;
-import static org.powermock.api.easymock.PowerMock.mockStatic;
-import static org.powermock.api.easymock.PowerMock.replayAll;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(Utils.class)
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
public class StateManagerUtilTest {
- @Mock(type = MockType.NICE)
+ @Mock
private ProcessorStateManager stateManager;
- @Mock(type = MockType.NICE)
+ @Mock
private StateDirectory stateDirectory;
- @Mock(type = MockType.NICE)
+ @Mock
private ProcessorTopology topology;
- @Mock(type = MockType.NICE)
+ @Mock
private InternalProcessorContext processorContext;
- private IMocksControl ctrl;
-
private Logger logger = new LogContext("test").logger(AbstractTask.class);
private final TaskId taskId = new TaskId(0, 0);
- @Before
- public void setup() {
- ctrl = createStrictControl();
- topology = ctrl.createMock(ProcessorTopology.class);
- processorContext = ctrl.createMock(InternalProcessorContext.class);
-
- stateManager = ctrl.createMock(ProcessorStateManager.class);
- stateDirectory = ctrl.createMock(StateDirectory.class);
- }
-
@Test
public void testRegisterStateStoreWhenTopologyEmpty() {
- expect(topology.stateStores()).andReturn(emptyList());
-
- ctrl.checkOrder(true);
Review Comment:
I do not think this adds much value so I removed it. If we want to ensure ordering this can be done in Mockito.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [kafka] divijvaidya commented on pull request #12821: KAFKA-14132: Replace PowerMock and EasyMock with Mockito in streams tests
Posted by GitBox <gi...@apache.org>.
divijvaidya commented on PR #12821:
URL: https://github.com/apache/kafka/pull/12821#issuecomment-1333861957
@clolov please fix the checkstyle failures in this PR!
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [kafka] cadonna commented on pull request #12821: KAFKA-14132: Replace PowerMock and EasyMock with Mockito in streams tests
Posted by GitBox <gi...@apache.org>.
cadonna commented on PR #12821:
URL: https://github.com/apache/kafka/pull/12821#issuecomment-1397370205
Build failures are unrelated:
```
Build / JDK 11 and Scala 2.13 / org.apache.kafka.clients.consumer.KafkaConsumerTest.testReturnRecordsDuringRebalance()
Build / JDK 11 and Scala 2.13 / org.apache.kafka.connect.integration.BlockingConnectorTest.testBlockInSinkTaskStart
Build / JDK 11 and Scala 2.13 / org.apache.kafka.connect.integration.ConnectWorkerIntegrationTest.testBrokerCoordinator
Build / JDK 17 and Scala 2.13 / org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated()
Build / JDK 17 and Scala 2.13 / org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated()
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [kafka] clolov commented on a diff in pull request #12821: KAFKA-14132: Replace PowerMock and EasyMock with Mockito in streams tests
Posted by GitBox <gi...@apache.org>.
clolov commented on code in PR #12821:
URL: https://github.com/apache/kafka/pull/12821#discussion_r1014084531
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -187,151 +139,92 @@ public void testCloseStateManagerThrowsExceptionWhenClean() {
// Thrown stateMgr exception will not be wrapped.
assertEquals("state manager failed to close", thrown.getMessage());
- ctrl.verify();
+ // The unlock logic should still be executed.
+ verify(stateDirectory).unlock(taskId);
}
@Test
public void testCloseStateManagerThrowsExceptionWhenDirty() {
- expect(stateManager.taskId()).andReturn(taskId);
+ when(stateManager.taskId()).thenReturn(taskId);
- expect(stateDirectory.lock(taskId)).andReturn(true);
+ when(stateDirectory.lock(taskId)).thenReturn(true);
- stateManager.close();
- expectLastCall().andThrow(new ProcessorStateException("state manager failed to close"));
-
- stateDirectory.unlock(taskId);
-
- ctrl.checkOrder(true);
- ctrl.replay();
+ doThrow(new ProcessorStateException("state manager failed to close")).when(stateManager).close();
assertThrows(
ProcessorStateException.class,
() -> StateManagerUtil.closeStateManager(
logger, "logPrefix:", false, false, stateManager, stateDirectory, TaskType.ACTIVE));
- ctrl.verify();
+ verify(stateDirectory).unlock(taskId);
}
@Test
public void testCloseStateManagerWithStateStoreWipeOut() {
- expect(stateManager.taskId()).andReturn(taskId);
- expect(stateDirectory.lock(taskId)).andReturn(true);
-
- stateManager.close();
- expectLastCall();
+ when(stateManager.taskId()).thenReturn(taskId);
+ when(stateDirectory.lock(taskId)).thenReturn(true);
// The `baseDir` will be accessed when attempting to delete the state store.
- expect(stateManager.baseDir()).andReturn(TestUtils.tempDirectory("state_store"));
-
- stateDirectory.unlock(taskId);
- expectLastCall();
-
- ctrl.checkOrder(true);
- ctrl.replay();
+ when(stateManager.baseDir()).thenReturn(TestUtils.tempDirectory("state_store"));
StateManagerUtil.closeStateManager(logger,
"logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE);
- ctrl.verify();
+ verify(stateManager).close();
+ verify(stateDirectory).unlock(taskId);
}
@Test
public void shouldStillWipeStateStoresIfCloseThrowsException() throws IOException {
final File randomFile = new File("/random/path");
- mockStatic(Utils.class);
-
- expect(stateManager.taskId()).andReturn(taskId);
- expect(stateDirectory.lock(taskId)).andReturn(true);
- stateManager.close();
- expectLastCall().andThrow(new ProcessorStateException("Close failed"));
+ when(stateManager.taskId()).thenReturn(taskId);
+ when(stateDirectory.lock(taskId)).thenReturn(true);
- expect(stateManager.baseDir()).andReturn(randomFile);
+ doThrow(new ProcessorStateException("Close failed")).when(stateManager).close();
- Utils.delete(randomFile);
+ when(stateManager.baseDir()).thenReturn(randomFile);
- stateDirectory.unlock(taskId);
- expectLastCall();
+ try (MockedStatic<Utils> utils = mockStatic(Utils.class)) {
+ assertThrows(ProcessorStateException.class, () ->
+ StateManagerUtil.closeStateManager(logger, "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
+ }
- ctrl.checkOrder(true);
- ctrl.replay();
-
- replayAll();
-
- assertThrows(ProcessorStateException.class, () ->
- StateManagerUtil.closeStateManager(logger, "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
-
- ctrl.verify();
+ verify(stateDirectory).unlock(taskId);
}
@Test
public void testCloseStateManagerWithStateStoreWipeOutRethrowWrappedIOException() throws IOException {
final File unknownFile = new File("/unknown/path");
- mockStatic(Utils.class);
-
- expect(stateManager.taskId()).andReturn(taskId);
- expect(stateDirectory.lock(taskId)).andReturn(true);
-
- stateManager.close();
- expectLastCall();
-
- expect(stateManager.baseDir()).andReturn(unknownFile);
- Utils.delete(unknownFile);
- expectLastCall().andThrow(new IOException("Deletion failed"));
+ when(stateManager.taskId()).thenReturn(taskId);
+ when(stateDirectory.lock(taskId)).thenReturn(true);
+ when(stateManager.baseDir()).thenReturn(unknownFile);
- stateDirectory.unlock(taskId);
- expectLastCall();
+ try (MockedStatic<Utils> utils = mockStatic(Utils.class)) {
+ utils.when(() -> Utils.delete(unknownFile)).thenThrow(new IOException("Deletion failed"));
- ctrl.checkOrder(true);
- ctrl.replay();
+ final ProcessorStateException thrown = assertThrows(
+ ProcessorStateException.class, () -> StateManagerUtil.closeStateManager(logger,
+ "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
- replayAll();
+ assertEquals(IOException.class, thrown.getCause().getClass());
+ }
- final ProcessorStateException thrown = assertThrows(
- ProcessorStateException.class, () -> StateManagerUtil.closeStateManager(logger,
- "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
-
- assertEquals(IOException.class, thrown.getCause().getClass());
-
- ctrl.verify();
+ verify(stateManager).close();
+ verify(stateDirectory).unlock(taskId);
}
@Test
- public void shouldNotCloseStateManagerIfUnableToLockTaskDirectory() {
- expect(stateManager.taskId()).andReturn(taskId);
-
- expect(stateDirectory.lock(taskId)).andReturn(false);
-
- stateManager.close();
- expectLastCall().andThrow(new AssertionError("Should not be trying to close state you don't own!"));
-
- ctrl.checkOrder(true);
- ctrl.replay();
-
- replayAll();
+ public void shouldNotWipeStateStoresIfUnableToLockTaskDirectory() {
+ when(stateManager.taskId()).thenReturn(taskId);
+ when(stateDirectory.lock(taskId)).thenReturn(false);
StateManagerUtil.closeStateManager(
- logger, "logPrefix:", true, false, stateManager, stateDirectory, TaskType.ACTIVE);
- }
-
- @Test
- public void shouldNotWipeStateStoresIfUnableToLockTaskDirectory() throws IOException {
Review Comment:
This test appears to be just wrong, unless I am reading the code incorrectly. Because we use `expect(stateDirectory.lock(taskId)).andReturn(false);` we never reach the code path which would be using `Utils.delete`. As such, I just removed the test.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [kafka] clolov commented on pull request #12821: KAFKA-14132: Replace PowerMock and EasyMock with Mockito in streams tests
Posted by GitBox <gi...@apache.org>.
clolov commented on PR #12821:
URL: https://github.com/apache/kafka/pull/12821#issuecomment-1351857898
Hello @cadonna and thanks for the review! I will aim to address the comments tomorrow.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [kafka] clolov commented on a diff in pull request #12821: KAFKA-14132: Replace PowerMock and EasyMock with Mockito in streams tests
Posted by GitBox <gi...@apache.org>.
clolov commented on code in PR #12821:
URL: https://github.com/apache/kafka/pull/12821#discussion_r1081305288
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -187,151 +137,107 @@ public void testCloseStateManagerThrowsExceptionWhenClean() {
// Thrown stateMgr exception will not be wrapped.
assertEquals("state manager failed to close", thrown.getMessage());
- ctrl.verify();
+ // The unlock logic should still be executed.
+ verify(stateDirectory).unlock(taskId);
}
@Test
public void testCloseStateManagerThrowsExceptionWhenDirty() {
- expect(stateManager.taskId()).andReturn(taskId);
-
- expect(stateDirectory.lock(taskId)).andReturn(true);
-
- stateManager.close();
- expectLastCall().andThrow(new ProcessorStateException("state manager failed to close"));
-
- stateDirectory.unlock(taskId);
-
- ctrl.checkOrder(true);
- ctrl.replay();
+ when(stateManager.taskId()).thenReturn(taskId);
+ when(stateDirectory.lock(taskId)).thenReturn(true);
+ doThrow(new ProcessorStateException("state manager failed to close")).when(stateManager).close();
assertThrows(
ProcessorStateException.class,
() -> StateManagerUtil.closeStateManager(
logger, "logPrefix:", false, false, stateManager, stateDirectory, TaskType.ACTIVE));
- ctrl.verify();
+ verify(stateDirectory).unlock(taskId);
}
@Test
public void testCloseStateManagerWithStateStoreWipeOut() {
- expect(stateManager.taskId()).andReturn(taskId);
- expect(stateDirectory.lock(taskId)).andReturn(true);
-
- stateManager.close();
- expectLastCall();
-
+ final InOrder inOrder = inOrder(stateManager, stateDirectory);
+ when(stateManager.taskId()).thenReturn(taskId);
+ when(stateDirectory.lock(taskId)).thenReturn(true);
// The `baseDir` will be accessed when attempting to delete the state store.
- expect(stateManager.baseDir()).andReturn(TestUtils.tempDirectory("state_store"));
-
- stateDirectory.unlock(taskId);
- expectLastCall();
-
- ctrl.checkOrder(true);
- ctrl.replay();
+ when(stateManager.baseDir()).thenReturn(TestUtils.tempDirectory("state_store"));
StateManagerUtil.closeStateManager(logger,
"logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE);
- ctrl.verify();
+ inOrder.verify(stateManager).close();
+ inOrder.verify(stateDirectory).unlock(taskId);
+ verifyNoMoreInteractions(stateManager, stateDirectory);
}
@Test
- public void shouldStillWipeStateStoresIfCloseThrowsException() throws IOException {
+ public void shouldStillWipeStateStoresIfCloseThrowsException() {
final File randomFile = new File("/random/path");
- mockStatic(Utils.class);
-
- expect(stateManager.taskId()).andReturn(taskId);
- expect(stateDirectory.lock(taskId)).andReturn(true);
-
- stateManager.close();
- expectLastCall().andThrow(new ProcessorStateException("Close failed"));
-
- expect(stateManager.baseDir()).andReturn(randomFile);
- Utils.delete(randomFile);
+ when(stateManager.taskId()).thenReturn(taskId);
+ when(stateDirectory.lock(taskId)).thenReturn(true);
+ doThrow(new ProcessorStateException("Close failed")).when(stateManager).close();
+ when(stateManager.baseDir()).thenReturn(randomFile);
- stateDirectory.unlock(taskId);
- expectLastCall();
+ try (MockedStatic<Utils> utils = mockStatic(Utils.class)) {
+ assertThrows(ProcessorStateException.class, () ->
+ StateManagerUtil.closeStateManager(logger, "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
+ }
- ctrl.checkOrder(true);
- ctrl.replay();
-
- replayAll();
-
- assertThrows(ProcessorStateException.class, () ->
- StateManagerUtil.closeStateManager(logger, "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
-
- ctrl.verify();
+ verify(stateDirectory).unlock(taskId);
}
@Test
- public void testCloseStateManagerWithStateStoreWipeOutRethrowWrappedIOException() throws IOException {
+ public void testCloseStateManagerWithStateStoreWipeOutRethrowWrappedIOException() {
final File unknownFile = new File("/unknown/path");
- mockStatic(Utils.class);
-
- expect(stateManager.taskId()).andReturn(taskId);
- expect(stateDirectory.lock(taskId)).andReturn(true);
-
- stateManager.close();
- expectLastCall();
-
- expect(stateManager.baseDir()).andReturn(unknownFile);
-
- Utils.delete(unknownFile);
- expectLastCall().andThrow(new IOException("Deletion failed"));
+ final InOrder inOrder = inOrder(stateManager, stateDirectory);
+ when(stateManager.taskId()).thenReturn(taskId);
+ when(stateDirectory.lock(taskId)).thenReturn(true);
+ when(stateManager.baseDir()).thenReturn(unknownFile);
- stateDirectory.unlock(taskId);
- expectLastCall();
+ try (MockedStatic<Utils> utils = mockStatic(Utils.class)) {
+ utils.when(() -> Utils.delete(unknownFile)).thenThrow(new IOException("Deletion failed"));
- ctrl.checkOrder(true);
- ctrl.replay();
+ final ProcessorStateException thrown = assertThrows(
+ ProcessorStateException.class, () -> StateManagerUtil.closeStateManager(logger,
+ "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
- replayAll();
+ assertEquals(IOException.class, thrown.getCause().getClass());
+ }
- final ProcessorStateException thrown = assertThrows(
- ProcessorStateException.class, () -> StateManagerUtil.closeStateManager(logger,
- "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
-
- assertEquals(IOException.class, thrown.getCause().getClass());
-
- ctrl.verify();
+ inOrder.verify(stateManager).close();
+ inOrder.verify(stateDirectory).unlock(taskId);
+ verifyNoMoreInteractions(stateManager, stateDirectory);
}
@Test
public void shouldNotCloseStateManagerIfUnableToLockTaskDirectory() {
- expect(stateManager.taskId()).andReturn(taskId);
-
- expect(stateDirectory.lock(taskId)).andReturn(false);
-
- stateManager.close();
- expectLastCall().andThrow(new AssertionError("Should not be trying to close state you don't own!"));
-
- ctrl.checkOrder(true);
- ctrl.replay();
-
- replayAll();
+ final InOrder inOrder = inOrder(stateManager, stateDirectory);
+ when(stateManager.taskId()).thenReturn(taskId);
+ when(stateDirectory.lock(taskId)).thenReturn(false);
StateManagerUtil.closeStateManager(
- logger, "logPrefix:", true, false, stateManager, stateDirectory, TaskType.ACTIVE);
+ logger, "logPrefix:", true, false, stateManager, stateDirectory, TaskType.ACTIVE);
+
+ inOrder.verify(stateManager, never()).close();
+ inOrder.verify(stateManager, never()).baseDir();
+ inOrder.verify(stateDirectory, never()).unlock(taskId);
Review Comment:
Okay, I pushed the latest rebased version!
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [kafka] cadonna commented on a diff in pull request #12821: KAFKA-14132: Replace PowerMock and EasyMock with Mockito in streams tests
Posted by GitBox <gi...@apache.org>.
cadonna commented on code in PR #12821:
URL: https://github.com/apache/kafka/pull/12821#discussion_r1081170724
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -187,151 +137,107 @@ public void testCloseStateManagerThrowsExceptionWhenClean() {
// Thrown stateMgr exception will not be wrapped.
assertEquals("state manager failed to close", thrown.getMessage());
- ctrl.verify();
+ // The unlock logic should still be executed.
+ verify(stateDirectory).unlock(taskId);
}
@Test
public void testCloseStateManagerThrowsExceptionWhenDirty() {
- expect(stateManager.taskId()).andReturn(taskId);
-
- expect(stateDirectory.lock(taskId)).andReturn(true);
-
- stateManager.close();
- expectLastCall().andThrow(new ProcessorStateException("state manager failed to close"));
-
- stateDirectory.unlock(taskId);
-
- ctrl.checkOrder(true);
- ctrl.replay();
+ when(stateManager.taskId()).thenReturn(taskId);
+ when(stateDirectory.lock(taskId)).thenReturn(true);
+ doThrow(new ProcessorStateException("state manager failed to close")).when(stateManager).close();
assertThrows(
ProcessorStateException.class,
() -> StateManagerUtil.closeStateManager(
logger, "logPrefix:", false, false, stateManager, stateDirectory, TaskType.ACTIVE));
- ctrl.verify();
+ verify(stateDirectory).unlock(taskId);
}
@Test
public void testCloseStateManagerWithStateStoreWipeOut() {
- expect(stateManager.taskId()).andReturn(taskId);
- expect(stateDirectory.lock(taskId)).andReturn(true);
-
- stateManager.close();
- expectLastCall();
-
+ final InOrder inOrder = inOrder(stateManager, stateDirectory);
+ when(stateManager.taskId()).thenReturn(taskId);
+ when(stateDirectory.lock(taskId)).thenReturn(true);
// The `baseDir` will be accessed when attempting to delete the state store.
- expect(stateManager.baseDir()).andReturn(TestUtils.tempDirectory("state_store"));
-
- stateDirectory.unlock(taskId);
- expectLastCall();
-
- ctrl.checkOrder(true);
- ctrl.replay();
+ when(stateManager.baseDir()).thenReturn(TestUtils.tempDirectory("state_store"));
StateManagerUtil.closeStateManager(logger,
"logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE);
- ctrl.verify();
+ inOrder.verify(stateManager).close();
+ inOrder.verify(stateDirectory).unlock(taskId);
+ verifyNoMoreInteractions(stateManager, stateDirectory);
}
@Test
- public void shouldStillWipeStateStoresIfCloseThrowsException() throws IOException {
+ public void shouldStillWipeStateStoresIfCloseThrowsException() {
final File randomFile = new File("/random/path");
- mockStatic(Utils.class);
-
- expect(stateManager.taskId()).andReturn(taskId);
- expect(stateDirectory.lock(taskId)).andReturn(true);
-
- stateManager.close();
- expectLastCall().andThrow(new ProcessorStateException("Close failed"));
-
- expect(stateManager.baseDir()).andReturn(randomFile);
- Utils.delete(randomFile);
+ when(stateManager.taskId()).thenReturn(taskId);
+ when(stateDirectory.lock(taskId)).thenReturn(true);
+ doThrow(new ProcessorStateException("Close failed")).when(stateManager).close();
+ when(stateManager.baseDir()).thenReturn(randomFile);
- stateDirectory.unlock(taskId);
- expectLastCall();
+ try (MockedStatic<Utils> utils = mockStatic(Utils.class)) {
+ assertThrows(ProcessorStateException.class, () ->
+ StateManagerUtil.closeStateManager(logger, "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
+ }
- ctrl.checkOrder(true);
- ctrl.replay();
-
- replayAll();
-
- assertThrows(ProcessorStateException.class, () ->
- StateManagerUtil.closeStateManager(logger, "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
-
- ctrl.verify();
+ verify(stateDirectory).unlock(taskId);
}
@Test
- public void testCloseStateManagerWithStateStoreWipeOutRethrowWrappedIOException() throws IOException {
+ public void testCloseStateManagerWithStateStoreWipeOutRethrowWrappedIOException() {
final File unknownFile = new File("/unknown/path");
- mockStatic(Utils.class);
-
- expect(stateManager.taskId()).andReturn(taskId);
- expect(stateDirectory.lock(taskId)).andReturn(true);
-
- stateManager.close();
- expectLastCall();
-
- expect(stateManager.baseDir()).andReturn(unknownFile);
-
- Utils.delete(unknownFile);
- expectLastCall().andThrow(new IOException("Deletion failed"));
+ final InOrder inOrder = inOrder(stateManager, stateDirectory);
+ when(stateManager.taskId()).thenReturn(taskId);
+ when(stateDirectory.lock(taskId)).thenReturn(true);
+ when(stateManager.baseDir()).thenReturn(unknownFile);
- stateDirectory.unlock(taskId);
- expectLastCall();
+ try (MockedStatic<Utils> utils = mockStatic(Utils.class)) {
+ utils.when(() -> Utils.delete(unknownFile)).thenThrow(new IOException("Deletion failed"));
- ctrl.checkOrder(true);
- ctrl.replay();
+ final ProcessorStateException thrown = assertThrows(
+ ProcessorStateException.class, () -> StateManagerUtil.closeStateManager(logger,
+ "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
- replayAll();
+ assertEquals(IOException.class, thrown.getCause().getClass());
+ }
- final ProcessorStateException thrown = assertThrows(
- ProcessorStateException.class, () -> StateManagerUtil.closeStateManager(logger,
- "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
-
- assertEquals(IOException.class, thrown.getCause().getClass());
-
- ctrl.verify();
+ inOrder.verify(stateManager).close();
+ inOrder.verify(stateDirectory).unlock(taskId);
+ verifyNoMoreInteractions(stateManager, stateDirectory);
}
@Test
public void shouldNotCloseStateManagerIfUnableToLockTaskDirectory() {
- expect(stateManager.taskId()).andReturn(taskId);
-
- expect(stateDirectory.lock(taskId)).andReturn(false);
-
- stateManager.close();
- expectLastCall().andThrow(new AssertionError("Should not be trying to close state you don't own!"));
-
- ctrl.checkOrder(true);
- ctrl.replay();
-
- replayAll();
+ final InOrder inOrder = inOrder(stateManager, stateDirectory);
+ when(stateManager.taskId()).thenReturn(taskId);
+ when(stateDirectory.lock(taskId)).thenReturn(false);
StateManagerUtil.closeStateManager(
- logger, "logPrefix:", true, false, stateManager, stateDirectory, TaskType.ACTIVE);
+ logger, "logPrefix:", true, false, stateManager, stateDirectory, TaskType.ACTIVE);
+
+ inOrder.verify(stateManager, never()).close();
+ inOrder.verify(stateManager, never()).baseDir();
+ inOrder.verify(stateDirectory, never()).unlock(taskId);
Review Comment:
Looks great!
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [kafka] clolov commented on pull request #12821: KAFKA-14132: Replace PowerMock and EasyMock with Mockito in streams tests
Posted by GitBox <gi...@apache.org>.
clolov commented on PR #12821:
URL: https://github.com/apache/kafka/pull/12821#issuecomment-1315197616
Thank you for the review @cadonna. I will aim to address it today!
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [kafka] clolov commented on a diff in pull request #12821: KAFKA-14132: Replace PowerMock and EasyMock with Mockito in streams tests
Posted by GitBox <gi...@apache.org>.
clolov commented on code in PR #12821:
URL: https://github.com/apache/kafka/pull/12821#discussion_r1014079123
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionTopicsTest.java:
##########
@@ -47,22 +47,18 @@
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.SUBTOPOLOGY_0;
import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.SUBTOPOLOGY_1;
-import static org.easymock.EasyMock.anyObject;
-import static org.easymock.EasyMock.eq;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.mock;
-import static org.easymock.EasyMock.niceMock;
-import static org.easymock.EasyMock.replay;
-import static org.easymock.EasyMock.verify;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
-@RunWith(PowerMockRunner.class)
-@PrepareForTest({Cluster.class})
Review Comment:
I could not see how this affected the outcome of the tests so I removed it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [kafka] clolov commented on a diff in pull request #12821: KAFKA-14132: Replace PowerMock and EasyMock with Mockito in streams tests
Posted by GitBox <gi...@apache.org>.
clolov commented on code in PR #12821:
URL: https://github.com/apache/kafka/pull/12821#discussion_r1049809566
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -187,151 +139,92 @@ public void testCloseStateManagerThrowsExceptionWhenClean() {
// Thrown stateMgr exception will not be wrapped.
assertEquals("state manager failed to close", thrown.getMessage());
- ctrl.verify();
+ // The unlock logic should still be executed.
+ verify(stateDirectory).unlock(taskId);
}
@Test
public void testCloseStateManagerThrowsExceptionWhenDirty() {
- expect(stateManager.taskId()).andReturn(taskId);
+ when(stateManager.taskId()).thenReturn(taskId);
- expect(stateDirectory.lock(taskId)).andReturn(true);
+ when(stateDirectory.lock(taskId)).thenReturn(true);
- stateManager.close();
- expectLastCall().andThrow(new ProcessorStateException("state manager failed to close"));
-
- stateDirectory.unlock(taskId);
-
- ctrl.checkOrder(true);
- ctrl.replay();
+ doThrow(new ProcessorStateException("state manager failed to close")).when(stateManager).close();
assertThrows(
ProcessorStateException.class,
() -> StateManagerUtil.closeStateManager(
logger, "logPrefix:", false, false, stateManager, stateDirectory, TaskType.ACTIVE));
- ctrl.verify();
+ verify(stateDirectory).unlock(taskId);
}
@Test
public void testCloseStateManagerWithStateStoreWipeOut() {
- expect(stateManager.taskId()).andReturn(taskId);
- expect(stateDirectory.lock(taskId)).andReturn(true);
-
- stateManager.close();
- expectLastCall();
+ when(stateManager.taskId()).thenReturn(taskId);
+ when(stateDirectory.lock(taskId)).thenReturn(true);
// The `baseDir` will be accessed when attempting to delete the state store.
- expect(stateManager.baseDir()).andReturn(TestUtils.tempDirectory("state_store"));
-
- stateDirectory.unlock(taskId);
- expectLastCall();
-
- ctrl.checkOrder(true);
- ctrl.replay();
+ when(stateManager.baseDir()).thenReturn(TestUtils.tempDirectory("state_store"));
StateManagerUtil.closeStateManager(logger,
"logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE);
- ctrl.verify();
+ verify(stateManager).close();
+ verify(stateDirectory).unlock(taskId);
}
@Test
public void shouldStillWipeStateStoresIfCloseThrowsException() throws IOException {
final File randomFile = new File("/random/path");
- mockStatic(Utils.class);
-
- expect(stateManager.taskId()).andReturn(taskId);
- expect(stateDirectory.lock(taskId)).andReturn(true);
- stateManager.close();
- expectLastCall().andThrow(new ProcessorStateException("Close failed"));
+ when(stateManager.taskId()).thenReturn(taskId);
+ when(stateDirectory.lock(taskId)).thenReturn(true);
- expect(stateManager.baseDir()).andReturn(randomFile);
+ doThrow(new ProcessorStateException("Close failed")).when(stateManager).close();
- Utils.delete(randomFile);
+ when(stateManager.baseDir()).thenReturn(randomFile);
- stateDirectory.unlock(taskId);
- expectLastCall();
+ try (MockedStatic<Utils> utils = mockStatic(Utils.class)) {
+ assertThrows(ProcessorStateException.class, () ->
+ StateManagerUtil.closeStateManager(logger, "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
+ }
- ctrl.checkOrder(true);
- ctrl.replay();
-
- replayAll();
-
- assertThrows(ProcessorStateException.class, () ->
- StateManagerUtil.closeStateManager(logger, "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
-
- ctrl.verify();
+ verify(stateDirectory).unlock(taskId);
}
@Test
public void testCloseStateManagerWithStateStoreWipeOutRethrowWrappedIOException() throws IOException {
final File unknownFile = new File("/unknown/path");
- mockStatic(Utils.class);
-
- expect(stateManager.taskId()).andReturn(taskId);
- expect(stateDirectory.lock(taskId)).andReturn(true);
-
- stateManager.close();
- expectLastCall();
-
- expect(stateManager.baseDir()).andReturn(unknownFile);
- Utils.delete(unknownFile);
- expectLastCall().andThrow(new IOException("Deletion failed"));
+ when(stateManager.taskId()).thenReturn(taskId);
+ when(stateDirectory.lock(taskId)).thenReturn(true);
+ when(stateManager.baseDir()).thenReturn(unknownFile);
- stateDirectory.unlock(taskId);
- expectLastCall();
+ try (MockedStatic<Utils> utils = mockStatic(Utils.class)) {
+ utils.when(() -> Utils.delete(unknownFile)).thenThrow(new IOException("Deletion failed"));
- ctrl.checkOrder(true);
- ctrl.replay();
+ final ProcessorStateException thrown = assertThrows(
+ ProcessorStateException.class, () -> StateManagerUtil.closeStateManager(logger,
+ "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
- replayAll();
+ assertEquals(IOException.class, thrown.getCause().getClass());
+ }
- final ProcessorStateException thrown = assertThrows(
- ProcessorStateException.class, () -> StateManagerUtil.closeStateManager(logger,
- "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
-
- assertEquals(IOException.class, thrown.getCause().getClass());
-
- ctrl.verify();
+ verify(stateManager).close();
+ verify(stateDirectory).unlock(taskId);
}
@Test
- public void shouldNotCloseStateManagerIfUnableToLockTaskDirectory() {
- expect(stateManager.taskId()).andReturn(taskId);
-
- expect(stateDirectory.lock(taskId)).andReturn(false);
-
- stateManager.close();
- expectLastCall().andThrow(new AssertionError("Should not be trying to close state you don't own!"));
-
- ctrl.checkOrder(true);
- ctrl.replay();
-
- replayAll();
+ public void shouldNotWipeStateStoresIfUnableToLockTaskDirectory() {
+ when(stateManager.taskId()).thenReturn(taskId);
+ when(stateDirectory.lock(taskId)).thenReturn(false);
StateManagerUtil.closeStateManager(
- logger, "logPrefix:", true, false, stateManager, stateDirectory, TaskType.ACTIVE);
- }
-
- @Test
- public void shouldNotWipeStateStoresIfUnableToLockTaskDirectory() throws IOException {
Review Comment:
Okay, I agree, I will create a new commit testing both scenarios.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [kafka] clolov commented on a diff in pull request #12821: KAFKA-14132: Replace PowerMock and EasyMock with Mockito in streams tests
Posted by GitBox <gi...@apache.org>.
clolov commented on code in PR #12821:
URL: https://github.com/apache/kafka/pull/12821#discussion_r1024170827
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -187,151 +139,92 @@ public void testCloseStateManagerThrowsExceptionWhenClean() {
// Thrown stateMgr exception will not be wrapped.
assertEquals("state manager failed to close", thrown.getMessage());
- ctrl.verify();
+ // The unlock logic should still be executed.
+ verify(stateDirectory).unlock(taskId);
}
@Test
public void testCloseStateManagerThrowsExceptionWhenDirty() {
- expect(stateManager.taskId()).andReturn(taskId);
+ when(stateManager.taskId()).thenReturn(taskId);
- expect(stateDirectory.lock(taskId)).andReturn(true);
+ when(stateDirectory.lock(taskId)).thenReturn(true);
- stateManager.close();
- expectLastCall().andThrow(new ProcessorStateException("state manager failed to close"));
-
- stateDirectory.unlock(taskId);
-
- ctrl.checkOrder(true);
- ctrl.replay();
+ doThrow(new ProcessorStateException("state manager failed to close")).when(stateManager).close();
assertThrows(
ProcessorStateException.class,
() -> StateManagerUtil.closeStateManager(
logger, "logPrefix:", false, false, stateManager, stateDirectory, TaskType.ACTIVE));
- ctrl.verify();
+ verify(stateDirectory).unlock(taskId);
}
@Test
public void testCloseStateManagerWithStateStoreWipeOut() {
- expect(stateManager.taskId()).andReturn(taskId);
- expect(stateDirectory.lock(taskId)).andReturn(true);
-
- stateManager.close();
- expectLastCall();
+ when(stateManager.taskId()).thenReturn(taskId);
+ when(stateDirectory.lock(taskId)).thenReturn(true);
// The `baseDir` will be accessed when attempting to delete the state store.
- expect(stateManager.baseDir()).andReturn(TestUtils.tempDirectory("state_store"));
-
- stateDirectory.unlock(taskId);
- expectLastCall();
-
- ctrl.checkOrder(true);
- ctrl.replay();
+ when(stateManager.baseDir()).thenReturn(TestUtils.tempDirectory("state_store"));
StateManagerUtil.closeStateManager(logger,
"logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE);
- ctrl.verify();
+ verify(stateManager).close();
+ verify(stateDirectory).unlock(taskId);
}
@Test
public void shouldStillWipeStateStoresIfCloseThrowsException() throws IOException {
final File randomFile = new File("/random/path");
- mockStatic(Utils.class);
-
- expect(stateManager.taskId()).andReturn(taskId);
- expect(stateDirectory.lock(taskId)).andReturn(true);
- stateManager.close();
- expectLastCall().andThrow(new ProcessorStateException("Close failed"));
+ when(stateManager.taskId()).thenReturn(taskId);
+ when(stateDirectory.lock(taskId)).thenReturn(true);
- expect(stateManager.baseDir()).andReturn(randomFile);
+ doThrow(new ProcessorStateException("Close failed")).when(stateManager).close();
- Utils.delete(randomFile);
+ when(stateManager.baseDir()).thenReturn(randomFile);
- stateDirectory.unlock(taskId);
- expectLastCall();
+ try (MockedStatic<Utils> utils = mockStatic(Utils.class)) {
+ assertThrows(ProcessorStateException.class, () ->
+ StateManagerUtil.closeStateManager(logger, "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
+ }
- ctrl.checkOrder(true);
- ctrl.replay();
-
- replayAll();
-
- assertThrows(ProcessorStateException.class, () ->
- StateManagerUtil.closeStateManager(logger, "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
-
- ctrl.verify();
+ verify(stateDirectory).unlock(taskId);
}
@Test
public void testCloseStateManagerWithStateStoreWipeOutRethrowWrappedIOException() throws IOException {
final File unknownFile = new File("/unknown/path");
- mockStatic(Utils.class);
-
- expect(stateManager.taskId()).andReturn(taskId);
- expect(stateDirectory.lock(taskId)).andReturn(true);
-
- stateManager.close();
- expectLastCall();
-
- expect(stateManager.baseDir()).andReturn(unknownFile);
- Utils.delete(unknownFile);
- expectLastCall().andThrow(new IOException("Deletion failed"));
+ when(stateManager.taskId()).thenReturn(taskId);
+ when(stateDirectory.lock(taskId)).thenReturn(true);
+ when(stateManager.baseDir()).thenReturn(unknownFile);
- stateDirectory.unlock(taskId);
- expectLastCall();
+ try (MockedStatic<Utils> utils = mockStatic(Utils.class)) {
+ utils.when(() -> Utils.delete(unknownFile)).thenThrow(new IOException("Deletion failed"));
- ctrl.checkOrder(true);
- ctrl.replay();
+ final ProcessorStateException thrown = assertThrows(
+ ProcessorStateException.class, () -> StateManagerUtil.closeStateManager(logger,
+ "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
- replayAll();
+ assertEquals(IOException.class, thrown.getCause().getClass());
+ }
- final ProcessorStateException thrown = assertThrows(
- ProcessorStateException.class, () -> StateManagerUtil.closeStateManager(logger,
- "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
-
- assertEquals(IOException.class, thrown.getCause().getClass());
-
- ctrl.verify();
+ verify(stateManager).close();
+ verify(stateDirectory).unlock(taskId);
}
@Test
- public void shouldNotCloseStateManagerIfUnableToLockTaskDirectory() {
- expect(stateManager.taskId()).andReturn(taskId);
-
- expect(stateDirectory.lock(taskId)).andReturn(false);
-
- stateManager.close();
- expectLastCall().andThrow(new AssertionError("Should not be trying to close state you don't own!"));
-
- ctrl.checkOrder(true);
- ctrl.replay();
-
- replayAll();
+ public void shouldNotWipeStateStoresIfUnableToLockTaskDirectory() {
+ when(stateManager.taskId()).thenReturn(taskId);
+ when(stateDirectory.lock(taskId)).thenReturn(false);
StateManagerUtil.closeStateManager(
- logger, "logPrefix:", true, false, stateManager, stateDirectory, TaskType.ACTIVE);
- }
-
- @Test
- public void shouldNotWipeStateStoresIfUnableToLockTaskDirectory() throws IOException {
Review Comment:
As in, it appears a bit strange to mock everything on a path to the Utils.delete when this logic short-circuits if we use `expect(stateDirectory.lock(taskId)).andReturn(false);`.
```
try {
if (stateDirectory.lock(id)) { <---- WE JUMP FROM HERE...
try {
stateMgr.close(); <---- WE VERIFY THIS IS NOT CALLED
} catch (final ProcessorStateException e) {
firstException.compareAndSet(null, e);
} finally {
try {
if (wipeStateStore) {
log.debug("Wiping state stores for {} task {}", taskType, id);
// we can just delete the whole dir of the task, including the state store images and the checkpoint files,
// and then we write an empty checkpoint file indicating that the previous close is graceful and we just
// need to re-bootstrap the restoration from the beginning
Utils.delete(stateMgr.baseDir() <---- WE VERIFY THIS IS NOT CALLED);
}
} finally {
stateDirectory.unlock(id); <---- WE VERIFY THIS IS NOT CALLED
}
}
}
} catch (final IOException e) {
final ProcessorStateException exception = new ProcessorStateException(
String.format("%sFatal error while trying to close the state manager for task %s", logPrefix, id), e
);
firstException.compareAndSet(null, exception);
} <--- ...ALL THE WAY TO HERE
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [kafka] clolov commented on a diff in pull request #12821: KAFKA-14132: Replace PowerMock and EasyMock with Mockito in streams tests
Posted by GitBox <gi...@apache.org>.
clolov commented on code in PR #12821:
URL: https://github.com/apache/kafka/pull/12821#discussion_r1024174183
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -42,77 +39,54 @@
import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
-import static org.easymock.EasyMock.createStrictControl;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.expectLastCall;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;
-import static org.powermock.api.easymock.PowerMock.mockStatic;
-import static org.powermock.api.easymock.PowerMock.replayAll;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(Utils.class)
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
public class StateManagerUtilTest {
- @Mock(type = MockType.NICE)
+ @Mock
private ProcessorStateManager stateManager;
- @Mock(type = MockType.NICE)
+ @Mock
private StateDirectory stateDirectory;
- @Mock(type = MockType.NICE)
+ @Mock
private ProcessorTopology topology;
- @Mock(type = MockType.NICE)
+ @Mock
private InternalProcessorContext processorContext;
- private IMocksControl ctrl;
-
private Logger logger = new LogContext("test").logger(AbstractTask.class);
private final TaskId taskId = new TaskId(0, 0);
- @Before
- public void setup() {
- ctrl = createStrictControl();
- topology = ctrl.createMock(ProcessorTopology.class);
- processorContext = ctrl.createMock(InternalProcessorContext.class);
-
- stateManager = ctrl.createMock(ProcessorStateManager.class);
- stateDirectory = ctrl.createMock(StateDirectory.class);
- }
-
@Test
public void testRegisterStateStoreWhenTopologyEmpty() {
- expect(topology.stateStores()).andReturn(emptyList());
-
- ctrl.checkOrder(true);
Review Comment:
I have hopefully addressed this in the next commit, but if I have missed or misunderstood something I will rework it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [kafka] cadonna commented on a diff in pull request #12821: KAFKA-14132: Replace PowerMock and EasyMock with Mockito in streams tests
Posted by GitBox <gi...@apache.org>.
cadonna commented on code in PR #12821:
URL: https://github.com/apache/kafka/pull/12821#discussion_r1021613702
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -42,77 +39,54 @@
import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
-import static org.easymock.EasyMock.createStrictControl;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.expectLastCall;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;
-import static org.powermock.api.easymock.PowerMock.mockStatic;
-import static org.powermock.api.easymock.PowerMock.replayAll;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(Utils.class)
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
public class StateManagerUtilTest {
- @Mock(type = MockType.NICE)
+ @Mock
private ProcessorStateManager stateManager;
- @Mock(type = MockType.NICE)
+ @Mock
private StateDirectory stateDirectory;
- @Mock(type = MockType.NICE)
+ @Mock
private ProcessorTopology topology;
- @Mock(type = MockType.NICE)
+ @Mock
private InternalProcessorContext processorContext;
- private IMocksControl ctrl;
-
private Logger logger = new LogContext("test").logger(AbstractTask.class);
private final TaskId taskId = new TaskId(0, 0);
- @Before
- public void setup() {
- ctrl = createStrictControl();
- topology = ctrl.createMock(ProcessorTopology.class);
- processorContext = ctrl.createMock(InternalProcessorContext.class);
-
- stateManager = ctrl.createMock(ProcessorStateManager.class);
- stateDirectory = ctrl.createMock(StateDirectory.class);
- }
-
@Test
public void testRegisterStateStoreWhenTopologyEmpty() {
- expect(topology.stateStores()).andReturn(emptyList());
-
- ctrl.checkOrder(true);
Review Comment:
The original author wanted to make sure some calls are done in a specific order. You should migrate the tests to Mockito and preserve the verification of the intended order. I see that some test do not really verify an order. For those tests, you do not need to migrate the order verifications.
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -187,151 +139,92 @@ public void testCloseStateManagerThrowsExceptionWhenClean() {
// Thrown stateMgr exception will not be wrapped.
assertEquals("state manager failed to close", thrown.getMessage());
- ctrl.verify();
+ // The unlock logic should still be executed.
+ verify(stateDirectory).unlock(taskId);
}
@Test
public void testCloseStateManagerThrowsExceptionWhenDirty() {
- expect(stateManager.taskId()).andReturn(taskId);
+ when(stateManager.taskId()).thenReturn(taskId);
- expect(stateDirectory.lock(taskId)).andReturn(true);
+ when(stateDirectory.lock(taskId)).thenReturn(true);
- stateManager.close();
- expectLastCall().andThrow(new ProcessorStateException("state manager failed to close"));
-
- stateDirectory.unlock(taskId);
-
- ctrl.checkOrder(true);
- ctrl.replay();
+ doThrow(new ProcessorStateException("state manager failed to close")).when(stateManager).close();
assertThrows(
ProcessorStateException.class,
() -> StateManagerUtil.closeStateManager(
logger, "logPrefix:", false, false, stateManager, stateDirectory, TaskType.ACTIVE));
- ctrl.verify();
+ verify(stateDirectory).unlock(taskId);
}
@Test
public void testCloseStateManagerWithStateStoreWipeOut() {
- expect(stateManager.taskId()).andReturn(taskId);
- expect(stateDirectory.lock(taskId)).andReturn(true);
-
- stateManager.close();
- expectLastCall();
+ when(stateManager.taskId()).thenReturn(taskId);
+ when(stateDirectory.lock(taskId)).thenReturn(true);
// The `baseDir` will be accessed when attempting to delete the state store.
- expect(stateManager.baseDir()).andReturn(TestUtils.tempDirectory("state_store"));
-
- stateDirectory.unlock(taskId);
- expectLastCall();
-
- ctrl.checkOrder(true);
- ctrl.replay();
+ when(stateManager.baseDir()).thenReturn(TestUtils.tempDirectory("state_store"));
StateManagerUtil.closeStateManager(logger,
"logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE);
- ctrl.verify();
+ verify(stateManager).close();
+ verify(stateDirectory).unlock(taskId);
}
@Test
public void shouldStillWipeStateStoresIfCloseThrowsException() throws IOException {
final File randomFile = new File("/random/path");
- mockStatic(Utils.class);
-
- expect(stateManager.taskId()).andReturn(taskId);
- expect(stateDirectory.lock(taskId)).andReturn(true);
- stateManager.close();
- expectLastCall().andThrow(new ProcessorStateException("Close failed"));
+ when(stateManager.taskId()).thenReturn(taskId);
+ when(stateDirectory.lock(taskId)).thenReturn(true);
- expect(stateManager.baseDir()).andReturn(randomFile);
+ doThrow(new ProcessorStateException("Close failed")).when(stateManager).close();
- Utils.delete(randomFile);
+ when(stateManager.baseDir()).thenReturn(randomFile);
- stateDirectory.unlock(taskId);
- expectLastCall();
+ try (MockedStatic<Utils> utils = mockStatic(Utils.class)) {
+ assertThrows(ProcessorStateException.class, () ->
+ StateManagerUtil.closeStateManager(logger, "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
+ }
- ctrl.checkOrder(true);
- ctrl.replay();
-
- replayAll();
-
- assertThrows(ProcessorStateException.class, () ->
- StateManagerUtil.closeStateManager(logger, "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
-
- ctrl.verify();
+ verify(stateDirectory).unlock(taskId);
}
@Test
public void testCloseStateManagerWithStateStoreWipeOutRethrowWrappedIOException() throws IOException {
final File unknownFile = new File("/unknown/path");
- mockStatic(Utils.class);
-
- expect(stateManager.taskId()).andReturn(taskId);
- expect(stateDirectory.lock(taskId)).andReturn(true);
-
- stateManager.close();
- expectLastCall();
-
- expect(stateManager.baseDir()).andReturn(unknownFile);
- Utils.delete(unknownFile);
- expectLastCall().andThrow(new IOException("Deletion failed"));
+ when(stateManager.taskId()).thenReturn(taskId);
+ when(stateDirectory.lock(taskId)).thenReturn(true);
+ when(stateManager.baseDir()).thenReturn(unknownFile);
- stateDirectory.unlock(taskId);
- expectLastCall();
+ try (MockedStatic<Utils> utils = mockStatic(Utils.class)) {
+ utils.when(() -> Utils.delete(unknownFile)).thenThrow(new IOException("Deletion failed"));
- ctrl.checkOrder(true);
- ctrl.replay();
+ final ProcessorStateException thrown = assertThrows(
+ ProcessorStateException.class, () -> StateManagerUtil.closeStateManager(logger,
+ "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
- replayAll();
+ assertEquals(IOException.class, thrown.getCause().getClass());
+ }
- final ProcessorStateException thrown = assertThrows(
- ProcessorStateException.class, () -> StateManagerUtil.closeStateManager(logger,
- "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
-
- assertEquals(IOException.class, thrown.getCause().getClass());
-
- ctrl.verify();
+ verify(stateManager).close();
+ verify(stateDirectory).unlock(taskId);
}
@Test
- public void shouldNotCloseStateManagerIfUnableToLockTaskDirectory() {
- expect(stateManager.taskId()).andReturn(taskId);
-
- expect(stateDirectory.lock(taskId)).andReturn(false);
-
- stateManager.close();
- expectLastCall().andThrow(new AssertionError("Should not be trying to close state you don't own!"));
-
- ctrl.checkOrder(true);
- ctrl.replay();
-
- replayAll();
+ public void shouldNotWipeStateStoresIfUnableToLockTaskDirectory() {
+ when(stateManager.taskId()).thenReturn(taskId);
+ when(stateDirectory.lock(taskId)).thenReturn(false);
StateManagerUtil.closeStateManager(
- logger, "logPrefix:", true, false, stateManager, stateDirectory, TaskType.ACTIVE);
- }
-
- @Test
- public void shouldNotWipeStateStoresIfUnableToLockTaskDirectory() throws IOException {
Review Comment:
As far as I understand the test, the author wanted to fail the test with an `AssertionError` if `Utils.delete` is called after the lock could not be acquired. With Mockito, you can verify that `Utils.delete` is never called.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [kafka] clolov commented on a diff in pull request #12821: KAFKA-14132: Replace PowerMock and EasyMock with Mockito in streams tests
Posted by GitBox <gi...@apache.org>.
clolov commented on code in PR #12821:
URL: https://github.com/apache/kafka/pull/12821#discussion_r1024166413
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -187,151 +139,92 @@ public void testCloseStateManagerThrowsExceptionWhenClean() {
// Thrown stateMgr exception will not be wrapped.
assertEquals("state manager failed to close", thrown.getMessage());
- ctrl.verify();
+ // The unlock logic should still be executed.
+ verify(stateDirectory).unlock(taskId);
}
@Test
public void testCloseStateManagerThrowsExceptionWhenDirty() {
- expect(stateManager.taskId()).andReturn(taskId);
+ when(stateManager.taskId()).thenReturn(taskId);
- expect(stateDirectory.lock(taskId)).andReturn(true);
+ when(stateDirectory.lock(taskId)).thenReturn(true);
- stateManager.close();
- expectLastCall().andThrow(new ProcessorStateException("state manager failed to close"));
-
- stateDirectory.unlock(taskId);
-
- ctrl.checkOrder(true);
- ctrl.replay();
+ doThrow(new ProcessorStateException("state manager failed to close")).when(stateManager).close();
assertThrows(
ProcessorStateException.class,
() -> StateManagerUtil.closeStateManager(
logger, "logPrefix:", false, false, stateManager, stateDirectory, TaskType.ACTIVE));
- ctrl.verify();
+ verify(stateDirectory).unlock(taskId);
}
@Test
public void testCloseStateManagerWithStateStoreWipeOut() {
- expect(stateManager.taskId()).andReturn(taskId);
- expect(stateDirectory.lock(taskId)).andReturn(true);
-
- stateManager.close();
- expectLastCall();
+ when(stateManager.taskId()).thenReturn(taskId);
+ when(stateDirectory.lock(taskId)).thenReturn(true);
// The `baseDir` will be accessed when attempting to delete the state store.
- expect(stateManager.baseDir()).andReturn(TestUtils.tempDirectory("state_store"));
-
- stateDirectory.unlock(taskId);
- expectLastCall();
-
- ctrl.checkOrder(true);
- ctrl.replay();
+ when(stateManager.baseDir()).thenReturn(TestUtils.tempDirectory("state_store"));
StateManagerUtil.closeStateManager(logger,
"logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE);
- ctrl.verify();
+ verify(stateManager).close();
+ verify(stateDirectory).unlock(taskId);
}
@Test
public void shouldStillWipeStateStoresIfCloseThrowsException() throws IOException {
final File randomFile = new File("/random/path");
- mockStatic(Utils.class);
-
- expect(stateManager.taskId()).andReturn(taskId);
- expect(stateDirectory.lock(taskId)).andReturn(true);
- stateManager.close();
- expectLastCall().andThrow(new ProcessorStateException("Close failed"));
+ when(stateManager.taskId()).thenReturn(taskId);
+ when(stateDirectory.lock(taskId)).thenReturn(true);
- expect(stateManager.baseDir()).andReturn(randomFile);
+ doThrow(new ProcessorStateException("Close failed")).when(stateManager).close();
- Utils.delete(randomFile);
+ when(stateManager.baseDir()).thenReturn(randomFile);
- stateDirectory.unlock(taskId);
- expectLastCall();
+ try (MockedStatic<Utils> utils = mockStatic(Utils.class)) {
+ assertThrows(ProcessorStateException.class, () ->
+ StateManagerUtil.closeStateManager(logger, "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
+ }
- ctrl.checkOrder(true);
- ctrl.replay();
-
- replayAll();
-
- assertThrows(ProcessorStateException.class, () ->
- StateManagerUtil.closeStateManager(logger, "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
-
- ctrl.verify();
+ verify(stateDirectory).unlock(taskId);
}
@Test
public void testCloseStateManagerWithStateStoreWipeOutRethrowWrappedIOException() throws IOException {
final File unknownFile = new File("/unknown/path");
- mockStatic(Utils.class);
-
- expect(stateManager.taskId()).andReturn(taskId);
- expect(stateDirectory.lock(taskId)).andReturn(true);
-
- stateManager.close();
- expectLastCall();
-
- expect(stateManager.baseDir()).andReturn(unknownFile);
- Utils.delete(unknownFile);
- expectLastCall().andThrow(new IOException("Deletion failed"));
+ when(stateManager.taskId()).thenReturn(taskId);
+ when(stateDirectory.lock(taskId)).thenReturn(true);
+ when(stateManager.baseDir()).thenReturn(unknownFile);
- stateDirectory.unlock(taskId);
- expectLastCall();
+ try (MockedStatic<Utils> utils = mockStatic(Utils.class)) {
+ utils.when(() -> Utils.delete(unknownFile)).thenThrow(new IOException("Deletion failed"));
- ctrl.checkOrder(true);
- ctrl.replay();
+ final ProcessorStateException thrown = assertThrows(
+ ProcessorStateException.class, () -> StateManagerUtil.closeStateManager(logger,
+ "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
- replayAll();
+ assertEquals(IOException.class, thrown.getCause().getClass());
+ }
- final ProcessorStateException thrown = assertThrows(
- ProcessorStateException.class, () -> StateManagerUtil.closeStateManager(logger,
- "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
-
- assertEquals(IOException.class, thrown.getCause().getClass());
-
- ctrl.verify();
+ verify(stateManager).close();
+ verify(stateDirectory).unlock(taskId);
}
@Test
- public void shouldNotCloseStateManagerIfUnableToLockTaskDirectory() {
- expect(stateManager.taskId()).andReturn(taskId);
-
- expect(stateDirectory.lock(taskId)).andReturn(false);
-
- stateManager.close();
- expectLastCall().andThrow(new AssertionError("Should not be trying to close state you don't own!"));
-
- ctrl.checkOrder(true);
- ctrl.replay();
-
- replayAll();
+ public void shouldNotWipeStateStoresIfUnableToLockTaskDirectory() {
+ when(stateManager.taskId()).thenReturn(taskId);
+ when(stateDirectory.lock(taskId)).thenReturn(false);
StateManagerUtil.closeStateManager(
- logger, "logPrefix:", true, false, stateManager, stateDirectory, TaskType.ACTIVE);
- }
-
- @Test
- public void shouldNotWipeStateStoresIfUnableToLockTaskDirectory() throws IOException {
Review Comment:
Sorry, I do not understand. I think this test and the one above can be collapsed in the test I have written. I still verify that deletion did not happen by checking that a few methods were never interacted with.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [kafka] clolov commented on pull request #12821: KAFKA-14132: Replace PowerMock and EasyMock with Mockito in streams tests
Posted by GitBox <gi...@apache.org>.
clolov commented on PR #12821:
URL: https://github.com/apache/kafka/pull/12821#issuecomment-1303633189
Hey @cadonna 👋! Adding you for visibility as these are Streams-related tests.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [kafka] clolov commented on a diff in pull request #12821: KAFKA-14132: Replace PowerMock and EasyMock with Mockito in streams tests
Posted by GitBox <gi...@apache.org>.
clolov commented on code in PR #12821:
URL: https://github.com/apache/kafka/pull/12821#discussion_r1014080005
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionTopicsTest.java:
##########
@@ -151,22 +147,17 @@ public void shouldSetupRepartitionTopics() {
assertThat(repartitionTopics.topologiesWithMissingInputTopics().isEmpty(), is(true));
assertThat(repartitionTopics.missingSourceTopicExceptions().isEmpty(), is(true));
+
+ verify(copartitionedTopicsEnforcer).enforce(eq(coPartitionGroup1), any(), eq(clusterMetadata));
+ verify(copartitionedTopicsEnforcer).enforce(eq(coPartitionGroup2), any(), eq(clusterMetadata));
}
@Test
public void shouldReturnMissingSourceTopics() {
final Set<String> missingSourceTopics = mkSet(SOURCE_TOPIC_NAME1);
- expect(internalTopologyBuilder.subtopologyToTopicsInfo())
- .andReturn(mkMap(mkEntry(SUBTOPOLOGY_0, TOPICS_INFO1), mkEntry(SUBTOPOLOGY_1, TOPICS_INFO2)));
- expect(internalTopologyBuilder.copartitionGroups()).andReturn(Collections.emptyList());
Review Comment:
Returning empty collections wherever it can is the default behaviour for Mockito, so a few of these have magically disappeared.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [kafka] clolov commented on a diff in pull request #12821: KAFKA-14132: Replace PowerMock and EasyMock with Mockito in streams tests
Posted by GitBox <gi...@apache.org>.
clolov commented on code in PR #12821:
URL: https://github.com/apache/kafka/pull/12821#discussion_r1014081545
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionTopicsTest.java:
##########
@@ -447,13 +414,9 @@ private void setupClusterWithMissingTopicsAndMissingPartitionCounts(final Set<St
SOME_OTHER_TOPIC
);
topics.removeAll(missingTopics);
- expect(clusterMetadata.topics()).andStubReturn(topics);
- expect(clusterMetadata.partitionCountForTopic(SOURCE_TOPIC_NAME1))
- .andStubReturn(topicsWithMissingPartitionCounts.contains(SOURCE_TOPIC_NAME1) ? null : 3);
- expect(clusterMetadata.partitionCountForTopic(SOURCE_TOPIC_NAME2))
Review Comment:
This and the expectation below were not exercised. I ran the tests with PowerMock/EasyMock and without these two expectations and they still passed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [kafka] clolov commented on a diff in pull request #12821: KAFKA-14132: Replace PowerMock and EasyMock with Mockito in streams tests
Posted by GitBox <gi...@apache.org>.
clolov commented on code in PR #12821:
URL: https://github.com/apache/kafka/pull/12821#discussion_r1024170827
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -187,151 +139,92 @@ public void testCloseStateManagerThrowsExceptionWhenClean() {
// Thrown stateMgr exception will not be wrapped.
assertEquals("state manager failed to close", thrown.getMessage());
- ctrl.verify();
+ // The unlock logic should still be executed.
+ verify(stateDirectory).unlock(taskId);
}
@Test
public void testCloseStateManagerThrowsExceptionWhenDirty() {
- expect(stateManager.taskId()).andReturn(taskId);
+ when(stateManager.taskId()).thenReturn(taskId);
- expect(stateDirectory.lock(taskId)).andReturn(true);
+ when(stateDirectory.lock(taskId)).thenReturn(true);
- stateManager.close();
- expectLastCall().andThrow(new ProcessorStateException("state manager failed to close"));
-
- stateDirectory.unlock(taskId);
-
- ctrl.checkOrder(true);
- ctrl.replay();
+ doThrow(new ProcessorStateException("state manager failed to close")).when(stateManager).close();
assertThrows(
ProcessorStateException.class,
() -> StateManagerUtil.closeStateManager(
logger, "logPrefix:", false, false, stateManager, stateDirectory, TaskType.ACTIVE));
- ctrl.verify();
+ verify(stateDirectory).unlock(taskId);
}
@Test
public void testCloseStateManagerWithStateStoreWipeOut() {
- expect(stateManager.taskId()).andReturn(taskId);
- expect(stateDirectory.lock(taskId)).andReturn(true);
-
- stateManager.close();
- expectLastCall();
+ when(stateManager.taskId()).thenReturn(taskId);
+ when(stateDirectory.lock(taskId)).thenReturn(true);
// The `baseDir` will be accessed when attempting to delete the state store.
- expect(stateManager.baseDir()).andReturn(TestUtils.tempDirectory("state_store"));
-
- stateDirectory.unlock(taskId);
- expectLastCall();
-
- ctrl.checkOrder(true);
- ctrl.replay();
+ when(stateManager.baseDir()).thenReturn(TestUtils.tempDirectory("state_store"));
StateManagerUtil.closeStateManager(logger,
"logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE);
- ctrl.verify();
+ verify(stateManager).close();
+ verify(stateDirectory).unlock(taskId);
}
@Test
public void shouldStillWipeStateStoresIfCloseThrowsException() throws IOException {
final File randomFile = new File("/random/path");
- mockStatic(Utils.class);
-
- expect(stateManager.taskId()).andReturn(taskId);
- expect(stateDirectory.lock(taskId)).andReturn(true);
- stateManager.close();
- expectLastCall().andThrow(new ProcessorStateException("Close failed"));
+ when(stateManager.taskId()).thenReturn(taskId);
+ when(stateDirectory.lock(taskId)).thenReturn(true);
- expect(stateManager.baseDir()).andReturn(randomFile);
+ doThrow(new ProcessorStateException("Close failed")).when(stateManager).close();
- Utils.delete(randomFile);
+ when(stateManager.baseDir()).thenReturn(randomFile);
- stateDirectory.unlock(taskId);
- expectLastCall();
+ try (MockedStatic<Utils> utils = mockStatic(Utils.class)) {
+ assertThrows(ProcessorStateException.class, () ->
+ StateManagerUtil.closeStateManager(logger, "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
+ }
- ctrl.checkOrder(true);
- ctrl.replay();
-
- replayAll();
-
- assertThrows(ProcessorStateException.class, () ->
- StateManagerUtil.closeStateManager(logger, "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
-
- ctrl.verify();
+ verify(stateDirectory).unlock(taskId);
}
@Test
public void testCloseStateManagerWithStateStoreWipeOutRethrowWrappedIOException() throws IOException {
final File unknownFile = new File("/unknown/path");
- mockStatic(Utils.class);
-
- expect(stateManager.taskId()).andReturn(taskId);
- expect(stateDirectory.lock(taskId)).andReturn(true);
-
- stateManager.close();
- expectLastCall();
-
- expect(stateManager.baseDir()).andReturn(unknownFile);
- Utils.delete(unknownFile);
- expectLastCall().andThrow(new IOException("Deletion failed"));
+ when(stateManager.taskId()).thenReturn(taskId);
+ when(stateDirectory.lock(taskId)).thenReturn(true);
+ when(stateManager.baseDir()).thenReturn(unknownFile);
- stateDirectory.unlock(taskId);
- expectLastCall();
+ try (MockedStatic<Utils> utils = mockStatic(Utils.class)) {
+ utils.when(() -> Utils.delete(unknownFile)).thenThrow(new IOException("Deletion failed"));
- ctrl.checkOrder(true);
- ctrl.replay();
+ final ProcessorStateException thrown = assertThrows(
+ ProcessorStateException.class, () -> StateManagerUtil.closeStateManager(logger,
+ "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
- replayAll();
+ assertEquals(IOException.class, thrown.getCause().getClass());
+ }
- final ProcessorStateException thrown = assertThrows(
- ProcessorStateException.class, () -> StateManagerUtil.closeStateManager(logger,
- "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
-
- assertEquals(IOException.class, thrown.getCause().getClass());
-
- ctrl.verify();
+ verify(stateManager).close();
+ verify(stateDirectory).unlock(taskId);
}
@Test
- public void shouldNotCloseStateManagerIfUnableToLockTaskDirectory() {
- expect(stateManager.taskId()).andReturn(taskId);
-
- expect(stateDirectory.lock(taskId)).andReturn(false);
-
- stateManager.close();
- expectLastCall().andThrow(new AssertionError("Should not be trying to close state you don't own!"));
-
- ctrl.checkOrder(true);
- ctrl.replay();
-
- replayAll();
+ public void shouldNotWipeStateStoresIfUnableToLockTaskDirectory() {
+ when(stateManager.taskId()).thenReturn(taskId);
+ when(stateDirectory.lock(taskId)).thenReturn(false);
StateManagerUtil.closeStateManager(
- logger, "logPrefix:", true, false, stateManager, stateDirectory, TaskType.ACTIVE);
- }
-
- @Test
- public void shouldNotWipeStateStoresIfUnableToLockTaskDirectory() throws IOException {
Review Comment:
As in, it appears a bit strange to mock everything on a path to the Utils.delete when this logic short-circuits if we use `expect(stateDirectory.lock(taskId)).andReturn(false);`.
```
try {
if (stateDirectory.lock(id)) { <---- WE JUMP FROM HERE...
try {
stateMgr.close(); <---- WE VERIFY THIS IS NOT CALLED
} catch (final ProcessorStateException e) {
firstException.compareAndSet(null, e);
} finally {
try {
if (wipeStateStore) {
log.debug("Wiping state stores for {} task {}", taskType, id);
// we can just delete the whole dir of the task, including the state store images and the checkpoint files,
// and then we write an empty checkpoint file indicating that the previous close is graceful and we just
// need to re-bootstrap the restoration from the beginning
Utils.delete(stateMgr.baseDir() <---- WE VERIFY THIS IS NOT CALLED);
}
} finally {
stateDirectory.unlock(id); <---- WE VERIFY THIS IS NOT CALLED
}
}
}
} catch (final IOException e) {
final ProcessorStateException exception = new ProcessorStateException(
String.format("%sFatal error while trying to close the state manager for task %s", logPrefix, id), e
);
firstException.compareAndSet(null, exception);
} <---- ...ALL THE WAY TO HERE
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [kafka] cadonna commented on a diff in pull request #12821: KAFKA-14132: Replace PowerMock and EasyMock with Mockito in streams tests
Posted by GitBox <gi...@apache.org>.
cadonna commented on code in PR #12821:
URL: https://github.com/apache/kafka/pull/12821#discussion_r1080946697
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -187,151 +137,107 @@ public void testCloseStateManagerThrowsExceptionWhenClean() {
// Thrown stateMgr exception will not be wrapped.
assertEquals("state manager failed to close", thrown.getMessage());
- ctrl.verify();
+ // The unlock logic should still be executed.
+ verify(stateDirectory).unlock(taskId);
}
@Test
public void testCloseStateManagerThrowsExceptionWhenDirty() {
- expect(stateManager.taskId()).andReturn(taskId);
-
- expect(stateDirectory.lock(taskId)).andReturn(true);
-
- stateManager.close();
- expectLastCall().andThrow(new ProcessorStateException("state manager failed to close"));
-
- stateDirectory.unlock(taskId);
-
- ctrl.checkOrder(true);
- ctrl.replay();
+ when(stateManager.taskId()).thenReturn(taskId);
+ when(stateDirectory.lock(taskId)).thenReturn(true);
+ doThrow(new ProcessorStateException("state manager failed to close")).when(stateManager).close();
assertThrows(
ProcessorStateException.class,
() -> StateManagerUtil.closeStateManager(
logger, "logPrefix:", false, false, stateManager, stateDirectory, TaskType.ACTIVE));
- ctrl.verify();
+ verify(stateDirectory).unlock(taskId);
}
@Test
public void testCloseStateManagerWithStateStoreWipeOut() {
- expect(stateManager.taskId()).andReturn(taskId);
- expect(stateDirectory.lock(taskId)).andReturn(true);
-
- stateManager.close();
- expectLastCall();
-
+ final InOrder inOrder = inOrder(stateManager, stateDirectory);
+ when(stateManager.taskId()).thenReturn(taskId);
+ when(stateDirectory.lock(taskId)).thenReturn(true);
// The `baseDir` will be accessed when attempting to delete the state store.
- expect(stateManager.baseDir()).andReturn(TestUtils.tempDirectory("state_store"));
-
- stateDirectory.unlock(taskId);
- expectLastCall();
-
- ctrl.checkOrder(true);
- ctrl.replay();
+ when(stateManager.baseDir()).thenReturn(TestUtils.tempDirectory("state_store"));
StateManagerUtil.closeStateManager(logger,
"logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE);
- ctrl.verify();
+ inOrder.verify(stateManager).close();
+ inOrder.verify(stateDirectory).unlock(taskId);
+ verifyNoMoreInteractions(stateManager, stateDirectory);
}
@Test
- public void shouldStillWipeStateStoresIfCloseThrowsException() throws IOException {
+ public void shouldStillWipeStateStoresIfCloseThrowsException() {
final File randomFile = new File("/random/path");
- mockStatic(Utils.class);
-
- expect(stateManager.taskId()).andReturn(taskId);
- expect(stateDirectory.lock(taskId)).andReturn(true);
-
- stateManager.close();
- expectLastCall().andThrow(new ProcessorStateException("Close failed"));
-
- expect(stateManager.baseDir()).andReturn(randomFile);
- Utils.delete(randomFile);
+ when(stateManager.taskId()).thenReturn(taskId);
+ when(stateDirectory.lock(taskId)).thenReturn(true);
+ doThrow(new ProcessorStateException("Close failed")).when(stateManager).close();
+ when(stateManager.baseDir()).thenReturn(randomFile);
- stateDirectory.unlock(taskId);
- expectLastCall();
+ try (MockedStatic<Utils> utils = mockStatic(Utils.class)) {
+ assertThrows(ProcessorStateException.class, () ->
+ StateManagerUtil.closeStateManager(logger, "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
+ }
- ctrl.checkOrder(true);
- ctrl.replay();
-
- replayAll();
-
- assertThrows(ProcessorStateException.class, () ->
- StateManagerUtil.closeStateManager(logger, "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
-
- ctrl.verify();
+ verify(stateDirectory).unlock(taskId);
}
@Test
- public void testCloseStateManagerWithStateStoreWipeOutRethrowWrappedIOException() throws IOException {
+ public void testCloseStateManagerWithStateStoreWipeOutRethrowWrappedIOException() {
final File unknownFile = new File("/unknown/path");
- mockStatic(Utils.class);
-
- expect(stateManager.taskId()).andReturn(taskId);
- expect(stateDirectory.lock(taskId)).andReturn(true);
-
- stateManager.close();
- expectLastCall();
-
- expect(stateManager.baseDir()).andReturn(unknownFile);
-
- Utils.delete(unknownFile);
- expectLastCall().andThrow(new IOException("Deletion failed"));
+ final InOrder inOrder = inOrder(stateManager, stateDirectory);
+ when(stateManager.taskId()).thenReturn(taskId);
+ when(stateDirectory.lock(taskId)).thenReturn(true);
+ when(stateManager.baseDir()).thenReturn(unknownFile);
- stateDirectory.unlock(taskId);
- expectLastCall();
+ try (MockedStatic<Utils> utils = mockStatic(Utils.class)) {
+ utils.when(() -> Utils.delete(unknownFile)).thenThrow(new IOException("Deletion failed"));
- ctrl.checkOrder(true);
- ctrl.replay();
+ final ProcessorStateException thrown = assertThrows(
+ ProcessorStateException.class, () -> StateManagerUtil.closeStateManager(logger,
+ "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
- replayAll();
+ assertEquals(IOException.class, thrown.getCause().getClass());
+ }
- final ProcessorStateException thrown = assertThrows(
- ProcessorStateException.class, () -> StateManagerUtil.closeStateManager(logger,
- "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
-
- assertEquals(IOException.class, thrown.getCause().getClass());
-
- ctrl.verify();
+ inOrder.verify(stateManager).close();
+ inOrder.verify(stateDirectory).unlock(taskId);
+ verifyNoMoreInteractions(stateManager, stateDirectory);
}
@Test
public void shouldNotCloseStateManagerIfUnableToLockTaskDirectory() {
- expect(stateManager.taskId()).andReturn(taskId);
-
- expect(stateDirectory.lock(taskId)).andReturn(false);
-
- stateManager.close();
- expectLastCall().andThrow(new AssertionError("Should not be trying to close state you don't own!"));
-
- ctrl.checkOrder(true);
- ctrl.replay();
-
- replayAll();
+ final InOrder inOrder = inOrder(stateManager, stateDirectory);
+ when(stateManager.taskId()).thenReturn(taskId);
+ when(stateDirectory.lock(taskId)).thenReturn(false);
StateManagerUtil.closeStateManager(
- logger, "logPrefix:", true, false, stateManager, stateDirectory, TaskType.ACTIVE);
+ logger, "logPrefix:", true, false, stateManager, stateDirectory, TaskType.ACTIVE);
+
+ inOrder.verify(stateManager, never()).close();
+ inOrder.verify(stateManager, never()).baseDir();
+ inOrder.verify(stateDirectory, never()).unlock(taskId);
Review Comment:
I do not know how much it makes sense to verify the call order of methods that should not be called.
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -187,151 +137,107 @@ public void testCloseStateManagerThrowsExceptionWhenClean() {
// Thrown stateMgr exception will not be wrapped.
assertEquals("state manager failed to close", thrown.getMessage());
- ctrl.verify();
+ // The unlock logic should still be executed.
+ verify(stateDirectory).unlock(taskId);
}
@Test
public void testCloseStateManagerThrowsExceptionWhenDirty() {
- expect(stateManager.taskId()).andReturn(taskId);
-
- expect(stateDirectory.lock(taskId)).andReturn(true);
-
- stateManager.close();
- expectLastCall().andThrow(new ProcessorStateException("state manager failed to close"));
-
- stateDirectory.unlock(taskId);
-
- ctrl.checkOrder(true);
- ctrl.replay();
+ when(stateManager.taskId()).thenReturn(taskId);
+ when(stateDirectory.lock(taskId)).thenReturn(true);
+ doThrow(new ProcessorStateException("state manager failed to close")).when(stateManager).close();
assertThrows(
ProcessorStateException.class,
() -> StateManagerUtil.closeStateManager(
logger, "logPrefix:", false, false, stateManager, stateDirectory, TaskType.ACTIVE));
- ctrl.verify();
+ verify(stateDirectory).unlock(taskId);
}
@Test
public void testCloseStateManagerWithStateStoreWipeOut() {
- expect(stateManager.taskId()).andReturn(taskId);
- expect(stateDirectory.lock(taskId)).andReturn(true);
-
- stateManager.close();
- expectLastCall();
-
+ final InOrder inOrder = inOrder(stateManager, stateDirectory);
+ when(stateManager.taskId()).thenReturn(taskId);
+ when(stateDirectory.lock(taskId)).thenReturn(true);
// The `baseDir` will be accessed when attempting to delete the state store.
- expect(stateManager.baseDir()).andReturn(TestUtils.tempDirectory("state_store"));
-
- stateDirectory.unlock(taskId);
- expectLastCall();
-
- ctrl.checkOrder(true);
- ctrl.replay();
+ when(stateManager.baseDir()).thenReturn(TestUtils.tempDirectory("state_store"));
StateManagerUtil.closeStateManager(logger,
"logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE);
- ctrl.verify();
+ inOrder.verify(stateManager).close();
+ inOrder.verify(stateDirectory).unlock(taskId);
+ verifyNoMoreInteractions(stateManager, stateDirectory);
}
@Test
- public void shouldStillWipeStateStoresIfCloseThrowsException() throws IOException {
+ public void shouldStillWipeStateStoresIfCloseThrowsException() {
final File randomFile = new File("/random/path");
- mockStatic(Utils.class);
-
- expect(stateManager.taskId()).andReturn(taskId);
- expect(stateDirectory.lock(taskId)).andReturn(true);
-
- stateManager.close();
- expectLastCall().andThrow(new ProcessorStateException("Close failed"));
-
- expect(stateManager.baseDir()).andReturn(randomFile);
- Utils.delete(randomFile);
+ when(stateManager.taskId()).thenReturn(taskId);
+ when(stateDirectory.lock(taskId)).thenReturn(true);
+ doThrow(new ProcessorStateException("Close failed")).when(stateManager).close();
+ when(stateManager.baseDir()).thenReturn(randomFile);
- stateDirectory.unlock(taskId);
- expectLastCall();
+ try (MockedStatic<Utils> utils = mockStatic(Utils.class)) {
+ assertThrows(ProcessorStateException.class, () ->
+ StateManagerUtil.closeStateManager(logger, "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
+ }
- ctrl.checkOrder(true);
- ctrl.replay();
-
- replayAll();
-
- assertThrows(ProcessorStateException.class, () ->
- StateManagerUtil.closeStateManager(logger, "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
-
- ctrl.verify();
+ verify(stateDirectory).unlock(taskId);
}
@Test
- public void testCloseStateManagerWithStateStoreWipeOutRethrowWrappedIOException() throws IOException {
+ public void testCloseStateManagerWithStateStoreWipeOutRethrowWrappedIOException() {
final File unknownFile = new File("/unknown/path");
- mockStatic(Utils.class);
-
- expect(stateManager.taskId()).andReturn(taskId);
- expect(stateDirectory.lock(taskId)).andReturn(true);
-
- stateManager.close();
- expectLastCall();
-
- expect(stateManager.baseDir()).andReturn(unknownFile);
-
- Utils.delete(unknownFile);
- expectLastCall().andThrow(new IOException("Deletion failed"));
+ final InOrder inOrder = inOrder(stateManager, stateDirectory);
+ when(stateManager.taskId()).thenReturn(taskId);
+ when(stateDirectory.lock(taskId)).thenReturn(true);
+ when(stateManager.baseDir()).thenReturn(unknownFile);
- stateDirectory.unlock(taskId);
- expectLastCall();
+ try (MockedStatic<Utils> utils = mockStatic(Utils.class)) {
+ utils.when(() -> Utils.delete(unknownFile)).thenThrow(new IOException("Deletion failed"));
- ctrl.checkOrder(true);
- ctrl.replay();
+ final ProcessorStateException thrown = assertThrows(
+ ProcessorStateException.class, () -> StateManagerUtil.closeStateManager(logger,
+ "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
- replayAll();
+ assertEquals(IOException.class, thrown.getCause().getClass());
+ }
- final ProcessorStateException thrown = assertThrows(
- ProcessorStateException.class, () -> StateManagerUtil.closeStateManager(logger,
- "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
-
- assertEquals(IOException.class, thrown.getCause().getClass());
-
- ctrl.verify();
+ inOrder.verify(stateManager).close();
+ inOrder.verify(stateDirectory).unlock(taskId);
+ verifyNoMoreInteractions(stateManager, stateDirectory);
}
@Test
public void shouldNotCloseStateManagerIfUnableToLockTaskDirectory() {
- expect(stateManager.taskId()).andReturn(taskId);
-
- expect(stateDirectory.lock(taskId)).andReturn(false);
-
- stateManager.close();
- expectLastCall().andThrow(new AssertionError("Should not be trying to close state you don't own!"));
-
- ctrl.checkOrder(true);
- ctrl.replay();
-
- replayAll();
+ final InOrder inOrder = inOrder(stateManager, stateDirectory);
+ when(stateManager.taskId()).thenReturn(taskId);
+ when(stateDirectory.lock(taskId)).thenReturn(false);
StateManagerUtil.closeStateManager(
- logger, "logPrefix:", true, false, stateManager, stateDirectory, TaskType.ACTIVE);
+ logger, "logPrefix:", true, false, stateManager, stateDirectory, TaskType.ACTIVE);
+
+ inOrder.verify(stateManager, never()).close();
+ inOrder.verify(stateManager, never()).baseDir();
+ inOrder.verify(stateDirectory, never()).unlock(taskId);
+ verifyNoMoreInteractions(stateManager, stateDirectory);
}
@Test
- public void shouldNotWipeStateStoresIfUnableToLockTaskDirectory() throws IOException {
- final File unknownFile = new File("/unknown/path");
- expect(stateManager.taskId()).andReturn(taskId);
- expect(stateDirectory.lock(taskId)).andReturn(false);
-
- expect(stateManager.baseDir()).andReturn(unknownFile);
-
- Utils.delete(unknownFile);
- expectLastCall().andThrow(new AssertionError("Should not be trying to wipe state you don't own!"));
-
- ctrl.checkOrder(true);
- ctrl.replay();
-
- replayAll();
+ public void shouldNotWipeStateStoresIfUnableToLockTaskDirectory() {
+ final InOrder inOrder = inOrder(stateManager, stateDirectory);
+ when(stateManager.taskId()).thenReturn(taskId);
+ when(stateDirectory.lock(taskId)).thenReturn(false);
StateManagerUtil.closeStateManager(
- logger, "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE);
+ logger, "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE);
+
+ inOrder.verify(stateManager, never()).close();
+ inOrder.verify(stateManager, never()).baseDir();
+ inOrder.verify(stateDirectory, never()).unlock(taskId);
Review Comment:
See my comment above about call order.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [kafka] clolov commented on a diff in pull request #12821: KAFKA-14132: Replace PowerMock and EasyMock with Mockito in streams tests
Posted by GitBox <gi...@apache.org>.
clolov commented on code in PR #12821:
URL: https://github.com/apache/kafka/pull/12821#discussion_r1081139070
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -187,151 +137,107 @@ public void testCloseStateManagerThrowsExceptionWhenClean() {
// Thrown stateMgr exception will not be wrapped.
assertEquals("state manager failed to close", thrown.getMessage());
- ctrl.verify();
+ // The unlock logic should still be executed.
+ verify(stateDirectory).unlock(taskId);
}
@Test
public void testCloseStateManagerThrowsExceptionWhenDirty() {
- expect(stateManager.taskId()).andReturn(taskId);
-
- expect(stateDirectory.lock(taskId)).andReturn(true);
-
- stateManager.close();
- expectLastCall().andThrow(new ProcessorStateException("state manager failed to close"));
-
- stateDirectory.unlock(taskId);
-
- ctrl.checkOrder(true);
- ctrl.replay();
+ when(stateManager.taskId()).thenReturn(taskId);
+ when(stateDirectory.lock(taskId)).thenReturn(true);
+ doThrow(new ProcessorStateException("state manager failed to close")).when(stateManager).close();
assertThrows(
ProcessorStateException.class,
() -> StateManagerUtil.closeStateManager(
logger, "logPrefix:", false, false, stateManager, stateDirectory, TaskType.ACTIVE));
- ctrl.verify();
+ verify(stateDirectory).unlock(taskId);
}
@Test
public void testCloseStateManagerWithStateStoreWipeOut() {
- expect(stateManager.taskId()).andReturn(taskId);
- expect(stateDirectory.lock(taskId)).andReturn(true);
-
- stateManager.close();
- expectLastCall();
-
+ final InOrder inOrder = inOrder(stateManager, stateDirectory);
+ when(stateManager.taskId()).thenReturn(taskId);
+ when(stateDirectory.lock(taskId)).thenReturn(true);
// The `baseDir` will be accessed when attempting to delete the state store.
- expect(stateManager.baseDir()).andReturn(TestUtils.tempDirectory("state_store"));
-
- stateDirectory.unlock(taskId);
- expectLastCall();
-
- ctrl.checkOrder(true);
- ctrl.replay();
+ when(stateManager.baseDir()).thenReturn(TestUtils.tempDirectory("state_store"));
StateManagerUtil.closeStateManager(logger,
"logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE);
- ctrl.verify();
+ inOrder.verify(stateManager).close();
+ inOrder.verify(stateDirectory).unlock(taskId);
+ verifyNoMoreInteractions(stateManager, stateDirectory);
}
@Test
- public void shouldStillWipeStateStoresIfCloseThrowsException() throws IOException {
+ public void shouldStillWipeStateStoresIfCloseThrowsException() {
final File randomFile = new File("/random/path");
- mockStatic(Utils.class);
-
- expect(stateManager.taskId()).andReturn(taskId);
- expect(stateDirectory.lock(taskId)).andReturn(true);
-
- stateManager.close();
- expectLastCall().andThrow(new ProcessorStateException("Close failed"));
-
- expect(stateManager.baseDir()).andReturn(randomFile);
- Utils.delete(randomFile);
+ when(stateManager.taskId()).thenReturn(taskId);
+ when(stateDirectory.lock(taskId)).thenReturn(true);
+ doThrow(new ProcessorStateException("Close failed")).when(stateManager).close();
+ when(stateManager.baseDir()).thenReturn(randomFile);
- stateDirectory.unlock(taskId);
- expectLastCall();
+ try (MockedStatic<Utils> utils = mockStatic(Utils.class)) {
+ assertThrows(ProcessorStateException.class, () ->
+ StateManagerUtil.closeStateManager(logger, "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
+ }
- ctrl.checkOrder(true);
- ctrl.replay();
-
- replayAll();
-
- assertThrows(ProcessorStateException.class, () ->
- StateManagerUtil.closeStateManager(logger, "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
-
- ctrl.verify();
+ verify(stateDirectory).unlock(taskId);
}
@Test
- public void testCloseStateManagerWithStateStoreWipeOutRethrowWrappedIOException() throws IOException {
+ public void testCloseStateManagerWithStateStoreWipeOutRethrowWrappedIOException() {
final File unknownFile = new File("/unknown/path");
- mockStatic(Utils.class);
-
- expect(stateManager.taskId()).andReturn(taskId);
- expect(stateDirectory.lock(taskId)).andReturn(true);
-
- stateManager.close();
- expectLastCall();
-
- expect(stateManager.baseDir()).andReturn(unknownFile);
-
- Utils.delete(unknownFile);
- expectLastCall().andThrow(new IOException("Deletion failed"));
+ final InOrder inOrder = inOrder(stateManager, stateDirectory);
+ when(stateManager.taskId()).thenReturn(taskId);
+ when(stateDirectory.lock(taskId)).thenReturn(true);
+ when(stateManager.baseDir()).thenReturn(unknownFile);
- stateDirectory.unlock(taskId);
- expectLastCall();
+ try (MockedStatic<Utils> utils = mockStatic(Utils.class)) {
+ utils.when(() -> Utils.delete(unknownFile)).thenThrow(new IOException("Deletion failed"));
- ctrl.checkOrder(true);
- ctrl.replay();
+ final ProcessorStateException thrown = assertThrows(
+ ProcessorStateException.class, () -> StateManagerUtil.closeStateManager(logger,
+ "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
- replayAll();
+ assertEquals(IOException.class, thrown.getCause().getClass());
+ }
- final ProcessorStateException thrown = assertThrows(
- ProcessorStateException.class, () -> StateManagerUtil.closeStateManager(logger,
- "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
-
- assertEquals(IOException.class, thrown.getCause().getClass());
-
- ctrl.verify();
+ inOrder.verify(stateManager).close();
+ inOrder.verify(stateDirectory).unlock(taskId);
+ verifyNoMoreInteractions(stateManager, stateDirectory);
}
@Test
public void shouldNotCloseStateManagerIfUnableToLockTaskDirectory() {
- expect(stateManager.taskId()).andReturn(taskId);
-
- expect(stateDirectory.lock(taskId)).andReturn(false);
-
- stateManager.close();
- expectLastCall().andThrow(new AssertionError("Should not be trying to close state you don't own!"));
-
- ctrl.checkOrder(true);
- ctrl.replay();
-
- replayAll();
+ final InOrder inOrder = inOrder(stateManager, stateDirectory);
+ when(stateManager.taskId()).thenReturn(taskId);
+ when(stateDirectory.lock(taskId)).thenReturn(false);
StateManagerUtil.closeStateManager(
- logger, "logPrefix:", true, false, stateManager, stateDirectory, TaskType.ACTIVE);
+ logger, "logPrefix:", true, false, stateManager, stateDirectory, TaskType.ACTIVE);
+
+ inOrder.verify(stateManager, never()).close();
+ inOrder.verify(stateManager, never()).baseDir();
+ inOrder.verify(stateDirectory, never()).unlock(taskId);
Review Comment:
This is a fair point. I used `inOrder` because the original test wanted to be strict. This being said it wanted to be strict on the actual calls rather than the calls which weren't made. Does
```
@Test
public void shouldNotCloseStateManagerIfUnableToLockTaskDirectory() {
final InOrder inOrder = inOrder(stateManager, stateDirectory);
when(stateManager.taskId()).thenReturn(taskId);
when(stateDirectory.lock(taskId)).thenReturn(false);
StateManagerUtil.closeStateManager(
logger, "logPrefix:", true, false, stateManager, stateDirectory, TaskType.ACTIVE);
inOrder.verify(stateManager).taskId(); <- ENSURE ORDER
inOrder.verify(stateDirectory).lock(taskId); <- ENSURE ORDER
verify(stateManager, never()).close();
verify(stateManager, never()).baseDir();
verify(stateDirectory, never()).unlock(taskId);
verifyNoMoreInteractions(stateManager, stateDirectory);
}
@Test
public void shouldNotWipeStateStoresIfUnableToLockTaskDirectory() {
final InOrder inOrder = inOrder(stateManager, stateDirectory);
when(stateManager.taskId()).thenReturn(taskId);
when(stateDirectory.lock(taskId)).thenReturn(false);
StateManagerUtil.closeStateManager(
logger, "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE);
inOrder.verify(stateManager).taskId(); <- ENSURE ORDER
inOrder.verify(stateDirectory).lock(taskId); <- ENSURE ORDER
verify(stateManager, never()).close();
verify(stateManager, never()).baseDir();
verify(stateDirectory, never()).unlock(taskId);
verifyNoMoreInteractions(stateManager, stateDirectory);
}
```
look better to you?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org