You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/11/04 12:00:50 UTC

[GitHub] [kafka] clolov opened a new pull request, #12821: Replace PowerMock and EasyMock with Mockito in streams tests

clolov opened a new pull request, #12821:
URL: https://github.com/apache/kafka/pull/12821

   Batch 1 of the tests detailed in https://issues.apache.org/jira/browse/KAFKA-14132 which use PowerMock/EasyMock and need to be moved to Mockito.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] clolov commented on pull request #12821: KAFKA-14132: Replace PowerMock and EasyMock with Mockito in streams tests

Posted by GitBox <gi...@apache.org>.
clolov commented on PR #12821:
URL: https://github.com/apache/kafka/pull/12821#issuecomment-1398172190

   Thank you very much for the review and merge!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cadonna commented on a diff in pull request #12821: KAFKA-14132: Replace PowerMock and EasyMock with Mockito in streams tests

Posted by GitBox <gi...@apache.org>.
cadonna commented on code in PR #12821:
URL: https://github.com/apache/kafka/pull/12821#discussion_r1046120121


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -42,77 +40,56 @@
 
 import static java.util.Collections.emptyList;
 import static java.util.Collections.singletonList;
-import static org.easymock.EasyMock.createStrictControl;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.expectLastCall;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThrows;
-import static org.powermock.api.easymock.PowerMock.mockStatic;
-import static org.powermock.api.easymock.PowerMock.replayAll;
-
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(Utils.class)
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
 public class StateManagerUtilTest {
 
-    @Mock(type = MockType.NICE)
+    @Mock
     private ProcessorStateManager stateManager;
 
-    @Mock(type = MockType.NICE)
+    @Mock
     private StateDirectory stateDirectory;
 
-    @Mock(type = MockType.NICE)
+    @Mock
     private ProcessorTopology topology;
 
-    @Mock(type = MockType.NICE)
+    @Mock
     private InternalProcessorContext processorContext;
 
-    private IMocksControl ctrl;
-
     private Logger logger = new LogContext("test").logger(AbstractTask.class);
 
     private final TaskId taskId = new TaskId(0, 0);
 
-    @Before
-    public void setup() {
-        ctrl = createStrictControl();
-        topology = ctrl.createMock(ProcessorTopology.class);
-        processorContext = ctrl.createMock(InternalProcessorContext.class);
-
-        stateManager = ctrl.createMock(ProcessorStateManager.class);
-        stateDirectory = ctrl.createMock(StateDirectory.class);
-    }
-
     @Test
     public void testRegisterStateStoreWhenTopologyEmpty() {
-        expect(topology.stateStores()).andReturn(emptyList());
-
-        ctrl.checkOrder(true);
-        ctrl.replay();
+        when(topology.stateStores()).thenReturn(emptyList());
 
         StateManagerUtil.registerStateStores(logger,
             "logPrefix:", topology, stateManager, stateDirectory, processorContext);
-
-        ctrl.verify();
     }
 
     @Test
     public void testRegisterStateStoreFailToLockStateDirectory() {
-        expect(topology.stateStores()).andReturn(singletonList(new MockKeyValueStore("store", false)));
-
-        expect(stateManager.taskId()).andReturn(taskId);
+        when(topology.stateStores()).thenReturn(singletonList(new MockKeyValueStore("store", false)));
 

Review Comment:
   ```suggestion
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -42,77 +40,56 @@
 
 import static java.util.Collections.emptyList;
 import static java.util.Collections.singletonList;
-import static org.easymock.EasyMock.createStrictControl;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.expectLastCall;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThrows;
-import static org.powermock.api.easymock.PowerMock.mockStatic;
-import static org.powermock.api.easymock.PowerMock.replayAll;
-
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(Utils.class)
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
 public class StateManagerUtilTest {
 
-    @Mock(type = MockType.NICE)
+    @Mock
     private ProcessorStateManager stateManager;
 
-    @Mock(type = MockType.NICE)
+    @Mock
     private StateDirectory stateDirectory;
 
-    @Mock(type = MockType.NICE)
+    @Mock
     private ProcessorTopology topology;
 
-    @Mock(type = MockType.NICE)
+    @Mock
     private InternalProcessorContext processorContext;
 
-    private IMocksControl ctrl;
-
     private Logger logger = new LogContext("test").logger(AbstractTask.class);
 
     private final TaskId taskId = new TaskId(0, 0);
 
-    @Before
-    public void setup() {
-        ctrl = createStrictControl();
-        topology = ctrl.createMock(ProcessorTopology.class);
-        processorContext = ctrl.createMock(InternalProcessorContext.class);
-
-        stateManager = ctrl.createMock(ProcessorStateManager.class);
-        stateDirectory = ctrl.createMock(StateDirectory.class);
-    }
-
     @Test
     public void testRegisterStateStoreWhenTopologyEmpty() {
-        expect(topology.stateStores()).andReturn(emptyList());
-
-        ctrl.checkOrder(true);
-        ctrl.replay();
+        when(topology.stateStores()).thenReturn(emptyList());
 
         StateManagerUtil.registerStateStores(logger,
             "logPrefix:", topology, stateManager, stateDirectory, processorContext);
-
-        ctrl.verify();
     }
 
     @Test
     public void testRegisterStateStoreFailToLockStateDirectory() {
-        expect(topology.stateStores()).andReturn(singletonList(new MockKeyValueStore("store", false)));
-
-        expect(stateManager.taskId()).andReturn(taskId);
+        when(topology.stateStores()).thenReturn(singletonList(new MockKeyValueStore("store", false)));
 
-        expect(stateDirectory.lock(taskId)).andReturn(false);
+        when(stateManager.taskId()).thenReturn(taskId);
 

Review Comment:
   ```suggestion
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -121,64 +98,48 @@ public void testRegisterStateStores() {
         final MockKeyValueStore store2 = new MockKeyValueStore("store2", false);
         final List<StateStore> stateStores = Arrays.asList(store1, store2);
 
-        expect(topology.stateStores()).andReturn(stateStores);
-
-        expect(stateManager.taskId()).andReturn(taskId);
-
-        expect(stateDirectory.lock(taskId)).andReturn(true);
-        expect(stateDirectory.directoryForTaskIsEmpty(taskId)).andReturn(true);
+        final InOrder inOrder = inOrder(stateManager);
 

Review Comment:
   ```suggestion
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -121,64 +98,48 @@ public void testRegisterStateStores() {
         final MockKeyValueStore store2 = new MockKeyValueStore("store2", false);
         final List<StateStore> stateStores = Arrays.asList(store1, store2);
 

Review Comment:
   ```suggestion
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -121,64 +98,48 @@ public void testRegisterStateStores() {
         final MockKeyValueStore store2 = new MockKeyValueStore("store2", false);
         final List<StateStore> stateStores = Arrays.asList(store1, store2);
 
-        expect(topology.stateStores()).andReturn(stateStores);
-
-        expect(stateManager.taskId()).andReturn(taskId);
-
-        expect(stateDirectory.lock(taskId)).andReturn(true);
-        expect(stateDirectory.directoryForTaskIsEmpty(taskId)).andReturn(true);
+        final InOrder inOrder = inOrder(stateManager);
 
-        expect(topology.stateStores()).andReturn(stateStores);
+        when(topology.stateStores()).thenReturn(stateStores);
 
-        stateManager.registerStateStores(stateStores, processorContext);
+        when(stateManager.taskId()).thenReturn(taskId);
 

Review Comment:
   ```suggestion
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -121,64 +98,48 @@ public void testRegisterStateStores() {
         final MockKeyValueStore store2 = new MockKeyValueStore("store2", false);
         final List<StateStore> stateStores = Arrays.asList(store1, store2);
 
-        expect(topology.stateStores()).andReturn(stateStores);
-
-        expect(stateManager.taskId()).andReturn(taskId);
-
-        expect(stateDirectory.lock(taskId)).andReturn(true);
-        expect(stateDirectory.directoryForTaskIsEmpty(taskId)).andReturn(true);
+        final InOrder inOrder = inOrder(stateManager);
 
-        expect(topology.stateStores()).andReturn(stateStores);
+        when(topology.stateStores()).thenReturn(stateStores);
 
-        stateManager.registerStateStores(stateStores, processorContext);
+        when(stateManager.taskId()).thenReturn(taskId);
 
-        stateManager.initializeStoreOffsetsFromCheckpoint(true);
-        expectLastCall();
+        when(stateDirectory.lock(taskId)).thenReturn(true);
+        when(stateDirectory.directoryForTaskIsEmpty(taskId)).thenReturn(true);
 
-        ctrl.checkOrder(true);
-        ctrl.replay();
+        when(topology.stateStores()).thenReturn(stateStores);
 
         StateManagerUtil.registerStateStores(logger, "logPrefix:",
             topology, stateManager, stateDirectory, processorContext);
 
-        ctrl.verify();
+        inOrder.verify(stateManager).registerStateStores(stateStores, processorContext);
+        inOrder.verify(stateManager).initializeStoreOffsetsFromCheckpoint(true);
+        verifyNoMoreInteractions(stateManager);
     }
 
     @Test
     public void testCloseStateManagerClean() {
-        expect(stateManager.taskId()).andReturn(taskId);
+        final InOrder inOrder = inOrder(stateManager, stateDirectory);
 
-        expect(stateDirectory.lock(taskId)).andReturn(true);
+        when(stateManager.taskId()).thenReturn(taskId);
 
-        stateManager.close();
-        expectLastCall();
-
-        stateDirectory.unlock(taskId);
-        expectLastCall();
-
-        ctrl.checkOrder(true);
-        ctrl.replay();
+        when(stateDirectory.lock(taskId)).thenReturn(true);
 
         StateManagerUtil.closeStateManager(logger,
             "logPrefix:", true, false, stateManager, stateDirectory, TaskType.ACTIVE);
 
-        ctrl.verify();
+        inOrder.verify(stateManager).close();
+        inOrder.verify(stateDirectory).unlock(taskId);
+        verifyNoMoreInteractions(stateManager, stateDirectory);
     }
 
     @Test
     public void testCloseStateManagerThrowsExceptionWhenClean() {
-        expect(stateManager.taskId()).andReturn(taskId);
-
-        expect(stateDirectory.lock(taskId)).andReturn(true);
+        when(stateManager.taskId()).thenReturn(taskId);
 

Review Comment:
   ```suggestion
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -121,64 +98,48 @@ public void testRegisterStateStores() {
         final MockKeyValueStore store2 = new MockKeyValueStore("store2", false);
         final List<StateStore> stateStores = Arrays.asList(store1, store2);
 
-        expect(topology.stateStores()).andReturn(stateStores);
-
-        expect(stateManager.taskId()).andReturn(taskId);
-
-        expect(stateDirectory.lock(taskId)).andReturn(true);
-        expect(stateDirectory.directoryForTaskIsEmpty(taskId)).andReturn(true);
+        final InOrder inOrder = inOrder(stateManager);
 
-        expect(topology.stateStores()).andReturn(stateStores);
+        when(topology.stateStores()).thenReturn(stateStores);
 
-        stateManager.registerStateStores(stateStores, processorContext);
+        when(stateManager.taskId()).thenReturn(taskId);
 
-        stateManager.initializeStoreOffsetsFromCheckpoint(true);
-        expectLastCall();
+        when(stateDirectory.lock(taskId)).thenReturn(true);
+        when(stateDirectory.directoryForTaskIsEmpty(taskId)).thenReturn(true);
 
-        ctrl.checkOrder(true);
-        ctrl.replay();
+        when(topology.stateStores()).thenReturn(stateStores);
 
         StateManagerUtil.registerStateStores(logger, "logPrefix:",
             topology, stateManager, stateDirectory, processorContext);
 
-        ctrl.verify();
+        inOrder.verify(stateManager).registerStateStores(stateStores, processorContext);
+        inOrder.verify(stateManager).initializeStoreOffsetsFromCheckpoint(true);
+        verifyNoMoreInteractions(stateManager);
     }
 
     @Test
     public void testCloseStateManagerClean() {
-        expect(stateManager.taskId()).andReturn(taskId);
+        final InOrder inOrder = inOrder(stateManager, stateDirectory);
 
-        expect(stateDirectory.lock(taskId)).andReturn(true);
+        when(stateManager.taskId()).thenReturn(taskId);
 
-        stateManager.close();
-        expectLastCall();
-
-        stateDirectory.unlock(taskId);
-        expectLastCall();
-
-        ctrl.checkOrder(true);
-        ctrl.replay();
+        when(stateDirectory.lock(taskId)).thenReturn(true);
 
         StateManagerUtil.closeStateManager(logger,
             "logPrefix:", true, false, stateManager, stateDirectory, TaskType.ACTIVE);
 
-        ctrl.verify();
+        inOrder.verify(stateManager).close();
+        inOrder.verify(stateDirectory).unlock(taskId);
+        verifyNoMoreInteractions(stateManager, stateDirectory);
     }
 
     @Test
     public void testCloseStateManagerThrowsExceptionWhenClean() {
-        expect(stateManager.taskId()).andReturn(taskId);
-
-        expect(stateDirectory.lock(taskId)).andReturn(true);
+        when(stateManager.taskId()).thenReturn(taskId);
 
-        stateManager.close();
-        expectLastCall().andThrow(new ProcessorStateException("state manager failed to close"));
+        when(stateDirectory.lock(taskId)).thenReturn(true);
 

Review Comment:
   ```suggestion
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -187,151 +148,101 @@ public void testCloseStateManagerThrowsExceptionWhenClean() {
         // Thrown stateMgr exception will not be wrapped.
         assertEquals("state manager failed to close", thrown.getMessage());
 
-        ctrl.verify();
+        // The unlock logic should still be executed.
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
     public void testCloseStateManagerThrowsExceptionWhenDirty() {
-        expect(stateManager.taskId()).andReturn(taskId);
+        when(stateManager.taskId()).thenReturn(taskId);
 

Review Comment:
   ```suggestion
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -187,151 +148,101 @@ public void testCloseStateManagerThrowsExceptionWhenClean() {
         // Thrown stateMgr exception will not be wrapped.
         assertEquals("state manager failed to close", thrown.getMessage());
 
-        ctrl.verify();
+        // The unlock logic should still be executed.
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
     public void testCloseStateManagerThrowsExceptionWhenDirty() {
-        expect(stateManager.taskId()).andReturn(taskId);
+        when(stateManager.taskId()).thenReturn(taskId);
 
-        expect(stateDirectory.lock(taskId)).andReturn(true);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
 
-        stateManager.close();
-        expectLastCall().andThrow(new ProcessorStateException("state manager failed to close"));
-
-        stateDirectory.unlock(taskId);
-
-        ctrl.checkOrder(true);
-        ctrl.replay();
+        doThrow(new ProcessorStateException("state manager failed to close")).when(stateManager).close();
 
         assertThrows(
             ProcessorStateException.class,
             () -> StateManagerUtil.closeStateManager(
                 logger, "logPrefix:", false, false, stateManager, stateDirectory, TaskType.ACTIVE));
 
-        ctrl.verify();
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
     public void testCloseStateManagerWithStateStoreWipeOut() {
-        expect(stateManager.taskId()).andReturn(taskId);
-        expect(stateDirectory.lock(taskId)).andReturn(true);
+        final InOrder inOrder = inOrder(stateManager, stateDirectory);
 

Review Comment:
   ```suggestion
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -187,151 +148,101 @@ public void testCloseStateManagerThrowsExceptionWhenClean() {
         // Thrown stateMgr exception will not be wrapped.
         assertEquals("state manager failed to close", thrown.getMessage());
 
-        ctrl.verify();
+        // The unlock logic should still be executed.
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
     public void testCloseStateManagerThrowsExceptionWhenDirty() {
-        expect(stateManager.taskId()).andReturn(taskId);
+        when(stateManager.taskId()).thenReturn(taskId);
 
-        expect(stateDirectory.lock(taskId)).andReturn(true);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
 
-        stateManager.close();
-        expectLastCall().andThrow(new ProcessorStateException("state manager failed to close"));
-
-        stateDirectory.unlock(taskId);
-
-        ctrl.checkOrder(true);
-        ctrl.replay();
+        doThrow(new ProcessorStateException("state manager failed to close")).when(stateManager).close();
 
         assertThrows(
             ProcessorStateException.class,
             () -> StateManagerUtil.closeStateManager(
                 logger, "logPrefix:", false, false, stateManager, stateDirectory, TaskType.ACTIVE));
 
-        ctrl.verify();
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
     public void testCloseStateManagerWithStateStoreWipeOut() {
-        expect(stateManager.taskId()).andReturn(taskId);
-        expect(stateDirectory.lock(taskId)).andReturn(true);
+        final InOrder inOrder = inOrder(stateManager, stateDirectory);
 
-        stateManager.close();
-        expectLastCall();
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
 
         // The `baseDir` will be accessed when attempting to delete the state store.
-        expect(stateManager.baseDir()).andReturn(TestUtils.tempDirectory("state_store"));
-
-        stateDirectory.unlock(taskId);
-        expectLastCall();
-
-        ctrl.checkOrder(true);
-        ctrl.replay();
+        when(stateManager.baseDir()).thenReturn(TestUtils.tempDirectory("state_store"));
 
         StateManagerUtil.closeStateManager(logger,
             "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE);
 
-        ctrl.verify();
+        inOrder.verify(stateManager).close();
+        inOrder.verify(stateDirectory).unlock(taskId);
+        verifyNoMoreInteractions(stateManager, stateDirectory);
     }
 
     @Test
-    public void  shouldStillWipeStateStoresIfCloseThrowsException() throws IOException {
+    public void  shouldStillWipeStateStoresIfCloseThrowsException() {
         final File randomFile = new File("/random/path");
-        mockStatic(Utils.class);
-
-        expect(stateManager.taskId()).andReturn(taskId);
-        expect(stateDirectory.lock(taskId)).andReturn(true);
 
-        stateManager.close();
-        expectLastCall().andThrow(new ProcessorStateException("Close failed"));
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
 
-        expect(stateManager.baseDir()).andReturn(randomFile);
+        doThrow(new ProcessorStateException("Close failed")).when(stateManager).close();
 
-        Utils.delete(randomFile);
+        when(stateManager.baseDir()).thenReturn(randomFile);
 
-        stateDirectory.unlock(taskId);
-        expectLastCall();
+        try (MockedStatic<Utils> utils = mockStatic(Utils.class)) {
+            assertThrows(ProcessorStateException.class, () ->
+                    StateManagerUtil.closeStateManager(logger, "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
+        }
 
-        ctrl.checkOrder(true);
-        ctrl.replay();
-
-        replayAll();
-
-        assertThrows(ProcessorStateException.class, () ->
-            StateManagerUtil.closeStateManager(logger, "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
-
-        ctrl.verify();
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
-    public void testCloseStateManagerWithStateStoreWipeOutRethrowWrappedIOException() throws IOException {
+    public void testCloseStateManagerWithStateStoreWipeOutRethrowWrappedIOException() {
         final File unknownFile = new File("/unknown/path");
-        mockStatic(Utils.class);
 

Review Comment:
   ```suggestion
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -187,151 +148,101 @@ public void testCloseStateManagerThrowsExceptionWhenClean() {
         // Thrown stateMgr exception will not be wrapped.
         assertEquals("state manager failed to close", thrown.getMessage());
 
-        ctrl.verify();
+        // The unlock logic should still be executed.
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
     public void testCloseStateManagerThrowsExceptionWhenDirty() {
-        expect(stateManager.taskId()).andReturn(taskId);
+        when(stateManager.taskId()).thenReturn(taskId);
 
-        expect(stateDirectory.lock(taskId)).andReturn(true);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
 
-        stateManager.close();
-        expectLastCall().andThrow(new ProcessorStateException("state manager failed to close"));
-
-        stateDirectory.unlock(taskId);
-
-        ctrl.checkOrder(true);
-        ctrl.replay();
+        doThrow(new ProcessorStateException("state manager failed to close")).when(stateManager).close();
 
         assertThrows(
             ProcessorStateException.class,
             () -> StateManagerUtil.closeStateManager(
                 logger, "logPrefix:", false, false, stateManager, stateDirectory, TaskType.ACTIVE));
 
-        ctrl.verify();
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
     public void testCloseStateManagerWithStateStoreWipeOut() {
-        expect(stateManager.taskId()).andReturn(taskId);
-        expect(stateDirectory.lock(taskId)).andReturn(true);
+        final InOrder inOrder = inOrder(stateManager, stateDirectory);
 
-        stateManager.close();
-        expectLastCall();
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
 
         // The `baseDir` will be accessed when attempting to delete the state store.
-        expect(stateManager.baseDir()).andReturn(TestUtils.tempDirectory("state_store"));
-
-        stateDirectory.unlock(taskId);
-        expectLastCall();
-
-        ctrl.checkOrder(true);
-        ctrl.replay();
+        when(stateManager.baseDir()).thenReturn(TestUtils.tempDirectory("state_store"));
 
         StateManagerUtil.closeStateManager(logger,
             "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE);
 
-        ctrl.verify();
+        inOrder.verify(stateManager).close();
+        inOrder.verify(stateDirectory).unlock(taskId);
+        verifyNoMoreInteractions(stateManager, stateDirectory);
     }
 
     @Test
-    public void  shouldStillWipeStateStoresIfCloseThrowsException() throws IOException {
+    public void  shouldStillWipeStateStoresIfCloseThrowsException() {
         final File randomFile = new File("/random/path");
-        mockStatic(Utils.class);
-
-        expect(stateManager.taskId()).andReturn(taskId);
-        expect(stateDirectory.lock(taskId)).andReturn(true);
 
-        stateManager.close();
-        expectLastCall().andThrow(new ProcessorStateException("Close failed"));
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
 
-        expect(stateManager.baseDir()).andReturn(randomFile);
+        doThrow(new ProcessorStateException("Close failed")).when(stateManager).close();
 

Review Comment:
   ```suggestion
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -187,151 +148,101 @@ public void testCloseStateManagerThrowsExceptionWhenClean() {
         // Thrown stateMgr exception will not be wrapped.
         assertEquals("state manager failed to close", thrown.getMessage());
 
-        ctrl.verify();
+        // The unlock logic should still be executed.
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
     public void testCloseStateManagerThrowsExceptionWhenDirty() {
-        expect(stateManager.taskId()).andReturn(taskId);
+        when(stateManager.taskId()).thenReturn(taskId);
 
-        expect(stateDirectory.lock(taskId)).andReturn(true);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
 
-        stateManager.close();
-        expectLastCall().andThrow(new ProcessorStateException("state manager failed to close"));
-
-        stateDirectory.unlock(taskId);
-
-        ctrl.checkOrder(true);
-        ctrl.replay();
+        doThrow(new ProcessorStateException("state manager failed to close")).when(stateManager).close();
 
         assertThrows(
             ProcessorStateException.class,
             () -> StateManagerUtil.closeStateManager(
                 logger, "logPrefix:", false, false, stateManager, stateDirectory, TaskType.ACTIVE));
 
-        ctrl.verify();
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
     public void testCloseStateManagerWithStateStoreWipeOut() {
-        expect(stateManager.taskId()).andReturn(taskId);
-        expect(stateDirectory.lock(taskId)).andReturn(true);
+        final InOrder inOrder = inOrder(stateManager, stateDirectory);
 
-        stateManager.close();
-        expectLastCall();
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
 
         // The `baseDir` will be accessed when attempting to delete the state store.
-        expect(stateManager.baseDir()).andReturn(TestUtils.tempDirectory("state_store"));
-
-        stateDirectory.unlock(taskId);
-        expectLastCall();
-
-        ctrl.checkOrder(true);
-        ctrl.replay();
+        when(stateManager.baseDir()).thenReturn(TestUtils.tempDirectory("state_store"));
 
         StateManagerUtil.closeStateManager(logger,
             "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE);
 
-        ctrl.verify();
+        inOrder.verify(stateManager).close();
+        inOrder.verify(stateDirectory).unlock(taskId);
+        verifyNoMoreInteractions(stateManager, stateDirectory);
     }
 
     @Test
-    public void  shouldStillWipeStateStoresIfCloseThrowsException() throws IOException {
+    public void  shouldStillWipeStateStoresIfCloseThrowsException() {
         final File randomFile = new File("/random/path");
-        mockStatic(Utils.class);
-
-        expect(stateManager.taskId()).andReturn(taskId);
-        expect(stateDirectory.lock(taskId)).andReturn(true);
 
-        stateManager.close();
-        expectLastCall().andThrow(new ProcessorStateException("Close failed"));
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
 
-        expect(stateManager.baseDir()).andReturn(randomFile);
+        doThrow(new ProcessorStateException("Close failed")).when(stateManager).close();
 
-        Utils.delete(randomFile);
+        when(stateManager.baseDir()).thenReturn(randomFile);
 
-        stateDirectory.unlock(taskId);
-        expectLastCall();
+        try (MockedStatic<Utils> utils = mockStatic(Utils.class)) {
+            assertThrows(ProcessorStateException.class, () ->
+                    StateManagerUtil.closeStateManager(logger, "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
+        }
 
-        ctrl.checkOrder(true);
-        ctrl.replay();
-
-        replayAll();
-
-        assertThrows(ProcessorStateException.class, () ->
-            StateManagerUtil.closeStateManager(logger, "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
-
-        ctrl.verify();
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
-    public void testCloseStateManagerWithStateStoreWipeOutRethrowWrappedIOException() throws IOException {
+    public void testCloseStateManagerWithStateStoreWipeOutRethrowWrappedIOException() {
         final File unknownFile = new File("/unknown/path");
-        mockStatic(Utils.class);
 
-        expect(stateManager.taskId()).andReturn(taskId);
-        expect(stateDirectory.lock(taskId)).andReturn(true);
+        final InOrder inOrder = inOrder(stateManager, stateDirectory);
 
-        stateManager.close();
-        expectLastCall();
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
+        when(stateManager.baseDir()).thenReturn(unknownFile);
 
-        expect(stateManager.baseDir()).andReturn(unknownFile);
+        try (MockedStatic<Utils> utils = mockStatic(Utils.class)) {
+            utils.when(() -> Utils.delete(unknownFile)).thenThrow(new IOException("Deletion failed"));
 
-        Utils.delete(unknownFile);
-        expectLastCall().andThrow(new IOException("Deletion failed"));
+            final ProcessorStateException thrown = assertThrows(
+                    ProcessorStateException.class, () -> StateManagerUtil.closeStateManager(logger,
+                            "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
 
-        stateDirectory.unlock(taskId);
-        expectLastCall();
+            assertEquals(IOException.class, thrown.getCause().getClass());
+        }
 
-        ctrl.checkOrder(true);
-        ctrl.replay();
-
-        replayAll();
-
-        final ProcessorStateException thrown = assertThrows(
-            ProcessorStateException.class, () -> StateManagerUtil.closeStateManager(logger,
-                "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
-
-        assertEquals(IOException.class, thrown.getCause().getClass());
-
-        ctrl.verify();
+        inOrder.verify(stateManager).close();
+        inOrder.verify(stateDirectory).unlock(taskId);
+        verifyNoMoreInteractions(stateManager, stateDirectory);
     }
 
     @Test
-    public void shouldNotCloseStateManagerIfUnableToLockTaskDirectory() {
-        expect(stateManager.taskId()).andReturn(taskId);
-
-        expect(stateDirectory.lock(taskId)).andReturn(false);
-
-        stateManager.close();
-        expectLastCall().andThrow(new AssertionError("Should not be trying to close state you don't own!"));
+    public void shouldNotWipeStateStoresIfUnableToLockTaskDirectory() {
+        final InOrder inOrder = inOrder(stateManager, stateDirectory);
 

Review Comment:
   ```suggestion
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -187,151 +139,92 @@ public void testCloseStateManagerThrowsExceptionWhenClean() {
         // Thrown stateMgr exception will not be wrapped.
         assertEquals("state manager failed to close", thrown.getMessage());
 
-        ctrl.verify();
+        // The unlock logic should still be executed.
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
     public void testCloseStateManagerThrowsExceptionWhenDirty() {
-        expect(stateManager.taskId()).andReturn(taskId);
+        when(stateManager.taskId()).thenReturn(taskId);
 
-        expect(stateDirectory.lock(taskId)).andReturn(true);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
 
-        stateManager.close();
-        expectLastCall().andThrow(new ProcessorStateException("state manager failed to close"));
-
-        stateDirectory.unlock(taskId);
-
-        ctrl.checkOrder(true);
-        ctrl.replay();
+        doThrow(new ProcessorStateException("state manager failed to close")).when(stateManager).close();
 
         assertThrows(
             ProcessorStateException.class,
             () -> StateManagerUtil.closeStateManager(
                 logger, "logPrefix:", false, false, stateManager, stateDirectory, TaskType.ACTIVE));
 
-        ctrl.verify();
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
     public void testCloseStateManagerWithStateStoreWipeOut() {
-        expect(stateManager.taskId()).andReturn(taskId);
-        expect(stateDirectory.lock(taskId)).andReturn(true);
-
-        stateManager.close();
-        expectLastCall();
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
 
         // The `baseDir` will be accessed when attempting to delete the state store.
-        expect(stateManager.baseDir()).andReturn(TestUtils.tempDirectory("state_store"));
-
-        stateDirectory.unlock(taskId);
-        expectLastCall();
-
-        ctrl.checkOrder(true);
-        ctrl.replay();
+        when(stateManager.baseDir()).thenReturn(TestUtils.tempDirectory("state_store"));
 
         StateManagerUtil.closeStateManager(logger,
             "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE);
 
-        ctrl.verify();
+        verify(stateManager).close();
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
     public void  shouldStillWipeStateStoresIfCloseThrowsException() throws IOException {
         final File randomFile = new File("/random/path");
-        mockStatic(Utils.class);
-
-        expect(stateManager.taskId()).andReturn(taskId);
-        expect(stateDirectory.lock(taskId)).andReturn(true);
 
-        stateManager.close();
-        expectLastCall().andThrow(new ProcessorStateException("Close failed"));
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
 
-        expect(stateManager.baseDir()).andReturn(randomFile);
+        doThrow(new ProcessorStateException("Close failed")).when(stateManager).close();
 
-        Utils.delete(randomFile);
+        when(stateManager.baseDir()).thenReturn(randomFile);
 
-        stateDirectory.unlock(taskId);
-        expectLastCall();
+        try (MockedStatic<Utils> utils = mockStatic(Utils.class)) {
+            assertThrows(ProcessorStateException.class, () ->
+                    StateManagerUtil.closeStateManager(logger, "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
+        }
 
-        ctrl.checkOrder(true);
-        ctrl.replay();
-
-        replayAll();
-
-        assertThrows(ProcessorStateException.class, () ->
-            StateManagerUtil.closeStateManager(logger, "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
-
-        ctrl.verify();
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
     public void testCloseStateManagerWithStateStoreWipeOutRethrowWrappedIOException() throws IOException {
         final File unknownFile = new File("/unknown/path");
-        mockStatic(Utils.class);
-
-        expect(stateManager.taskId()).andReturn(taskId);
-        expect(stateDirectory.lock(taskId)).andReturn(true);
-
-        stateManager.close();
-        expectLastCall();
-
-        expect(stateManager.baseDir()).andReturn(unknownFile);
 
-        Utils.delete(unknownFile);
-        expectLastCall().andThrow(new IOException("Deletion failed"));
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
+        when(stateManager.baseDir()).thenReturn(unknownFile);
 
-        stateDirectory.unlock(taskId);
-        expectLastCall();
+        try (MockedStatic<Utils> utils = mockStatic(Utils.class)) {
+            utils.when(() -> Utils.delete(unknownFile)).thenThrow(new IOException("Deletion failed"));
 
-        ctrl.checkOrder(true);
-        ctrl.replay();
+            final ProcessorStateException thrown = assertThrows(
+                    ProcessorStateException.class, () -> StateManagerUtil.closeStateManager(logger,
+                            "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
 
-        replayAll();
+            assertEquals(IOException.class, thrown.getCause().getClass());
+        }
 
-        final ProcessorStateException thrown = assertThrows(
-            ProcessorStateException.class, () -> StateManagerUtil.closeStateManager(logger,
-                "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
-
-        assertEquals(IOException.class, thrown.getCause().getClass());
-
-        ctrl.verify();
+        verify(stateManager).close();
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
-    public void shouldNotCloseStateManagerIfUnableToLockTaskDirectory() {
-        expect(stateManager.taskId()).andReturn(taskId);
-
-        expect(stateDirectory.lock(taskId)).andReturn(false);
-
-        stateManager.close();
-        expectLastCall().andThrow(new AssertionError("Should not be trying to close state you don't own!"));
-
-        ctrl.checkOrder(true);
-        ctrl.replay();
-
-        replayAll();
+    public void shouldNotWipeStateStoresIfUnableToLockTaskDirectory() {
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateDirectory.lock(taskId)).thenReturn(false);
 
         StateManagerUtil.closeStateManager(
-            logger, "logPrefix:", true, false, stateManager, stateDirectory, TaskType.ACTIVE);
-    }
-
-    @Test
-    public void shouldNotWipeStateStoresIfUnableToLockTaskDirectory() throws IOException {

Review Comment:
   I see what you mean, but unit tests should also document the behavior of production code and be robust against code changes. In this case, we have indeed to separate cases:
   
   1. When exactly-once is enabled and the state manager is closed dirty and the lock is not owned then the state manager should not be closed, the directory should not be wiped and the directory should not be unlocked.
   2. When exactly-once is disabled and the lock is not owned then the state manager should not be closed and the directory should not be unlocked.
   
   So, I think what you did is fine, although would not verify that `stateManager.baseDir()` is never called but that `Utils.delete()` is never called. However, you need to run that test once with:
   
   ```java
   StateManagerUtil.closeStateManager(
               logger, "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE);
   ``` 
   
   and once with
   
   ```java
   StateManagerUtil.closeStateManager(
               logger, "logPrefix:", true, false, stateManager, stateDirectory, TaskType.ACTIVE);
   ``` 
   



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -121,64 +98,48 @@ public void testRegisterStateStores() {
         final MockKeyValueStore store2 = new MockKeyValueStore("store2", false);
         final List<StateStore> stateStores = Arrays.asList(store1, store2);
 
-        expect(topology.stateStores()).andReturn(stateStores);
-
-        expect(stateManager.taskId()).andReturn(taskId);
-
-        expect(stateDirectory.lock(taskId)).andReturn(true);
-        expect(stateDirectory.directoryForTaskIsEmpty(taskId)).andReturn(true);
+        final InOrder inOrder = inOrder(stateManager);
 
-        expect(topology.stateStores()).andReturn(stateStores);
+        when(topology.stateStores()).thenReturn(stateStores);
 

Review Comment:
   ```suggestion
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -121,64 +98,48 @@ public void testRegisterStateStores() {
         final MockKeyValueStore store2 = new MockKeyValueStore("store2", false);
         final List<StateStore> stateStores = Arrays.asList(store1, store2);
 
-        expect(topology.stateStores()).andReturn(stateStores);
-
-        expect(stateManager.taskId()).andReturn(taskId);
-
-        expect(stateDirectory.lock(taskId)).andReturn(true);
-        expect(stateDirectory.directoryForTaskIsEmpty(taskId)).andReturn(true);
+        final InOrder inOrder = inOrder(stateManager);
 
-        expect(topology.stateStores()).andReturn(stateStores);
+        when(topology.stateStores()).thenReturn(stateStores);
 
-        stateManager.registerStateStores(stateStores, processorContext);
+        when(stateManager.taskId()).thenReturn(taskId);
 
-        stateManager.initializeStoreOffsetsFromCheckpoint(true);
-        expectLastCall();
+        when(stateDirectory.lock(taskId)).thenReturn(true);
+        when(stateDirectory.directoryForTaskIsEmpty(taskId)).thenReturn(true);
 

Review Comment:
   ```suggestion
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -121,64 +98,48 @@ public void testRegisterStateStores() {
         final MockKeyValueStore store2 = new MockKeyValueStore("store2", false);
         final List<StateStore> stateStores = Arrays.asList(store1, store2);
 
-        expect(topology.stateStores()).andReturn(stateStores);
-
-        expect(stateManager.taskId()).andReturn(taskId);
-
-        expect(stateDirectory.lock(taskId)).andReturn(true);
-        expect(stateDirectory.directoryForTaskIsEmpty(taskId)).andReturn(true);
+        final InOrder inOrder = inOrder(stateManager);
 
-        expect(topology.stateStores()).andReturn(stateStores);
+        when(topology.stateStores()).thenReturn(stateStores);
 
-        stateManager.registerStateStores(stateStores, processorContext);
+        when(stateManager.taskId()).thenReturn(taskId);
 
-        stateManager.initializeStoreOffsetsFromCheckpoint(true);
-        expectLastCall();
+        when(stateDirectory.lock(taskId)).thenReturn(true);
+        when(stateDirectory.directoryForTaskIsEmpty(taskId)).thenReturn(true);
 
-        ctrl.checkOrder(true);
-        ctrl.replay();
+        when(topology.stateStores()).thenReturn(stateStores);
 
         StateManagerUtil.registerStateStores(logger, "logPrefix:",
             topology, stateManager, stateDirectory, processorContext);
 
-        ctrl.verify();
+        inOrder.verify(stateManager).registerStateStores(stateStores, processorContext);
+        inOrder.verify(stateManager).initializeStoreOffsetsFromCheckpoint(true);
+        verifyNoMoreInteractions(stateManager);
     }
 
     @Test
     public void testCloseStateManagerClean() {
-        expect(stateManager.taskId()).andReturn(taskId);
+        final InOrder inOrder = inOrder(stateManager, stateDirectory);
 

Review Comment:
   ```suggestion
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -187,151 +148,101 @@ public void testCloseStateManagerThrowsExceptionWhenClean() {
         // Thrown stateMgr exception will not be wrapped.
         assertEquals("state manager failed to close", thrown.getMessage());
 
-        ctrl.verify();
+        // The unlock logic should still be executed.
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
     public void testCloseStateManagerThrowsExceptionWhenDirty() {
-        expect(stateManager.taskId()).andReturn(taskId);
+        when(stateManager.taskId()).thenReturn(taskId);
 
-        expect(stateDirectory.lock(taskId)).andReturn(true);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
 
-        stateManager.close();
-        expectLastCall().andThrow(new ProcessorStateException("state manager failed to close"));
-
-        stateDirectory.unlock(taskId);
-
-        ctrl.checkOrder(true);
-        ctrl.replay();
+        doThrow(new ProcessorStateException("state manager failed to close")).when(stateManager).close();
 
         assertThrows(
             ProcessorStateException.class,
             () -> StateManagerUtil.closeStateManager(
                 logger, "logPrefix:", false, false, stateManager, stateDirectory, TaskType.ACTIVE));
 
-        ctrl.verify();
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
     public void testCloseStateManagerWithStateStoreWipeOut() {
-        expect(stateManager.taskId()).andReturn(taskId);
-        expect(stateDirectory.lock(taskId)).andReturn(true);
+        final InOrder inOrder = inOrder(stateManager, stateDirectory);
 
-        stateManager.close();
-        expectLastCall();
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
 
         // The `baseDir` will be accessed when attempting to delete the state store.
-        expect(stateManager.baseDir()).andReturn(TestUtils.tempDirectory("state_store"));
-
-        stateDirectory.unlock(taskId);
-        expectLastCall();
-
-        ctrl.checkOrder(true);
-        ctrl.replay();
+        when(stateManager.baseDir()).thenReturn(TestUtils.tempDirectory("state_store"));
 
         StateManagerUtil.closeStateManager(logger,
             "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE);
 
-        ctrl.verify();
+        inOrder.verify(stateManager).close();
+        inOrder.verify(stateDirectory).unlock(taskId);
+        verifyNoMoreInteractions(stateManager, stateDirectory);
     }
 
     @Test
-    public void  shouldStillWipeStateStoresIfCloseThrowsException() throws IOException {
+    public void  shouldStillWipeStateStoresIfCloseThrowsException() {
         final File randomFile = new File("/random/path");
-        mockStatic(Utils.class);
-
-        expect(stateManager.taskId()).andReturn(taskId);
-        expect(stateDirectory.lock(taskId)).andReturn(true);
 
-        stateManager.close();
-        expectLastCall().andThrow(new ProcessorStateException("Close failed"));
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
 

Review Comment:
   ```suggestion
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -187,151 +148,101 @@ public void testCloseStateManagerThrowsExceptionWhenClean() {
         // Thrown stateMgr exception will not be wrapped.
         assertEquals("state manager failed to close", thrown.getMessage());
 
-        ctrl.verify();
+        // The unlock logic should still be executed.
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
     public void testCloseStateManagerThrowsExceptionWhenDirty() {
-        expect(stateManager.taskId()).andReturn(taskId);
+        when(stateManager.taskId()).thenReturn(taskId);
 
-        expect(stateDirectory.lock(taskId)).andReturn(true);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
 
-        stateManager.close();
-        expectLastCall().andThrow(new ProcessorStateException("state manager failed to close"));
-
-        stateDirectory.unlock(taskId);
-
-        ctrl.checkOrder(true);
-        ctrl.replay();
+        doThrow(new ProcessorStateException("state manager failed to close")).when(stateManager).close();
 
         assertThrows(
             ProcessorStateException.class,
             () -> StateManagerUtil.closeStateManager(
                 logger, "logPrefix:", false, false, stateManager, stateDirectory, TaskType.ACTIVE));
 
-        ctrl.verify();
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
     public void testCloseStateManagerWithStateStoreWipeOut() {
-        expect(stateManager.taskId()).andReturn(taskId);
-        expect(stateDirectory.lock(taskId)).andReturn(true);
+        final InOrder inOrder = inOrder(stateManager, stateDirectory);
 
-        stateManager.close();
-        expectLastCall();
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
 

Review Comment:
   ```suggestion
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -121,64 +98,48 @@ public void testRegisterStateStores() {
         final MockKeyValueStore store2 = new MockKeyValueStore("store2", false);
         final List<StateStore> stateStores = Arrays.asList(store1, store2);
 
-        expect(topology.stateStores()).andReturn(stateStores);
-
-        expect(stateManager.taskId()).andReturn(taskId);
-
-        expect(stateDirectory.lock(taskId)).andReturn(true);
-        expect(stateDirectory.directoryForTaskIsEmpty(taskId)).andReturn(true);
+        final InOrder inOrder = inOrder(stateManager);
 
-        expect(topology.stateStores()).andReturn(stateStores);
+        when(topology.stateStores()).thenReturn(stateStores);
 
-        stateManager.registerStateStores(stateStores, processorContext);
+        when(stateManager.taskId()).thenReturn(taskId);
 
-        stateManager.initializeStoreOffsetsFromCheckpoint(true);
-        expectLastCall();
+        when(stateDirectory.lock(taskId)).thenReturn(true);
+        when(stateDirectory.directoryForTaskIsEmpty(taskId)).thenReturn(true);
 
-        ctrl.checkOrder(true);
-        ctrl.replay();
+        when(topology.stateStores()).thenReturn(stateStores);
 
         StateManagerUtil.registerStateStores(logger, "logPrefix:",
             topology, stateManager, stateDirectory, processorContext);
 
-        ctrl.verify();
+        inOrder.verify(stateManager).registerStateStores(stateStores, processorContext);
+        inOrder.verify(stateManager).initializeStoreOffsetsFromCheckpoint(true);
+        verifyNoMoreInteractions(stateManager);
     }
 
     @Test
     public void testCloseStateManagerClean() {
-        expect(stateManager.taskId()).andReturn(taskId);
+        final InOrder inOrder = inOrder(stateManager, stateDirectory);
 
-        expect(stateDirectory.lock(taskId)).andReturn(true);
+        when(stateManager.taskId()).thenReturn(taskId);
 

Review Comment:
   ```suggestion
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -187,151 +148,101 @@ public void testCloseStateManagerThrowsExceptionWhenClean() {
         // Thrown stateMgr exception will not be wrapped.
         assertEquals("state manager failed to close", thrown.getMessage());
 
-        ctrl.verify();
+        // The unlock logic should still be executed.
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
     public void testCloseStateManagerThrowsExceptionWhenDirty() {
-        expect(stateManager.taskId()).andReturn(taskId);
+        when(stateManager.taskId()).thenReturn(taskId);
 
-        expect(stateDirectory.lock(taskId)).andReturn(true);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
 
-        stateManager.close();
-        expectLastCall().andThrow(new ProcessorStateException("state manager failed to close"));
-
-        stateDirectory.unlock(taskId);
-
-        ctrl.checkOrder(true);
-        ctrl.replay();
+        doThrow(new ProcessorStateException("state manager failed to close")).when(stateManager).close();
 
         assertThrows(
             ProcessorStateException.class,
             () -> StateManagerUtil.closeStateManager(
                 logger, "logPrefix:", false, false, stateManager, stateDirectory, TaskType.ACTIVE));
 
-        ctrl.verify();
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
     public void testCloseStateManagerWithStateStoreWipeOut() {
-        expect(stateManager.taskId()).andReturn(taskId);
-        expect(stateDirectory.lock(taskId)).andReturn(true);
+        final InOrder inOrder = inOrder(stateManager, stateDirectory);
 
-        stateManager.close();
-        expectLastCall();
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
 
         // The `baseDir` will be accessed when attempting to delete the state store.
-        expect(stateManager.baseDir()).andReturn(TestUtils.tempDirectory("state_store"));
-
-        stateDirectory.unlock(taskId);
-        expectLastCall();
-
-        ctrl.checkOrder(true);
-        ctrl.replay();
+        when(stateManager.baseDir()).thenReturn(TestUtils.tempDirectory("state_store"));
 
         StateManagerUtil.closeStateManager(logger,
             "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE);
 
-        ctrl.verify();
+        inOrder.verify(stateManager).close();
+        inOrder.verify(stateDirectory).unlock(taskId);
+        verifyNoMoreInteractions(stateManager, stateDirectory);
     }
 
     @Test
-    public void  shouldStillWipeStateStoresIfCloseThrowsException() throws IOException {
+    public void  shouldStillWipeStateStoresIfCloseThrowsException() {
         final File randomFile = new File("/random/path");
-        mockStatic(Utils.class);
-
-        expect(stateManager.taskId()).andReturn(taskId);
-        expect(stateDirectory.lock(taskId)).andReturn(true);
 
-        stateManager.close();
-        expectLastCall().andThrow(new ProcessorStateException("Close failed"));
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
 
-        expect(stateManager.baseDir()).andReturn(randomFile);
+        doThrow(new ProcessorStateException("Close failed")).when(stateManager).close();
 
-        Utils.delete(randomFile);
+        when(stateManager.baseDir()).thenReturn(randomFile);
 
-        stateDirectory.unlock(taskId);
-        expectLastCall();
+        try (MockedStatic<Utils> utils = mockStatic(Utils.class)) {
+            assertThrows(ProcessorStateException.class, () ->
+                    StateManagerUtil.closeStateManager(logger, "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
+        }
 
-        ctrl.checkOrder(true);
-        ctrl.replay();
-
-        replayAll();
-
-        assertThrows(ProcessorStateException.class, () ->
-            StateManagerUtil.closeStateManager(logger, "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
-
-        ctrl.verify();
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
-    public void testCloseStateManagerWithStateStoreWipeOutRethrowWrappedIOException() throws IOException {
+    public void testCloseStateManagerWithStateStoreWipeOutRethrowWrappedIOException() {
         final File unknownFile = new File("/unknown/path");
-        mockStatic(Utils.class);
 
-        expect(stateManager.taskId()).andReturn(taskId);
-        expect(stateDirectory.lock(taskId)).andReturn(true);
+        final InOrder inOrder = inOrder(stateManager, stateDirectory);
 

Review Comment:
   ```suggestion
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -187,151 +148,101 @@ public void testCloseStateManagerThrowsExceptionWhenClean() {
         // Thrown stateMgr exception will not be wrapped.
         assertEquals("state manager failed to close", thrown.getMessage());
 
-        ctrl.verify();
+        // The unlock logic should still be executed.
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
     public void testCloseStateManagerThrowsExceptionWhenDirty() {
-        expect(stateManager.taskId()).andReturn(taskId);
+        when(stateManager.taskId()).thenReturn(taskId);
 
-        expect(stateDirectory.lock(taskId)).andReturn(true);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
 

Review Comment:
   ```suggestion
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cadonna merged pull request #12821: KAFKA-14132: Replace PowerMock and EasyMock with Mockito in streams tests

Posted by GitBox <gi...@apache.org>.
cadonna merged PR #12821:
URL: https://github.com/apache/kafka/pull/12821


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] clolov commented on a diff in pull request #12821: KAFKA-14132: Replace PowerMock and EasyMock with Mockito in streams tests

Posted by GitBox <gi...@apache.org>.
clolov commented on code in PR #12821:
URL: https://github.com/apache/kafka/pull/12821#discussion_r1014082612


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -42,77 +39,54 @@
 
 import static java.util.Collections.emptyList;
 import static java.util.Collections.singletonList;
-import static org.easymock.EasyMock.createStrictControl;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.expectLastCall;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThrows;
-import static org.powermock.api.easymock.PowerMock.mockStatic;
-import static org.powermock.api.easymock.PowerMock.replayAll;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(Utils.class)
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
 public class StateManagerUtilTest {
 
-    @Mock(type = MockType.NICE)
+    @Mock
     private ProcessorStateManager stateManager;
 
-    @Mock(type = MockType.NICE)
+    @Mock
     private StateDirectory stateDirectory;
 
-    @Mock(type = MockType.NICE)
+    @Mock
     private ProcessorTopology topology;
 
-    @Mock(type = MockType.NICE)
+    @Mock
     private InternalProcessorContext processorContext;
 
-    private IMocksControl ctrl;
-
     private Logger logger = new LogContext("test").logger(AbstractTask.class);
 
     private final TaskId taskId = new TaskId(0, 0);
 
-    @Before
-    public void setup() {
-        ctrl = createStrictControl();
-        topology = ctrl.createMock(ProcessorTopology.class);
-        processorContext = ctrl.createMock(InternalProcessorContext.class);
-
-        stateManager = ctrl.createMock(ProcessorStateManager.class);
-        stateDirectory = ctrl.createMock(StateDirectory.class);
-    }
-
     @Test
     public void testRegisterStateStoreWhenTopologyEmpty() {
-        expect(topology.stateStores()).andReturn(emptyList());
-
-        ctrl.checkOrder(true);

Review Comment:
   I do not think this adds much value so I removed it. If we want to ensure ordering this can be done in Mockito.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] divijvaidya commented on pull request #12821: KAFKA-14132: Replace PowerMock and EasyMock with Mockito in streams tests

Posted by GitBox <gi...@apache.org>.
divijvaidya commented on PR #12821:
URL: https://github.com/apache/kafka/pull/12821#issuecomment-1333861957

   @clolov please fix the checkstyle failures in this PR!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cadonna commented on pull request #12821: KAFKA-14132: Replace PowerMock and EasyMock with Mockito in streams tests

Posted by GitBox <gi...@apache.org>.
cadonna commented on PR #12821:
URL: https://github.com/apache/kafka/pull/12821#issuecomment-1397370205

   Build failures are unrelated:
   ```
   Build / JDK 11 and Scala 2.13 / org.apache.kafka.clients.consumer.KafkaConsumerTest.testReturnRecordsDuringRebalance()
   Build / JDK 11 and Scala 2.13 / org.apache.kafka.connect.integration.BlockingConnectorTest.testBlockInSinkTaskStart
   Build / JDK 11 and Scala 2.13 / org.apache.kafka.connect.integration.ConnectWorkerIntegrationTest.testBrokerCoordinator
   Build / JDK 17 and Scala 2.13 / org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated()
   Build / JDK 17 and Scala 2.13 / org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated()
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] clolov commented on a diff in pull request #12821: KAFKA-14132: Replace PowerMock and EasyMock with Mockito in streams tests

Posted by GitBox <gi...@apache.org>.
clolov commented on code in PR #12821:
URL: https://github.com/apache/kafka/pull/12821#discussion_r1014084531


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -187,151 +139,92 @@ public void testCloseStateManagerThrowsExceptionWhenClean() {
         // Thrown stateMgr exception will not be wrapped.
         assertEquals("state manager failed to close", thrown.getMessage());
 
-        ctrl.verify();
+        // The unlock logic should still be executed.
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
     public void testCloseStateManagerThrowsExceptionWhenDirty() {
-        expect(stateManager.taskId()).andReturn(taskId);
+        when(stateManager.taskId()).thenReturn(taskId);
 
-        expect(stateDirectory.lock(taskId)).andReturn(true);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
 
-        stateManager.close();
-        expectLastCall().andThrow(new ProcessorStateException("state manager failed to close"));
-
-        stateDirectory.unlock(taskId);
-
-        ctrl.checkOrder(true);
-        ctrl.replay();
+        doThrow(new ProcessorStateException("state manager failed to close")).when(stateManager).close();
 
         assertThrows(
             ProcessorStateException.class,
             () -> StateManagerUtil.closeStateManager(
                 logger, "logPrefix:", false, false, stateManager, stateDirectory, TaskType.ACTIVE));
 
-        ctrl.verify();
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
     public void testCloseStateManagerWithStateStoreWipeOut() {
-        expect(stateManager.taskId()).andReturn(taskId);
-        expect(stateDirectory.lock(taskId)).andReturn(true);
-
-        stateManager.close();
-        expectLastCall();
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
 
         // The `baseDir` will be accessed when attempting to delete the state store.
-        expect(stateManager.baseDir()).andReturn(TestUtils.tempDirectory("state_store"));
-
-        stateDirectory.unlock(taskId);
-        expectLastCall();
-
-        ctrl.checkOrder(true);
-        ctrl.replay();
+        when(stateManager.baseDir()).thenReturn(TestUtils.tempDirectory("state_store"));
 
         StateManagerUtil.closeStateManager(logger,
             "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE);
 
-        ctrl.verify();
+        verify(stateManager).close();
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
     public void  shouldStillWipeStateStoresIfCloseThrowsException() throws IOException {
         final File randomFile = new File("/random/path");
-        mockStatic(Utils.class);
-
-        expect(stateManager.taskId()).andReturn(taskId);
-        expect(stateDirectory.lock(taskId)).andReturn(true);
 
-        stateManager.close();
-        expectLastCall().andThrow(new ProcessorStateException("Close failed"));
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
 
-        expect(stateManager.baseDir()).andReturn(randomFile);
+        doThrow(new ProcessorStateException("Close failed")).when(stateManager).close();
 
-        Utils.delete(randomFile);
+        when(stateManager.baseDir()).thenReturn(randomFile);
 
-        stateDirectory.unlock(taskId);
-        expectLastCall();
+        try (MockedStatic<Utils> utils = mockStatic(Utils.class)) {
+            assertThrows(ProcessorStateException.class, () ->
+                    StateManagerUtil.closeStateManager(logger, "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
+        }
 
-        ctrl.checkOrder(true);
-        ctrl.replay();
-
-        replayAll();
-
-        assertThrows(ProcessorStateException.class, () ->
-            StateManagerUtil.closeStateManager(logger, "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
-
-        ctrl.verify();
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
     public void testCloseStateManagerWithStateStoreWipeOutRethrowWrappedIOException() throws IOException {
         final File unknownFile = new File("/unknown/path");
-        mockStatic(Utils.class);
-
-        expect(stateManager.taskId()).andReturn(taskId);
-        expect(stateDirectory.lock(taskId)).andReturn(true);
-
-        stateManager.close();
-        expectLastCall();
-
-        expect(stateManager.baseDir()).andReturn(unknownFile);
 
-        Utils.delete(unknownFile);
-        expectLastCall().andThrow(new IOException("Deletion failed"));
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
+        when(stateManager.baseDir()).thenReturn(unknownFile);
 
-        stateDirectory.unlock(taskId);
-        expectLastCall();
+        try (MockedStatic<Utils> utils = mockStatic(Utils.class)) {
+            utils.when(() -> Utils.delete(unknownFile)).thenThrow(new IOException("Deletion failed"));
 
-        ctrl.checkOrder(true);
-        ctrl.replay();
+            final ProcessorStateException thrown = assertThrows(
+                    ProcessorStateException.class, () -> StateManagerUtil.closeStateManager(logger,
+                            "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
 
-        replayAll();
+            assertEquals(IOException.class, thrown.getCause().getClass());
+        }
 
-        final ProcessorStateException thrown = assertThrows(
-            ProcessorStateException.class, () -> StateManagerUtil.closeStateManager(logger,
-                "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
-
-        assertEquals(IOException.class, thrown.getCause().getClass());
-
-        ctrl.verify();
+        verify(stateManager).close();
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
-    public void shouldNotCloseStateManagerIfUnableToLockTaskDirectory() {
-        expect(stateManager.taskId()).andReturn(taskId);
-
-        expect(stateDirectory.lock(taskId)).andReturn(false);
-
-        stateManager.close();
-        expectLastCall().andThrow(new AssertionError("Should not be trying to close state you don't own!"));
-
-        ctrl.checkOrder(true);
-        ctrl.replay();
-
-        replayAll();
+    public void shouldNotWipeStateStoresIfUnableToLockTaskDirectory() {
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateDirectory.lock(taskId)).thenReturn(false);
 
         StateManagerUtil.closeStateManager(
-            logger, "logPrefix:", true, false, stateManager, stateDirectory, TaskType.ACTIVE);
-    }
-
-    @Test
-    public void shouldNotWipeStateStoresIfUnableToLockTaskDirectory() throws IOException {

Review Comment:
   This test appears to be just wrong, unless I am reading the code incorrectly. Because we use `expect(stateDirectory.lock(taskId)).andReturn(false);` we never reach the code path which would be using `Utils.delete`. As such, I just removed the test.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] clolov commented on pull request #12821: KAFKA-14132: Replace PowerMock and EasyMock with Mockito in streams tests

Posted by GitBox <gi...@apache.org>.
clolov commented on PR #12821:
URL: https://github.com/apache/kafka/pull/12821#issuecomment-1351857898

   Hello @cadonna and thanks for the review! I will aim to address the comments tomorrow.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] clolov commented on a diff in pull request #12821: KAFKA-14132: Replace PowerMock and EasyMock with Mockito in streams tests

Posted by GitBox <gi...@apache.org>.
clolov commented on code in PR #12821:
URL: https://github.com/apache/kafka/pull/12821#discussion_r1081305288


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -187,151 +137,107 @@ public void testCloseStateManagerThrowsExceptionWhenClean() {
         // Thrown stateMgr exception will not be wrapped.
         assertEquals("state manager failed to close", thrown.getMessage());
 
-        ctrl.verify();
+        // The unlock logic should still be executed.
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
     public void testCloseStateManagerThrowsExceptionWhenDirty() {
-        expect(stateManager.taskId()).andReturn(taskId);
-
-        expect(stateDirectory.lock(taskId)).andReturn(true);
-
-        stateManager.close();
-        expectLastCall().andThrow(new ProcessorStateException("state manager failed to close"));
-
-        stateDirectory.unlock(taskId);
-
-        ctrl.checkOrder(true);
-        ctrl.replay();
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
+        doThrow(new ProcessorStateException("state manager failed to close")).when(stateManager).close();
 
         assertThrows(
             ProcessorStateException.class,
             () -> StateManagerUtil.closeStateManager(
                 logger, "logPrefix:", false, false, stateManager, stateDirectory, TaskType.ACTIVE));
 
-        ctrl.verify();
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
     public void testCloseStateManagerWithStateStoreWipeOut() {
-        expect(stateManager.taskId()).andReturn(taskId);
-        expect(stateDirectory.lock(taskId)).andReturn(true);
-
-        stateManager.close();
-        expectLastCall();
-
+        final InOrder inOrder = inOrder(stateManager, stateDirectory);
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
         // The `baseDir` will be accessed when attempting to delete the state store.
-        expect(stateManager.baseDir()).andReturn(TestUtils.tempDirectory("state_store"));
-
-        stateDirectory.unlock(taskId);
-        expectLastCall();
-
-        ctrl.checkOrder(true);
-        ctrl.replay();
+        when(stateManager.baseDir()).thenReturn(TestUtils.tempDirectory("state_store"));
 
         StateManagerUtil.closeStateManager(logger,
             "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE);
 
-        ctrl.verify();
+        inOrder.verify(stateManager).close();
+        inOrder.verify(stateDirectory).unlock(taskId);
+        verifyNoMoreInteractions(stateManager, stateDirectory);
     }
 
     @Test
-    public void  shouldStillWipeStateStoresIfCloseThrowsException() throws IOException {
+    public void  shouldStillWipeStateStoresIfCloseThrowsException() {
         final File randomFile = new File("/random/path");
-        mockStatic(Utils.class);
-
-        expect(stateManager.taskId()).andReturn(taskId);
-        expect(stateDirectory.lock(taskId)).andReturn(true);
-
-        stateManager.close();
-        expectLastCall().andThrow(new ProcessorStateException("Close failed"));
-
-        expect(stateManager.baseDir()).andReturn(randomFile);
 
-        Utils.delete(randomFile);
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
+        doThrow(new ProcessorStateException("Close failed")).when(stateManager).close();
+        when(stateManager.baseDir()).thenReturn(randomFile);
 
-        stateDirectory.unlock(taskId);
-        expectLastCall();
+        try (MockedStatic<Utils> utils = mockStatic(Utils.class)) {
+            assertThrows(ProcessorStateException.class, () ->
+                    StateManagerUtil.closeStateManager(logger, "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
+        }
 
-        ctrl.checkOrder(true);
-        ctrl.replay();
-
-        replayAll();
-
-        assertThrows(ProcessorStateException.class, () ->
-            StateManagerUtil.closeStateManager(logger, "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
-
-        ctrl.verify();
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
-    public void testCloseStateManagerWithStateStoreWipeOutRethrowWrappedIOException() throws IOException {
+    public void testCloseStateManagerWithStateStoreWipeOutRethrowWrappedIOException() {
         final File unknownFile = new File("/unknown/path");
-        mockStatic(Utils.class);
-
-        expect(stateManager.taskId()).andReturn(taskId);
-        expect(stateDirectory.lock(taskId)).andReturn(true);
-
-        stateManager.close();
-        expectLastCall();
-
-        expect(stateManager.baseDir()).andReturn(unknownFile);
-
-        Utils.delete(unknownFile);
-        expectLastCall().andThrow(new IOException("Deletion failed"));
+        final InOrder inOrder = inOrder(stateManager, stateDirectory);
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
+        when(stateManager.baseDir()).thenReturn(unknownFile);
 
-        stateDirectory.unlock(taskId);
-        expectLastCall();
+        try (MockedStatic<Utils> utils = mockStatic(Utils.class)) {
+            utils.when(() -> Utils.delete(unknownFile)).thenThrow(new IOException("Deletion failed"));
 
-        ctrl.checkOrder(true);
-        ctrl.replay();
+            final ProcessorStateException thrown = assertThrows(
+                    ProcessorStateException.class, () -> StateManagerUtil.closeStateManager(logger,
+                            "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
 
-        replayAll();
+            assertEquals(IOException.class, thrown.getCause().getClass());
+        }
 
-        final ProcessorStateException thrown = assertThrows(
-            ProcessorStateException.class, () -> StateManagerUtil.closeStateManager(logger,
-                "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
-
-        assertEquals(IOException.class, thrown.getCause().getClass());
-
-        ctrl.verify();
+        inOrder.verify(stateManager).close();
+        inOrder.verify(stateDirectory).unlock(taskId);
+        verifyNoMoreInteractions(stateManager, stateDirectory);
     }
 
     @Test
     public void shouldNotCloseStateManagerIfUnableToLockTaskDirectory() {
-        expect(stateManager.taskId()).andReturn(taskId);
-
-        expect(stateDirectory.lock(taskId)).andReturn(false);
-
-        stateManager.close();
-        expectLastCall().andThrow(new AssertionError("Should not be trying to close state you don't own!"));
-
-        ctrl.checkOrder(true);
-        ctrl.replay();
-
-        replayAll();
+        final InOrder inOrder = inOrder(stateManager, stateDirectory);
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateDirectory.lock(taskId)).thenReturn(false);
 
         StateManagerUtil.closeStateManager(
-            logger, "logPrefix:", true, false, stateManager, stateDirectory, TaskType.ACTIVE);
+                logger, "logPrefix:", true, false, stateManager, stateDirectory, TaskType.ACTIVE);
+
+        inOrder.verify(stateManager, never()).close();
+        inOrder.verify(stateManager, never()).baseDir();
+        inOrder.verify(stateDirectory, never()).unlock(taskId);

Review Comment:
   Okay, I pushed the latest rebased version!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cadonna commented on a diff in pull request #12821: KAFKA-14132: Replace PowerMock and EasyMock with Mockito in streams tests

Posted by GitBox <gi...@apache.org>.
cadonna commented on code in PR #12821:
URL: https://github.com/apache/kafka/pull/12821#discussion_r1081170724


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -187,151 +137,107 @@ public void testCloseStateManagerThrowsExceptionWhenClean() {
         // Thrown stateMgr exception will not be wrapped.
         assertEquals("state manager failed to close", thrown.getMessage());
 
-        ctrl.verify();
+        // The unlock logic should still be executed.
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
     public void testCloseStateManagerThrowsExceptionWhenDirty() {
-        expect(stateManager.taskId()).andReturn(taskId);
-
-        expect(stateDirectory.lock(taskId)).andReturn(true);
-
-        stateManager.close();
-        expectLastCall().andThrow(new ProcessorStateException("state manager failed to close"));
-
-        stateDirectory.unlock(taskId);
-
-        ctrl.checkOrder(true);
-        ctrl.replay();
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
+        doThrow(new ProcessorStateException("state manager failed to close")).when(stateManager).close();
 
         assertThrows(
             ProcessorStateException.class,
             () -> StateManagerUtil.closeStateManager(
                 logger, "logPrefix:", false, false, stateManager, stateDirectory, TaskType.ACTIVE));
 
-        ctrl.verify();
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
     public void testCloseStateManagerWithStateStoreWipeOut() {
-        expect(stateManager.taskId()).andReturn(taskId);
-        expect(stateDirectory.lock(taskId)).andReturn(true);
-
-        stateManager.close();
-        expectLastCall();
-
+        final InOrder inOrder = inOrder(stateManager, stateDirectory);
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
         // The `baseDir` will be accessed when attempting to delete the state store.
-        expect(stateManager.baseDir()).andReturn(TestUtils.tempDirectory("state_store"));
-
-        stateDirectory.unlock(taskId);
-        expectLastCall();
-
-        ctrl.checkOrder(true);
-        ctrl.replay();
+        when(stateManager.baseDir()).thenReturn(TestUtils.tempDirectory("state_store"));
 
         StateManagerUtil.closeStateManager(logger,
             "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE);
 
-        ctrl.verify();
+        inOrder.verify(stateManager).close();
+        inOrder.verify(stateDirectory).unlock(taskId);
+        verifyNoMoreInteractions(stateManager, stateDirectory);
     }
 
     @Test
-    public void  shouldStillWipeStateStoresIfCloseThrowsException() throws IOException {
+    public void  shouldStillWipeStateStoresIfCloseThrowsException() {
         final File randomFile = new File("/random/path");
-        mockStatic(Utils.class);
-
-        expect(stateManager.taskId()).andReturn(taskId);
-        expect(stateDirectory.lock(taskId)).andReturn(true);
-
-        stateManager.close();
-        expectLastCall().andThrow(new ProcessorStateException("Close failed"));
-
-        expect(stateManager.baseDir()).andReturn(randomFile);
 
-        Utils.delete(randomFile);
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
+        doThrow(new ProcessorStateException("Close failed")).when(stateManager).close();
+        when(stateManager.baseDir()).thenReturn(randomFile);
 
-        stateDirectory.unlock(taskId);
-        expectLastCall();
+        try (MockedStatic<Utils> utils = mockStatic(Utils.class)) {
+            assertThrows(ProcessorStateException.class, () ->
+                    StateManagerUtil.closeStateManager(logger, "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
+        }
 
-        ctrl.checkOrder(true);
-        ctrl.replay();
-
-        replayAll();
-
-        assertThrows(ProcessorStateException.class, () ->
-            StateManagerUtil.closeStateManager(logger, "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
-
-        ctrl.verify();
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
-    public void testCloseStateManagerWithStateStoreWipeOutRethrowWrappedIOException() throws IOException {
+    public void testCloseStateManagerWithStateStoreWipeOutRethrowWrappedIOException() {
         final File unknownFile = new File("/unknown/path");
-        mockStatic(Utils.class);
-
-        expect(stateManager.taskId()).andReturn(taskId);
-        expect(stateDirectory.lock(taskId)).andReturn(true);
-
-        stateManager.close();
-        expectLastCall();
-
-        expect(stateManager.baseDir()).andReturn(unknownFile);
-
-        Utils.delete(unknownFile);
-        expectLastCall().andThrow(new IOException("Deletion failed"));
+        final InOrder inOrder = inOrder(stateManager, stateDirectory);
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
+        when(stateManager.baseDir()).thenReturn(unknownFile);
 
-        stateDirectory.unlock(taskId);
-        expectLastCall();
+        try (MockedStatic<Utils> utils = mockStatic(Utils.class)) {
+            utils.when(() -> Utils.delete(unknownFile)).thenThrow(new IOException("Deletion failed"));
 
-        ctrl.checkOrder(true);
-        ctrl.replay();
+            final ProcessorStateException thrown = assertThrows(
+                    ProcessorStateException.class, () -> StateManagerUtil.closeStateManager(logger,
+                            "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
 
-        replayAll();
+            assertEquals(IOException.class, thrown.getCause().getClass());
+        }
 
-        final ProcessorStateException thrown = assertThrows(
-            ProcessorStateException.class, () -> StateManagerUtil.closeStateManager(logger,
-                "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
-
-        assertEquals(IOException.class, thrown.getCause().getClass());
-
-        ctrl.verify();
+        inOrder.verify(stateManager).close();
+        inOrder.verify(stateDirectory).unlock(taskId);
+        verifyNoMoreInteractions(stateManager, stateDirectory);
     }
 
     @Test
     public void shouldNotCloseStateManagerIfUnableToLockTaskDirectory() {
-        expect(stateManager.taskId()).andReturn(taskId);
-
-        expect(stateDirectory.lock(taskId)).andReturn(false);
-
-        stateManager.close();
-        expectLastCall().andThrow(new AssertionError("Should not be trying to close state you don't own!"));
-
-        ctrl.checkOrder(true);
-        ctrl.replay();
-
-        replayAll();
+        final InOrder inOrder = inOrder(stateManager, stateDirectory);
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateDirectory.lock(taskId)).thenReturn(false);
 
         StateManagerUtil.closeStateManager(
-            logger, "logPrefix:", true, false, stateManager, stateDirectory, TaskType.ACTIVE);
+                logger, "logPrefix:", true, false, stateManager, stateDirectory, TaskType.ACTIVE);
+
+        inOrder.verify(stateManager, never()).close();
+        inOrder.verify(stateManager, never()).baseDir();
+        inOrder.verify(stateDirectory, never()).unlock(taskId);

Review Comment:
   Looks great!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] clolov commented on pull request #12821: KAFKA-14132: Replace PowerMock and EasyMock with Mockito in streams tests

Posted by GitBox <gi...@apache.org>.
clolov commented on PR #12821:
URL: https://github.com/apache/kafka/pull/12821#issuecomment-1315197616

   Thank you for the review @cadonna. I will aim to address it today!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] clolov commented on a diff in pull request #12821: KAFKA-14132: Replace PowerMock and EasyMock with Mockito in streams tests

Posted by GitBox <gi...@apache.org>.
clolov commented on code in PR #12821:
URL: https://github.com/apache/kafka/pull/12821#discussion_r1014079123


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionTopicsTest.java:
##########
@@ -47,22 +47,18 @@
 import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.SUBTOPOLOGY_0;
 import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.SUBTOPOLOGY_1;
 
-import static org.easymock.EasyMock.anyObject;
-import static org.easymock.EasyMock.eq;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.mock;
-import static org.easymock.EasyMock.niceMock;
-import static org.easymock.EasyMock.replay;
-import static org.easymock.EasyMock.verify;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.CoreMatchers.notNullValue;
 import static org.hamcrest.CoreMatchers.nullValue;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
-@RunWith(PowerMockRunner.class)
-@PrepareForTest({Cluster.class})

Review Comment:
   I could not see how this affected the outcome of the tests so I removed it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] clolov commented on a diff in pull request #12821: KAFKA-14132: Replace PowerMock and EasyMock with Mockito in streams tests

Posted by GitBox <gi...@apache.org>.
clolov commented on code in PR #12821:
URL: https://github.com/apache/kafka/pull/12821#discussion_r1049809566


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -187,151 +139,92 @@ public void testCloseStateManagerThrowsExceptionWhenClean() {
         // Thrown stateMgr exception will not be wrapped.
         assertEquals("state manager failed to close", thrown.getMessage());
 
-        ctrl.verify();
+        // The unlock logic should still be executed.
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
     public void testCloseStateManagerThrowsExceptionWhenDirty() {
-        expect(stateManager.taskId()).andReturn(taskId);
+        when(stateManager.taskId()).thenReturn(taskId);
 
-        expect(stateDirectory.lock(taskId)).andReturn(true);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
 
-        stateManager.close();
-        expectLastCall().andThrow(new ProcessorStateException("state manager failed to close"));
-
-        stateDirectory.unlock(taskId);
-
-        ctrl.checkOrder(true);
-        ctrl.replay();
+        doThrow(new ProcessorStateException("state manager failed to close")).when(stateManager).close();
 
         assertThrows(
             ProcessorStateException.class,
             () -> StateManagerUtil.closeStateManager(
                 logger, "logPrefix:", false, false, stateManager, stateDirectory, TaskType.ACTIVE));
 
-        ctrl.verify();
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
     public void testCloseStateManagerWithStateStoreWipeOut() {
-        expect(stateManager.taskId()).andReturn(taskId);
-        expect(stateDirectory.lock(taskId)).andReturn(true);
-
-        stateManager.close();
-        expectLastCall();
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
 
         // The `baseDir` will be accessed when attempting to delete the state store.
-        expect(stateManager.baseDir()).andReturn(TestUtils.tempDirectory("state_store"));
-
-        stateDirectory.unlock(taskId);
-        expectLastCall();
-
-        ctrl.checkOrder(true);
-        ctrl.replay();
+        when(stateManager.baseDir()).thenReturn(TestUtils.tempDirectory("state_store"));
 
         StateManagerUtil.closeStateManager(logger,
             "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE);
 
-        ctrl.verify();
+        verify(stateManager).close();
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
     public void  shouldStillWipeStateStoresIfCloseThrowsException() throws IOException {
         final File randomFile = new File("/random/path");
-        mockStatic(Utils.class);
-
-        expect(stateManager.taskId()).andReturn(taskId);
-        expect(stateDirectory.lock(taskId)).andReturn(true);
 
-        stateManager.close();
-        expectLastCall().andThrow(new ProcessorStateException("Close failed"));
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
 
-        expect(stateManager.baseDir()).andReturn(randomFile);
+        doThrow(new ProcessorStateException("Close failed")).when(stateManager).close();
 
-        Utils.delete(randomFile);
+        when(stateManager.baseDir()).thenReturn(randomFile);
 
-        stateDirectory.unlock(taskId);
-        expectLastCall();
+        try (MockedStatic<Utils> utils = mockStatic(Utils.class)) {
+            assertThrows(ProcessorStateException.class, () ->
+                    StateManagerUtil.closeStateManager(logger, "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
+        }
 
-        ctrl.checkOrder(true);
-        ctrl.replay();
-
-        replayAll();
-
-        assertThrows(ProcessorStateException.class, () ->
-            StateManagerUtil.closeStateManager(logger, "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
-
-        ctrl.verify();
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
     public void testCloseStateManagerWithStateStoreWipeOutRethrowWrappedIOException() throws IOException {
         final File unknownFile = new File("/unknown/path");
-        mockStatic(Utils.class);
-
-        expect(stateManager.taskId()).andReturn(taskId);
-        expect(stateDirectory.lock(taskId)).andReturn(true);
-
-        stateManager.close();
-        expectLastCall();
-
-        expect(stateManager.baseDir()).andReturn(unknownFile);
 
-        Utils.delete(unknownFile);
-        expectLastCall().andThrow(new IOException("Deletion failed"));
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
+        when(stateManager.baseDir()).thenReturn(unknownFile);
 
-        stateDirectory.unlock(taskId);
-        expectLastCall();
+        try (MockedStatic<Utils> utils = mockStatic(Utils.class)) {
+            utils.when(() -> Utils.delete(unknownFile)).thenThrow(new IOException("Deletion failed"));
 
-        ctrl.checkOrder(true);
-        ctrl.replay();
+            final ProcessorStateException thrown = assertThrows(
+                    ProcessorStateException.class, () -> StateManagerUtil.closeStateManager(logger,
+                            "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
 
-        replayAll();
+            assertEquals(IOException.class, thrown.getCause().getClass());
+        }
 
-        final ProcessorStateException thrown = assertThrows(
-            ProcessorStateException.class, () -> StateManagerUtil.closeStateManager(logger,
-                "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
-
-        assertEquals(IOException.class, thrown.getCause().getClass());
-
-        ctrl.verify();
+        verify(stateManager).close();
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
-    public void shouldNotCloseStateManagerIfUnableToLockTaskDirectory() {
-        expect(stateManager.taskId()).andReturn(taskId);
-
-        expect(stateDirectory.lock(taskId)).andReturn(false);
-
-        stateManager.close();
-        expectLastCall().andThrow(new AssertionError("Should not be trying to close state you don't own!"));
-
-        ctrl.checkOrder(true);
-        ctrl.replay();
-
-        replayAll();
+    public void shouldNotWipeStateStoresIfUnableToLockTaskDirectory() {
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateDirectory.lock(taskId)).thenReturn(false);
 
         StateManagerUtil.closeStateManager(
-            logger, "logPrefix:", true, false, stateManager, stateDirectory, TaskType.ACTIVE);
-    }
-
-    @Test
-    public void shouldNotWipeStateStoresIfUnableToLockTaskDirectory() throws IOException {

Review Comment:
   Okay, I agree, I will create a new commit testing both scenarios.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] clolov commented on a diff in pull request #12821: KAFKA-14132: Replace PowerMock and EasyMock with Mockito in streams tests

Posted by GitBox <gi...@apache.org>.
clolov commented on code in PR #12821:
URL: https://github.com/apache/kafka/pull/12821#discussion_r1024170827


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -187,151 +139,92 @@ public void testCloseStateManagerThrowsExceptionWhenClean() {
         // Thrown stateMgr exception will not be wrapped.
         assertEquals("state manager failed to close", thrown.getMessage());
 
-        ctrl.verify();
+        // The unlock logic should still be executed.
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
     public void testCloseStateManagerThrowsExceptionWhenDirty() {
-        expect(stateManager.taskId()).andReturn(taskId);
+        when(stateManager.taskId()).thenReturn(taskId);
 
-        expect(stateDirectory.lock(taskId)).andReturn(true);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
 
-        stateManager.close();
-        expectLastCall().andThrow(new ProcessorStateException("state manager failed to close"));
-
-        stateDirectory.unlock(taskId);
-
-        ctrl.checkOrder(true);
-        ctrl.replay();
+        doThrow(new ProcessorStateException("state manager failed to close")).when(stateManager).close();
 
         assertThrows(
             ProcessorStateException.class,
             () -> StateManagerUtil.closeStateManager(
                 logger, "logPrefix:", false, false, stateManager, stateDirectory, TaskType.ACTIVE));
 
-        ctrl.verify();
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
     public void testCloseStateManagerWithStateStoreWipeOut() {
-        expect(stateManager.taskId()).andReturn(taskId);
-        expect(stateDirectory.lock(taskId)).andReturn(true);
-
-        stateManager.close();
-        expectLastCall();
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
 
         // The `baseDir` will be accessed when attempting to delete the state store.
-        expect(stateManager.baseDir()).andReturn(TestUtils.tempDirectory("state_store"));
-
-        stateDirectory.unlock(taskId);
-        expectLastCall();
-
-        ctrl.checkOrder(true);
-        ctrl.replay();
+        when(stateManager.baseDir()).thenReturn(TestUtils.tempDirectory("state_store"));
 
         StateManagerUtil.closeStateManager(logger,
             "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE);
 
-        ctrl.verify();
+        verify(stateManager).close();
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
     public void  shouldStillWipeStateStoresIfCloseThrowsException() throws IOException {
         final File randomFile = new File("/random/path");
-        mockStatic(Utils.class);
-
-        expect(stateManager.taskId()).andReturn(taskId);
-        expect(stateDirectory.lock(taskId)).andReturn(true);
 
-        stateManager.close();
-        expectLastCall().andThrow(new ProcessorStateException("Close failed"));
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
 
-        expect(stateManager.baseDir()).andReturn(randomFile);
+        doThrow(new ProcessorStateException("Close failed")).when(stateManager).close();
 
-        Utils.delete(randomFile);
+        when(stateManager.baseDir()).thenReturn(randomFile);
 
-        stateDirectory.unlock(taskId);
-        expectLastCall();
+        try (MockedStatic<Utils> utils = mockStatic(Utils.class)) {
+            assertThrows(ProcessorStateException.class, () ->
+                    StateManagerUtil.closeStateManager(logger, "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
+        }
 
-        ctrl.checkOrder(true);
-        ctrl.replay();
-
-        replayAll();
-
-        assertThrows(ProcessorStateException.class, () ->
-            StateManagerUtil.closeStateManager(logger, "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
-
-        ctrl.verify();
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
     public void testCloseStateManagerWithStateStoreWipeOutRethrowWrappedIOException() throws IOException {
         final File unknownFile = new File("/unknown/path");
-        mockStatic(Utils.class);
-
-        expect(stateManager.taskId()).andReturn(taskId);
-        expect(stateDirectory.lock(taskId)).andReturn(true);
-
-        stateManager.close();
-        expectLastCall();
-
-        expect(stateManager.baseDir()).andReturn(unknownFile);
 
-        Utils.delete(unknownFile);
-        expectLastCall().andThrow(new IOException("Deletion failed"));
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
+        when(stateManager.baseDir()).thenReturn(unknownFile);
 
-        stateDirectory.unlock(taskId);
-        expectLastCall();
+        try (MockedStatic<Utils> utils = mockStatic(Utils.class)) {
+            utils.when(() -> Utils.delete(unknownFile)).thenThrow(new IOException("Deletion failed"));
 
-        ctrl.checkOrder(true);
-        ctrl.replay();
+            final ProcessorStateException thrown = assertThrows(
+                    ProcessorStateException.class, () -> StateManagerUtil.closeStateManager(logger,
+                            "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
 
-        replayAll();
+            assertEquals(IOException.class, thrown.getCause().getClass());
+        }
 
-        final ProcessorStateException thrown = assertThrows(
-            ProcessorStateException.class, () -> StateManagerUtil.closeStateManager(logger,
-                "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
-
-        assertEquals(IOException.class, thrown.getCause().getClass());
-
-        ctrl.verify();
+        verify(stateManager).close();
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
-    public void shouldNotCloseStateManagerIfUnableToLockTaskDirectory() {
-        expect(stateManager.taskId()).andReturn(taskId);
-
-        expect(stateDirectory.lock(taskId)).andReturn(false);
-
-        stateManager.close();
-        expectLastCall().andThrow(new AssertionError("Should not be trying to close state you don't own!"));
-
-        ctrl.checkOrder(true);
-        ctrl.replay();
-
-        replayAll();
+    public void shouldNotWipeStateStoresIfUnableToLockTaskDirectory() {
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateDirectory.lock(taskId)).thenReturn(false);
 
         StateManagerUtil.closeStateManager(
-            logger, "logPrefix:", true, false, stateManager, stateDirectory, TaskType.ACTIVE);
-    }
-
-    @Test
-    public void shouldNotWipeStateStoresIfUnableToLockTaskDirectory() throws IOException {

Review Comment:
   As in, it appears a bit strange to mock everything on a path to the Utils.delete when this logic short-circuits if we use `expect(stateDirectory.lock(taskId)).andReturn(false);`.
   ```
           try {
               if (stateDirectory.lock(id)) { <---- WE JUMP FROM HERE...
                   try {
                       stateMgr.close(); <---- WE VERIFY THIS IS NOT CALLED
                   } catch (final ProcessorStateException e) {
                       firstException.compareAndSet(null, e);
                   } finally {
                       try {
                           if (wipeStateStore) {
                               log.debug("Wiping state stores for {} task {}", taskType, id);
                               // we can just delete the whole dir of the task, including the state store images and the checkpoint files,
                               // and then we write an empty checkpoint file indicating that the previous close is graceful and we just
                               // need to re-bootstrap the restoration from the beginning
                               Utils.delete(stateMgr.baseDir() <---- WE VERIFY THIS IS NOT CALLED);
                           }
                       } finally {
                           stateDirectory.unlock(id); <---- WE VERIFY THIS IS NOT CALLED
                       }
                   }
               }
           } catch (final IOException e) {
               final ProcessorStateException exception = new ProcessorStateException(
                   String.format("%sFatal error while trying to close the state manager for task %s", logPrefix, id), e
               );
               firstException.compareAndSet(null, exception);
           } <--- ...ALL THE WAY TO HERE
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] clolov commented on a diff in pull request #12821: KAFKA-14132: Replace PowerMock and EasyMock with Mockito in streams tests

Posted by GitBox <gi...@apache.org>.
clolov commented on code in PR #12821:
URL: https://github.com/apache/kafka/pull/12821#discussion_r1024174183


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -42,77 +39,54 @@
 
 import static java.util.Collections.emptyList;
 import static java.util.Collections.singletonList;
-import static org.easymock.EasyMock.createStrictControl;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.expectLastCall;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThrows;
-import static org.powermock.api.easymock.PowerMock.mockStatic;
-import static org.powermock.api.easymock.PowerMock.replayAll;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(Utils.class)
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
 public class StateManagerUtilTest {
 
-    @Mock(type = MockType.NICE)
+    @Mock
     private ProcessorStateManager stateManager;
 
-    @Mock(type = MockType.NICE)
+    @Mock
     private StateDirectory stateDirectory;
 
-    @Mock(type = MockType.NICE)
+    @Mock
     private ProcessorTopology topology;
 
-    @Mock(type = MockType.NICE)
+    @Mock
     private InternalProcessorContext processorContext;
 
-    private IMocksControl ctrl;
-
     private Logger logger = new LogContext("test").logger(AbstractTask.class);
 
     private final TaskId taskId = new TaskId(0, 0);
 
-    @Before
-    public void setup() {
-        ctrl = createStrictControl();
-        topology = ctrl.createMock(ProcessorTopology.class);
-        processorContext = ctrl.createMock(InternalProcessorContext.class);
-
-        stateManager = ctrl.createMock(ProcessorStateManager.class);
-        stateDirectory = ctrl.createMock(StateDirectory.class);
-    }
-
     @Test
     public void testRegisterStateStoreWhenTopologyEmpty() {
-        expect(topology.stateStores()).andReturn(emptyList());
-
-        ctrl.checkOrder(true);

Review Comment:
   I have hopefully addressed this in the next commit, but if I have missed or misunderstood something I will rework it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cadonna commented on a diff in pull request #12821: KAFKA-14132: Replace PowerMock and EasyMock with Mockito in streams tests

Posted by GitBox <gi...@apache.org>.
cadonna commented on code in PR #12821:
URL: https://github.com/apache/kafka/pull/12821#discussion_r1021613702


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -42,77 +39,54 @@
 
 import static java.util.Collections.emptyList;
 import static java.util.Collections.singletonList;
-import static org.easymock.EasyMock.createStrictControl;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.expectLastCall;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThrows;
-import static org.powermock.api.easymock.PowerMock.mockStatic;
-import static org.powermock.api.easymock.PowerMock.replayAll;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(Utils.class)
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
 public class StateManagerUtilTest {
 
-    @Mock(type = MockType.NICE)
+    @Mock
     private ProcessorStateManager stateManager;
 
-    @Mock(type = MockType.NICE)
+    @Mock
     private StateDirectory stateDirectory;
 
-    @Mock(type = MockType.NICE)
+    @Mock
     private ProcessorTopology topology;
 
-    @Mock(type = MockType.NICE)
+    @Mock
     private InternalProcessorContext processorContext;
 
-    private IMocksControl ctrl;
-
     private Logger logger = new LogContext("test").logger(AbstractTask.class);
 
     private final TaskId taskId = new TaskId(0, 0);
 
-    @Before
-    public void setup() {
-        ctrl = createStrictControl();
-        topology = ctrl.createMock(ProcessorTopology.class);
-        processorContext = ctrl.createMock(InternalProcessorContext.class);
-
-        stateManager = ctrl.createMock(ProcessorStateManager.class);
-        stateDirectory = ctrl.createMock(StateDirectory.class);
-    }
-
     @Test
     public void testRegisterStateStoreWhenTopologyEmpty() {
-        expect(topology.stateStores()).andReturn(emptyList());
-
-        ctrl.checkOrder(true);

Review Comment:
   The original author wanted to make sure some calls are done in a specific order. You should migrate the tests to Mockito and preserve the verification of the intended order. I see that some test do not really verify an order. For those tests, you do not need to migrate the order verifications. 



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -187,151 +139,92 @@ public void testCloseStateManagerThrowsExceptionWhenClean() {
         // Thrown stateMgr exception will not be wrapped.
         assertEquals("state manager failed to close", thrown.getMessage());
 
-        ctrl.verify();
+        // The unlock logic should still be executed.
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
     public void testCloseStateManagerThrowsExceptionWhenDirty() {
-        expect(stateManager.taskId()).andReturn(taskId);
+        when(stateManager.taskId()).thenReturn(taskId);
 
-        expect(stateDirectory.lock(taskId)).andReturn(true);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
 
-        stateManager.close();
-        expectLastCall().andThrow(new ProcessorStateException("state manager failed to close"));
-
-        stateDirectory.unlock(taskId);
-
-        ctrl.checkOrder(true);
-        ctrl.replay();
+        doThrow(new ProcessorStateException("state manager failed to close")).when(stateManager).close();
 
         assertThrows(
             ProcessorStateException.class,
             () -> StateManagerUtil.closeStateManager(
                 logger, "logPrefix:", false, false, stateManager, stateDirectory, TaskType.ACTIVE));
 
-        ctrl.verify();
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
     public void testCloseStateManagerWithStateStoreWipeOut() {
-        expect(stateManager.taskId()).andReturn(taskId);
-        expect(stateDirectory.lock(taskId)).andReturn(true);
-
-        stateManager.close();
-        expectLastCall();
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
 
         // The `baseDir` will be accessed when attempting to delete the state store.
-        expect(stateManager.baseDir()).andReturn(TestUtils.tempDirectory("state_store"));
-
-        stateDirectory.unlock(taskId);
-        expectLastCall();
-
-        ctrl.checkOrder(true);
-        ctrl.replay();
+        when(stateManager.baseDir()).thenReturn(TestUtils.tempDirectory("state_store"));
 
         StateManagerUtil.closeStateManager(logger,
             "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE);
 
-        ctrl.verify();
+        verify(stateManager).close();
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
     public void  shouldStillWipeStateStoresIfCloseThrowsException() throws IOException {
         final File randomFile = new File("/random/path");
-        mockStatic(Utils.class);
-
-        expect(stateManager.taskId()).andReturn(taskId);
-        expect(stateDirectory.lock(taskId)).andReturn(true);
 
-        stateManager.close();
-        expectLastCall().andThrow(new ProcessorStateException("Close failed"));
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
 
-        expect(stateManager.baseDir()).andReturn(randomFile);
+        doThrow(new ProcessorStateException("Close failed")).when(stateManager).close();
 
-        Utils.delete(randomFile);
+        when(stateManager.baseDir()).thenReturn(randomFile);
 
-        stateDirectory.unlock(taskId);
-        expectLastCall();
+        try (MockedStatic<Utils> utils = mockStatic(Utils.class)) {
+            assertThrows(ProcessorStateException.class, () ->
+                    StateManagerUtil.closeStateManager(logger, "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
+        }
 
-        ctrl.checkOrder(true);
-        ctrl.replay();
-
-        replayAll();
-
-        assertThrows(ProcessorStateException.class, () ->
-            StateManagerUtil.closeStateManager(logger, "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
-
-        ctrl.verify();
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
     public void testCloseStateManagerWithStateStoreWipeOutRethrowWrappedIOException() throws IOException {
         final File unknownFile = new File("/unknown/path");
-        mockStatic(Utils.class);
-
-        expect(stateManager.taskId()).andReturn(taskId);
-        expect(stateDirectory.lock(taskId)).andReturn(true);
-
-        stateManager.close();
-        expectLastCall();
-
-        expect(stateManager.baseDir()).andReturn(unknownFile);
 
-        Utils.delete(unknownFile);
-        expectLastCall().andThrow(new IOException("Deletion failed"));
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
+        when(stateManager.baseDir()).thenReturn(unknownFile);
 
-        stateDirectory.unlock(taskId);
-        expectLastCall();
+        try (MockedStatic<Utils> utils = mockStatic(Utils.class)) {
+            utils.when(() -> Utils.delete(unknownFile)).thenThrow(new IOException("Deletion failed"));
 
-        ctrl.checkOrder(true);
-        ctrl.replay();
+            final ProcessorStateException thrown = assertThrows(
+                    ProcessorStateException.class, () -> StateManagerUtil.closeStateManager(logger,
+                            "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
 
-        replayAll();
+            assertEquals(IOException.class, thrown.getCause().getClass());
+        }
 
-        final ProcessorStateException thrown = assertThrows(
-            ProcessorStateException.class, () -> StateManagerUtil.closeStateManager(logger,
-                "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
-
-        assertEquals(IOException.class, thrown.getCause().getClass());
-
-        ctrl.verify();
+        verify(stateManager).close();
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
-    public void shouldNotCloseStateManagerIfUnableToLockTaskDirectory() {
-        expect(stateManager.taskId()).andReturn(taskId);
-
-        expect(stateDirectory.lock(taskId)).andReturn(false);
-
-        stateManager.close();
-        expectLastCall().andThrow(new AssertionError("Should not be trying to close state you don't own!"));
-
-        ctrl.checkOrder(true);
-        ctrl.replay();
-
-        replayAll();
+    public void shouldNotWipeStateStoresIfUnableToLockTaskDirectory() {
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateDirectory.lock(taskId)).thenReturn(false);
 
         StateManagerUtil.closeStateManager(
-            logger, "logPrefix:", true, false, stateManager, stateDirectory, TaskType.ACTIVE);
-    }
-
-    @Test
-    public void shouldNotWipeStateStoresIfUnableToLockTaskDirectory() throws IOException {

Review Comment:
   As far as I understand the test, the author wanted to fail the test with an `AssertionError` if `Utils.delete` is called after the lock could not be acquired. With Mockito, you can verify that `Utils.delete` is never called.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] clolov commented on a diff in pull request #12821: KAFKA-14132: Replace PowerMock and EasyMock with Mockito in streams tests

Posted by GitBox <gi...@apache.org>.
clolov commented on code in PR #12821:
URL: https://github.com/apache/kafka/pull/12821#discussion_r1024166413


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -187,151 +139,92 @@ public void testCloseStateManagerThrowsExceptionWhenClean() {
         // Thrown stateMgr exception will not be wrapped.
         assertEquals("state manager failed to close", thrown.getMessage());
 
-        ctrl.verify();
+        // The unlock logic should still be executed.
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
     public void testCloseStateManagerThrowsExceptionWhenDirty() {
-        expect(stateManager.taskId()).andReturn(taskId);
+        when(stateManager.taskId()).thenReturn(taskId);
 
-        expect(stateDirectory.lock(taskId)).andReturn(true);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
 
-        stateManager.close();
-        expectLastCall().andThrow(new ProcessorStateException("state manager failed to close"));
-
-        stateDirectory.unlock(taskId);
-
-        ctrl.checkOrder(true);
-        ctrl.replay();
+        doThrow(new ProcessorStateException("state manager failed to close")).when(stateManager).close();
 
         assertThrows(
             ProcessorStateException.class,
             () -> StateManagerUtil.closeStateManager(
                 logger, "logPrefix:", false, false, stateManager, stateDirectory, TaskType.ACTIVE));
 
-        ctrl.verify();
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
     public void testCloseStateManagerWithStateStoreWipeOut() {
-        expect(stateManager.taskId()).andReturn(taskId);
-        expect(stateDirectory.lock(taskId)).andReturn(true);
-
-        stateManager.close();
-        expectLastCall();
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
 
         // The `baseDir` will be accessed when attempting to delete the state store.
-        expect(stateManager.baseDir()).andReturn(TestUtils.tempDirectory("state_store"));
-
-        stateDirectory.unlock(taskId);
-        expectLastCall();
-
-        ctrl.checkOrder(true);
-        ctrl.replay();
+        when(stateManager.baseDir()).thenReturn(TestUtils.tempDirectory("state_store"));
 
         StateManagerUtil.closeStateManager(logger,
             "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE);
 
-        ctrl.verify();
+        verify(stateManager).close();
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
     public void  shouldStillWipeStateStoresIfCloseThrowsException() throws IOException {
         final File randomFile = new File("/random/path");
-        mockStatic(Utils.class);
-
-        expect(stateManager.taskId()).andReturn(taskId);
-        expect(stateDirectory.lock(taskId)).andReturn(true);
 
-        stateManager.close();
-        expectLastCall().andThrow(new ProcessorStateException("Close failed"));
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
 
-        expect(stateManager.baseDir()).andReturn(randomFile);
+        doThrow(new ProcessorStateException("Close failed")).when(stateManager).close();
 
-        Utils.delete(randomFile);
+        when(stateManager.baseDir()).thenReturn(randomFile);
 
-        stateDirectory.unlock(taskId);
-        expectLastCall();
+        try (MockedStatic<Utils> utils = mockStatic(Utils.class)) {
+            assertThrows(ProcessorStateException.class, () ->
+                    StateManagerUtil.closeStateManager(logger, "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
+        }
 
-        ctrl.checkOrder(true);
-        ctrl.replay();
-
-        replayAll();
-
-        assertThrows(ProcessorStateException.class, () ->
-            StateManagerUtil.closeStateManager(logger, "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
-
-        ctrl.verify();
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
     public void testCloseStateManagerWithStateStoreWipeOutRethrowWrappedIOException() throws IOException {
         final File unknownFile = new File("/unknown/path");
-        mockStatic(Utils.class);
-
-        expect(stateManager.taskId()).andReturn(taskId);
-        expect(stateDirectory.lock(taskId)).andReturn(true);
-
-        stateManager.close();
-        expectLastCall();
-
-        expect(stateManager.baseDir()).andReturn(unknownFile);
 
-        Utils.delete(unknownFile);
-        expectLastCall().andThrow(new IOException("Deletion failed"));
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
+        when(stateManager.baseDir()).thenReturn(unknownFile);
 
-        stateDirectory.unlock(taskId);
-        expectLastCall();
+        try (MockedStatic<Utils> utils = mockStatic(Utils.class)) {
+            utils.when(() -> Utils.delete(unknownFile)).thenThrow(new IOException("Deletion failed"));
 
-        ctrl.checkOrder(true);
-        ctrl.replay();
+            final ProcessorStateException thrown = assertThrows(
+                    ProcessorStateException.class, () -> StateManagerUtil.closeStateManager(logger,
+                            "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
 
-        replayAll();
+            assertEquals(IOException.class, thrown.getCause().getClass());
+        }
 
-        final ProcessorStateException thrown = assertThrows(
-            ProcessorStateException.class, () -> StateManagerUtil.closeStateManager(logger,
-                "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
-
-        assertEquals(IOException.class, thrown.getCause().getClass());
-
-        ctrl.verify();
+        verify(stateManager).close();
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
-    public void shouldNotCloseStateManagerIfUnableToLockTaskDirectory() {
-        expect(stateManager.taskId()).andReturn(taskId);
-
-        expect(stateDirectory.lock(taskId)).andReturn(false);
-
-        stateManager.close();
-        expectLastCall().andThrow(new AssertionError("Should not be trying to close state you don't own!"));
-
-        ctrl.checkOrder(true);
-        ctrl.replay();
-
-        replayAll();
+    public void shouldNotWipeStateStoresIfUnableToLockTaskDirectory() {
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateDirectory.lock(taskId)).thenReturn(false);
 
         StateManagerUtil.closeStateManager(
-            logger, "logPrefix:", true, false, stateManager, stateDirectory, TaskType.ACTIVE);
-    }
-
-    @Test
-    public void shouldNotWipeStateStoresIfUnableToLockTaskDirectory() throws IOException {

Review Comment:
   Sorry, I do not understand. I think this test and the one above can be collapsed in the test I have written. I still verify that deletion did not happen by checking that a few methods were never interacted with.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] clolov commented on pull request #12821: KAFKA-14132: Replace PowerMock and EasyMock with Mockito in streams tests

Posted by GitBox <gi...@apache.org>.
clolov commented on PR #12821:
URL: https://github.com/apache/kafka/pull/12821#issuecomment-1303633189

   Hey @cadonna 👋! Adding you for visibility as these are Streams-related tests.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] clolov commented on a diff in pull request #12821: KAFKA-14132: Replace PowerMock and EasyMock with Mockito in streams tests

Posted by GitBox <gi...@apache.org>.
clolov commented on code in PR #12821:
URL: https://github.com/apache/kafka/pull/12821#discussion_r1014080005


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionTopicsTest.java:
##########
@@ -151,22 +147,17 @@ public void shouldSetupRepartitionTopics() {
 
         assertThat(repartitionTopics.topologiesWithMissingInputTopics().isEmpty(), is(true));
         assertThat(repartitionTopics.missingSourceTopicExceptions().isEmpty(), is(true));
+
+        verify(copartitionedTopicsEnforcer).enforce(eq(coPartitionGroup1), any(), eq(clusterMetadata));
+        verify(copartitionedTopicsEnforcer).enforce(eq(coPartitionGroup2), any(), eq(clusterMetadata));
     }
 
     @Test
     public void shouldReturnMissingSourceTopics() {
         final Set<String> missingSourceTopics = mkSet(SOURCE_TOPIC_NAME1);
-        expect(internalTopologyBuilder.subtopologyToTopicsInfo())
-            .andReturn(mkMap(mkEntry(SUBTOPOLOGY_0, TOPICS_INFO1), mkEntry(SUBTOPOLOGY_1, TOPICS_INFO2)));
-        expect(internalTopologyBuilder.copartitionGroups()).andReturn(Collections.emptyList());

Review Comment:
   Returning empty collections wherever it can is the default behaviour for Mockito, so a few of these have magically disappeared.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] clolov commented on a diff in pull request #12821: KAFKA-14132: Replace PowerMock and EasyMock with Mockito in streams tests

Posted by GitBox <gi...@apache.org>.
clolov commented on code in PR #12821:
URL: https://github.com/apache/kafka/pull/12821#discussion_r1014081545


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionTopicsTest.java:
##########
@@ -447,13 +414,9 @@ private void setupClusterWithMissingTopicsAndMissingPartitionCounts(final Set<St
             SOME_OTHER_TOPIC
         );
         topics.removeAll(missingTopics);
-        expect(clusterMetadata.topics()).andStubReturn(topics);
-        expect(clusterMetadata.partitionCountForTopic(SOURCE_TOPIC_NAME1))
-            .andStubReturn(topicsWithMissingPartitionCounts.contains(SOURCE_TOPIC_NAME1) ? null : 3);
-        expect(clusterMetadata.partitionCountForTopic(SOURCE_TOPIC_NAME2))

Review Comment:
   This and the expectation below were not exercised. I ran the tests with PowerMock/EasyMock and without these two expectations and they still passed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] clolov commented on a diff in pull request #12821: KAFKA-14132: Replace PowerMock and EasyMock with Mockito in streams tests

Posted by GitBox <gi...@apache.org>.
clolov commented on code in PR #12821:
URL: https://github.com/apache/kafka/pull/12821#discussion_r1024170827


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -187,151 +139,92 @@ public void testCloseStateManagerThrowsExceptionWhenClean() {
         // Thrown stateMgr exception will not be wrapped.
         assertEquals("state manager failed to close", thrown.getMessage());
 
-        ctrl.verify();
+        // The unlock logic should still be executed.
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
     public void testCloseStateManagerThrowsExceptionWhenDirty() {
-        expect(stateManager.taskId()).andReturn(taskId);
+        when(stateManager.taskId()).thenReturn(taskId);
 
-        expect(stateDirectory.lock(taskId)).andReturn(true);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
 
-        stateManager.close();
-        expectLastCall().andThrow(new ProcessorStateException("state manager failed to close"));
-
-        stateDirectory.unlock(taskId);
-
-        ctrl.checkOrder(true);
-        ctrl.replay();
+        doThrow(new ProcessorStateException("state manager failed to close")).when(stateManager).close();
 
         assertThrows(
             ProcessorStateException.class,
             () -> StateManagerUtil.closeStateManager(
                 logger, "logPrefix:", false, false, stateManager, stateDirectory, TaskType.ACTIVE));
 
-        ctrl.verify();
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
     public void testCloseStateManagerWithStateStoreWipeOut() {
-        expect(stateManager.taskId()).andReturn(taskId);
-        expect(stateDirectory.lock(taskId)).andReturn(true);
-
-        stateManager.close();
-        expectLastCall();
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
 
         // The `baseDir` will be accessed when attempting to delete the state store.
-        expect(stateManager.baseDir()).andReturn(TestUtils.tempDirectory("state_store"));
-
-        stateDirectory.unlock(taskId);
-        expectLastCall();
-
-        ctrl.checkOrder(true);
-        ctrl.replay();
+        when(stateManager.baseDir()).thenReturn(TestUtils.tempDirectory("state_store"));
 
         StateManagerUtil.closeStateManager(logger,
             "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE);
 
-        ctrl.verify();
+        verify(stateManager).close();
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
     public void  shouldStillWipeStateStoresIfCloseThrowsException() throws IOException {
         final File randomFile = new File("/random/path");
-        mockStatic(Utils.class);
-
-        expect(stateManager.taskId()).andReturn(taskId);
-        expect(stateDirectory.lock(taskId)).andReturn(true);
 
-        stateManager.close();
-        expectLastCall().andThrow(new ProcessorStateException("Close failed"));
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
 
-        expect(stateManager.baseDir()).andReturn(randomFile);
+        doThrow(new ProcessorStateException("Close failed")).when(stateManager).close();
 
-        Utils.delete(randomFile);
+        when(stateManager.baseDir()).thenReturn(randomFile);
 
-        stateDirectory.unlock(taskId);
-        expectLastCall();
+        try (MockedStatic<Utils> utils = mockStatic(Utils.class)) {
+            assertThrows(ProcessorStateException.class, () ->
+                    StateManagerUtil.closeStateManager(logger, "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
+        }
 
-        ctrl.checkOrder(true);
-        ctrl.replay();
-
-        replayAll();
-
-        assertThrows(ProcessorStateException.class, () ->
-            StateManagerUtil.closeStateManager(logger, "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
-
-        ctrl.verify();
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
     public void testCloseStateManagerWithStateStoreWipeOutRethrowWrappedIOException() throws IOException {
         final File unknownFile = new File("/unknown/path");
-        mockStatic(Utils.class);
-
-        expect(stateManager.taskId()).andReturn(taskId);
-        expect(stateDirectory.lock(taskId)).andReturn(true);
-
-        stateManager.close();
-        expectLastCall();
-
-        expect(stateManager.baseDir()).andReturn(unknownFile);
 
-        Utils.delete(unknownFile);
-        expectLastCall().andThrow(new IOException("Deletion failed"));
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
+        when(stateManager.baseDir()).thenReturn(unknownFile);
 
-        stateDirectory.unlock(taskId);
-        expectLastCall();
+        try (MockedStatic<Utils> utils = mockStatic(Utils.class)) {
+            utils.when(() -> Utils.delete(unknownFile)).thenThrow(new IOException("Deletion failed"));
 
-        ctrl.checkOrder(true);
-        ctrl.replay();
+            final ProcessorStateException thrown = assertThrows(
+                    ProcessorStateException.class, () -> StateManagerUtil.closeStateManager(logger,
+                            "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
 
-        replayAll();
+            assertEquals(IOException.class, thrown.getCause().getClass());
+        }
 
-        final ProcessorStateException thrown = assertThrows(
-            ProcessorStateException.class, () -> StateManagerUtil.closeStateManager(logger,
-                "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
-
-        assertEquals(IOException.class, thrown.getCause().getClass());
-
-        ctrl.verify();
+        verify(stateManager).close();
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
-    public void shouldNotCloseStateManagerIfUnableToLockTaskDirectory() {
-        expect(stateManager.taskId()).andReturn(taskId);
-
-        expect(stateDirectory.lock(taskId)).andReturn(false);
-
-        stateManager.close();
-        expectLastCall().andThrow(new AssertionError("Should not be trying to close state you don't own!"));
-
-        ctrl.checkOrder(true);
-        ctrl.replay();
-
-        replayAll();
+    public void shouldNotWipeStateStoresIfUnableToLockTaskDirectory() {
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateDirectory.lock(taskId)).thenReturn(false);
 
         StateManagerUtil.closeStateManager(
-            logger, "logPrefix:", true, false, stateManager, stateDirectory, TaskType.ACTIVE);
-    }
-
-    @Test
-    public void shouldNotWipeStateStoresIfUnableToLockTaskDirectory() throws IOException {

Review Comment:
   As in, it appears a bit strange to mock everything on a path to the Utils.delete when this logic short-circuits if we use `expect(stateDirectory.lock(taskId)).andReturn(false);`.
   ```
           try {
               if (stateDirectory.lock(id)) { <---- WE JUMP FROM HERE...
                   try {
                       stateMgr.close(); <---- WE VERIFY THIS IS NOT CALLED
                   } catch (final ProcessorStateException e) {
                       firstException.compareAndSet(null, e);
                   } finally {
                       try {
                           if (wipeStateStore) {
                               log.debug("Wiping state stores for {} task {}", taskType, id);
                               // we can just delete the whole dir of the task, including the state store images and the checkpoint files,
                               // and then we write an empty checkpoint file indicating that the previous close is graceful and we just
                               // need to re-bootstrap the restoration from the beginning
                               Utils.delete(stateMgr.baseDir() <---- WE VERIFY THIS IS NOT CALLED);
                           }
                       } finally {
                           stateDirectory.unlock(id); <---- WE VERIFY THIS IS NOT CALLED
                       }
                   }
               }
           } catch (final IOException e) {
               final ProcessorStateException exception = new ProcessorStateException(
                   String.format("%sFatal error while trying to close the state manager for task %s", logPrefix, id), e
               );
               firstException.compareAndSet(null, exception);
           } <---- ...ALL THE WAY TO HERE
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cadonna commented on a diff in pull request #12821: KAFKA-14132: Replace PowerMock and EasyMock with Mockito in streams tests

Posted by GitBox <gi...@apache.org>.
cadonna commented on code in PR #12821:
URL: https://github.com/apache/kafka/pull/12821#discussion_r1080946697


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -187,151 +137,107 @@ public void testCloseStateManagerThrowsExceptionWhenClean() {
         // Thrown stateMgr exception will not be wrapped.
         assertEquals("state manager failed to close", thrown.getMessage());
 
-        ctrl.verify();
+        // The unlock logic should still be executed.
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
     public void testCloseStateManagerThrowsExceptionWhenDirty() {
-        expect(stateManager.taskId()).andReturn(taskId);
-
-        expect(stateDirectory.lock(taskId)).andReturn(true);
-
-        stateManager.close();
-        expectLastCall().andThrow(new ProcessorStateException("state manager failed to close"));
-
-        stateDirectory.unlock(taskId);
-
-        ctrl.checkOrder(true);
-        ctrl.replay();
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
+        doThrow(new ProcessorStateException("state manager failed to close")).when(stateManager).close();
 
         assertThrows(
             ProcessorStateException.class,
             () -> StateManagerUtil.closeStateManager(
                 logger, "logPrefix:", false, false, stateManager, stateDirectory, TaskType.ACTIVE));
 
-        ctrl.verify();
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
     public void testCloseStateManagerWithStateStoreWipeOut() {
-        expect(stateManager.taskId()).andReturn(taskId);
-        expect(stateDirectory.lock(taskId)).andReturn(true);
-
-        stateManager.close();
-        expectLastCall();
-
+        final InOrder inOrder = inOrder(stateManager, stateDirectory);
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
         // The `baseDir` will be accessed when attempting to delete the state store.
-        expect(stateManager.baseDir()).andReturn(TestUtils.tempDirectory("state_store"));
-
-        stateDirectory.unlock(taskId);
-        expectLastCall();
-
-        ctrl.checkOrder(true);
-        ctrl.replay();
+        when(stateManager.baseDir()).thenReturn(TestUtils.tempDirectory("state_store"));
 
         StateManagerUtil.closeStateManager(logger,
             "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE);
 
-        ctrl.verify();
+        inOrder.verify(stateManager).close();
+        inOrder.verify(stateDirectory).unlock(taskId);
+        verifyNoMoreInteractions(stateManager, stateDirectory);
     }
 
     @Test
-    public void  shouldStillWipeStateStoresIfCloseThrowsException() throws IOException {
+    public void  shouldStillWipeStateStoresIfCloseThrowsException() {
         final File randomFile = new File("/random/path");
-        mockStatic(Utils.class);
-
-        expect(stateManager.taskId()).andReturn(taskId);
-        expect(stateDirectory.lock(taskId)).andReturn(true);
-
-        stateManager.close();
-        expectLastCall().andThrow(new ProcessorStateException("Close failed"));
-
-        expect(stateManager.baseDir()).andReturn(randomFile);
 
-        Utils.delete(randomFile);
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
+        doThrow(new ProcessorStateException("Close failed")).when(stateManager).close();
+        when(stateManager.baseDir()).thenReturn(randomFile);
 
-        stateDirectory.unlock(taskId);
-        expectLastCall();
+        try (MockedStatic<Utils> utils = mockStatic(Utils.class)) {
+            assertThrows(ProcessorStateException.class, () ->
+                    StateManagerUtil.closeStateManager(logger, "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
+        }
 
-        ctrl.checkOrder(true);
-        ctrl.replay();
-
-        replayAll();
-
-        assertThrows(ProcessorStateException.class, () ->
-            StateManagerUtil.closeStateManager(logger, "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
-
-        ctrl.verify();
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
-    public void testCloseStateManagerWithStateStoreWipeOutRethrowWrappedIOException() throws IOException {
+    public void testCloseStateManagerWithStateStoreWipeOutRethrowWrappedIOException() {
         final File unknownFile = new File("/unknown/path");
-        mockStatic(Utils.class);
-
-        expect(stateManager.taskId()).andReturn(taskId);
-        expect(stateDirectory.lock(taskId)).andReturn(true);
-
-        stateManager.close();
-        expectLastCall();
-
-        expect(stateManager.baseDir()).andReturn(unknownFile);
-
-        Utils.delete(unknownFile);
-        expectLastCall().andThrow(new IOException("Deletion failed"));
+        final InOrder inOrder = inOrder(stateManager, stateDirectory);
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
+        when(stateManager.baseDir()).thenReturn(unknownFile);
 
-        stateDirectory.unlock(taskId);
-        expectLastCall();
+        try (MockedStatic<Utils> utils = mockStatic(Utils.class)) {
+            utils.when(() -> Utils.delete(unknownFile)).thenThrow(new IOException("Deletion failed"));
 
-        ctrl.checkOrder(true);
-        ctrl.replay();
+            final ProcessorStateException thrown = assertThrows(
+                    ProcessorStateException.class, () -> StateManagerUtil.closeStateManager(logger,
+                            "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
 
-        replayAll();
+            assertEquals(IOException.class, thrown.getCause().getClass());
+        }
 
-        final ProcessorStateException thrown = assertThrows(
-            ProcessorStateException.class, () -> StateManagerUtil.closeStateManager(logger,
-                "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
-
-        assertEquals(IOException.class, thrown.getCause().getClass());
-
-        ctrl.verify();
+        inOrder.verify(stateManager).close();
+        inOrder.verify(stateDirectory).unlock(taskId);
+        verifyNoMoreInteractions(stateManager, stateDirectory);
     }
 
     @Test
     public void shouldNotCloseStateManagerIfUnableToLockTaskDirectory() {
-        expect(stateManager.taskId()).andReturn(taskId);
-
-        expect(stateDirectory.lock(taskId)).andReturn(false);
-
-        stateManager.close();
-        expectLastCall().andThrow(new AssertionError("Should not be trying to close state you don't own!"));
-
-        ctrl.checkOrder(true);
-        ctrl.replay();
-
-        replayAll();
+        final InOrder inOrder = inOrder(stateManager, stateDirectory);
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateDirectory.lock(taskId)).thenReturn(false);
 
         StateManagerUtil.closeStateManager(
-            logger, "logPrefix:", true, false, stateManager, stateDirectory, TaskType.ACTIVE);
+                logger, "logPrefix:", true, false, stateManager, stateDirectory, TaskType.ACTIVE);
+
+        inOrder.verify(stateManager, never()).close();
+        inOrder.verify(stateManager, never()).baseDir();
+        inOrder.verify(stateDirectory, never()).unlock(taskId);

Review Comment:
   I do not know how much it makes sense to verify the call order of methods that should not be called.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -187,151 +137,107 @@ public void testCloseStateManagerThrowsExceptionWhenClean() {
         // Thrown stateMgr exception will not be wrapped.
         assertEquals("state manager failed to close", thrown.getMessage());
 
-        ctrl.verify();
+        // The unlock logic should still be executed.
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
     public void testCloseStateManagerThrowsExceptionWhenDirty() {
-        expect(stateManager.taskId()).andReturn(taskId);
-
-        expect(stateDirectory.lock(taskId)).andReturn(true);
-
-        stateManager.close();
-        expectLastCall().andThrow(new ProcessorStateException("state manager failed to close"));
-
-        stateDirectory.unlock(taskId);
-
-        ctrl.checkOrder(true);
-        ctrl.replay();
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
+        doThrow(new ProcessorStateException("state manager failed to close")).when(stateManager).close();
 
         assertThrows(
             ProcessorStateException.class,
             () -> StateManagerUtil.closeStateManager(
                 logger, "logPrefix:", false, false, stateManager, stateDirectory, TaskType.ACTIVE));
 
-        ctrl.verify();
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
     public void testCloseStateManagerWithStateStoreWipeOut() {
-        expect(stateManager.taskId()).andReturn(taskId);
-        expect(stateDirectory.lock(taskId)).andReturn(true);
-
-        stateManager.close();
-        expectLastCall();
-
+        final InOrder inOrder = inOrder(stateManager, stateDirectory);
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
         // The `baseDir` will be accessed when attempting to delete the state store.
-        expect(stateManager.baseDir()).andReturn(TestUtils.tempDirectory("state_store"));
-
-        stateDirectory.unlock(taskId);
-        expectLastCall();
-
-        ctrl.checkOrder(true);
-        ctrl.replay();
+        when(stateManager.baseDir()).thenReturn(TestUtils.tempDirectory("state_store"));
 
         StateManagerUtil.closeStateManager(logger,
             "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE);
 
-        ctrl.verify();
+        inOrder.verify(stateManager).close();
+        inOrder.verify(stateDirectory).unlock(taskId);
+        verifyNoMoreInteractions(stateManager, stateDirectory);
     }
 
     @Test
-    public void  shouldStillWipeStateStoresIfCloseThrowsException() throws IOException {
+    public void  shouldStillWipeStateStoresIfCloseThrowsException() {
         final File randomFile = new File("/random/path");
-        mockStatic(Utils.class);
-
-        expect(stateManager.taskId()).andReturn(taskId);
-        expect(stateDirectory.lock(taskId)).andReturn(true);
-
-        stateManager.close();
-        expectLastCall().andThrow(new ProcessorStateException("Close failed"));
-
-        expect(stateManager.baseDir()).andReturn(randomFile);
 
-        Utils.delete(randomFile);
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
+        doThrow(new ProcessorStateException("Close failed")).when(stateManager).close();
+        when(stateManager.baseDir()).thenReturn(randomFile);
 
-        stateDirectory.unlock(taskId);
-        expectLastCall();
+        try (MockedStatic<Utils> utils = mockStatic(Utils.class)) {
+            assertThrows(ProcessorStateException.class, () ->
+                    StateManagerUtil.closeStateManager(logger, "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
+        }
 
-        ctrl.checkOrder(true);
-        ctrl.replay();
-
-        replayAll();
-
-        assertThrows(ProcessorStateException.class, () ->
-            StateManagerUtil.closeStateManager(logger, "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
-
-        ctrl.verify();
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
-    public void testCloseStateManagerWithStateStoreWipeOutRethrowWrappedIOException() throws IOException {
+    public void testCloseStateManagerWithStateStoreWipeOutRethrowWrappedIOException() {
         final File unknownFile = new File("/unknown/path");
-        mockStatic(Utils.class);
-
-        expect(stateManager.taskId()).andReturn(taskId);
-        expect(stateDirectory.lock(taskId)).andReturn(true);
-
-        stateManager.close();
-        expectLastCall();
-
-        expect(stateManager.baseDir()).andReturn(unknownFile);
-
-        Utils.delete(unknownFile);
-        expectLastCall().andThrow(new IOException("Deletion failed"));
+        final InOrder inOrder = inOrder(stateManager, stateDirectory);
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
+        when(stateManager.baseDir()).thenReturn(unknownFile);
 
-        stateDirectory.unlock(taskId);
-        expectLastCall();
+        try (MockedStatic<Utils> utils = mockStatic(Utils.class)) {
+            utils.when(() -> Utils.delete(unknownFile)).thenThrow(new IOException("Deletion failed"));
 
-        ctrl.checkOrder(true);
-        ctrl.replay();
+            final ProcessorStateException thrown = assertThrows(
+                    ProcessorStateException.class, () -> StateManagerUtil.closeStateManager(logger,
+                            "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
 
-        replayAll();
+            assertEquals(IOException.class, thrown.getCause().getClass());
+        }
 
-        final ProcessorStateException thrown = assertThrows(
-            ProcessorStateException.class, () -> StateManagerUtil.closeStateManager(logger,
-                "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
-
-        assertEquals(IOException.class, thrown.getCause().getClass());
-
-        ctrl.verify();
+        inOrder.verify(stateManager).close();
+        inOrder.verify(stateDirectory).unlock(taskId);
+        verifyNoMoreInteractions(stateManager, stateDirectory);
     }
 
     @Test
     public void shouldNotCloseStateManagerIfUnableToLockTaskDirectory() {
-        expect(stateManager.taskId()).andReturn(taskId);
-
-        expect(stateDirectory.lock(taskId)).andReturn(false);
-
-        stateManager.close();
-        expectLastCall().andThrow(new AssertionError("Should not be trying to close state you don't own!"));
-
-        ctrl.checkOrder(true);
-        ctrl.replay();
-
-        replayAll();
+        final InOrder inOrder = inOrder(stateManager, stateDirectory);
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateDirectory.lock(taskId)).thenReturn(false);
 
         StateManagerUtil.closeStateManager(
-            logger, "logPrefix:", true, false, stateManager, stateDirectory, TaskType.ACTIVE);
+                logger, "logPrefix:", true, false, stateManager, stateDirectory, TaskType.ACTIVE);
+
+        inOrder.verify(stateManager, never()).close();
+        inOrder.verify(stateManager, never()).baseDir();
+        inOrder.verify(stateDirectory, never()).unlock(taskId);
+        verifyNoMoreInteractions(stateManager, stateDirectory);
     }
 
     @Test
-    public void shouldNotWipeStateStoresIfUnableToLockTaskDirectory() throws IOException {
-        final File unknownFile = new File("/unknown/path");
-        expect(stateManager.taskId()).andReturn(taskId);
-        expect(stateDirectory.lock(taskId)).andReturn(false);
-
-        expect(stateManager.baseDir()).andReturn(unknownFile);
-
-        Utils.delete(unknownFile);
-        expectLastCall().andThrow(new AssertionError("Should not be trying to wipe state you don't own!"));
-
-        ctrl.checkOrder(true);
-        ctrl.replay();
-
-        replayAll();
+    public void shouldNotWipeStateStoresIfUnableToLockTaskDirectory() {
+        final InOrder inOrder = inOrder(stateManager, stateDirectory);
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateDirectory.lock(taskId)).thenReturn(false);
 
         StateManagerUtil.closeStateManager(
-            logger, "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE);
+                logger, "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE);
+
+        inOrder.verify(stateManager, never()).close();
+        inOrder.verify(stateManager, never()).baseDir();
+        inOrder.verify(stateDirectory, never()).unlock(taskId);

Review Comment:
   See my comment above about call order.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] clolov commented on a diff in pull request #12821: KAFKA-14132: Replace PowerMock and EasyMock with Mockito in streams tests

Posted by GitBox <gi...@apache.org>.
clolov commented on code in PR #12821:
URL: https://github.com/apache/kafka/pull/12821#discussion_r1081139070


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -187,151 +137,107 @@ public void testCloseStateManagerThrowsExceptionWhenClean() {
         // Thrown stateMgr exception will not be wrapped.
         assertEquals("state manager failed to close", thrown.getMessage());
 
-        ctrl.verify();
+        // The unlock logic should still be executed.
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
     public void testCloseStateManagerThrowsExceptionWhenDirty() {
-        expect(stateManager.taskId()).andReturn(taskId);
-
-        expect(stateDirectory.lock(taskId)).andReturn(true);
-
-        stateManager.close();
-        expectLastCall().andThrow(new ProcessorStateException("state manager failed to close"));
-
-        stateDirectory.unlock(taskId);
-
-        ctrl.checkOrder(true);
-        ctrl.replay();
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
+        doThrow(new ProcessorStateException("state manager failed to close")).when(stateManager).close();
 
         assertThrows(
             ProcessorStateException.class,
             () -> StateManagerUtil.closeStateManager(
                 logger, "logPrefix:", false, false, stateManager, stateDirectory, TaskType.ACTIVE));
 
-        ctrl.verify();
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
     public void testCloseStateManagerWithStateStoreWipeOut() {
-        expect(stateManager.taskId()).andReturn(taskId);
-        expect(stateDirectory.lock(taskId)).andReturn(true);
-
-        stateManager.close();
-        expectLastCall();
-
+        final InOrder inOrder = inOrder(stateManager, stateDirectory);
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
         // The `baseDir` will be accessed when attempting to delete the state store.
-        expect(stateManager.baseDir()).andReturn(TestUtils.tempDirectory("state_store"));
-
-        stateDirectory.unlock(taskId);
-        expectLastCall();
-
-        ctrl.checkOrder(true);
-        ctrl.replay();
+        when(stateManager.baseDir()).thenReturn(TestUtils.tempDirectory("state_store"));
 
         StateManagerUtil.closeStateManager(logger,
             "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE);
 
-        ctrl.verify();
+        inOrder.verify(stateManager).close();
+        inOrder.verify(stateDirectory).unlock(taskId);
+        verifyNoMoreInteractions(stateManager, stateDirectory);
     }
 
     @Test
-    public void  shouldStillWipeStateStoresIfCloseThrowsException() throws IOException {
+    public void  shouldStillWipeStateStoresIfCloseThrowsException() {
         final File randomFile = new File("/random/path");
-        mockStatic(Utils.class);
-
-        expect(stateManager.taskId()).andReturn(taskId);
-        expect(stateDirectory.lock(taskId)).andReturn(true);
-
-        stateManager.close();
-        expectLastCall().andThrow(new ProcessorStateException("Close failed"));
-
-        expect(stateManager.baseDir()).andReturn(randomFile);
 
-        Utils.delete(randomFile);
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
+        doThrow(new ProcessorStateException("Close failed")).when(stateManager).close();
+        when(stateManager.baseDir()).thenReturn(randomFile);
 
-        stateDirectory.unlock(taskId);
-        expectLastCall();
+        try (MockedStatic<Utils> utils = mockStatic(Utils.class)) {
+            assertThrows(ProcessorStateException.class, () ->
+                    StateManagerUtil.closeStateManager(logger, "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
+        }
 
-        ctrl.checkOrder(true);
-        ctrl.replay();
-
-        replayAll();
-
-        assertThrows(ProcessorStateException.class, () ->
-            StateManagerUtil.closeStateManager(logger, "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
-
-        ctrl.verify();
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
-    public void testCloseStateManagerWithStateStoreWipeOutRethrowWrappedIOException() throws IOException {
+    public void testCloseStateManagerWithStateStoreWipeOutRethrowWrappedIOException() {
         final File unknownFile = new File("/unknown/path");
-        mockStatic(Utils.class);
-
-        expect(stateManager.taskId()).andReturn(taskId);
-        expect(stateDirectory.lock(taskId)).andReturn(true);
-
-        stateManager.close();
-        expectLastCall();
-
-        expect(stateManager.baseDir()).andReturn(unknownFile);
-
-        Utils.delete(unknownFile);
-        expectLastCall().andThrow(new IOException("Deletion failed"));
+        final InOrder inOrder = inOrder(stateManager, stateDirectory);
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
+        when(stateManager.baseDir()).thenReturn(unknownFile);
 
-        stateDirectory.unlock(taskId);
-        expectLastCall();
+        try (MockedStatic<Utils> utils = mockStatic(Utils.class)) {
+            utils.when(() -> Utils.delete(unknownFile)).thenThrow(new IOException("Deletion failed"));
 
-        ctrl.checkOrder(true);
-        ctrl.replay();
+            final ProcessorStateException thrown = assertThrows(
+                    ProcessorStateException.class, () -> StateManagerUtil.closeStateManager(logger,
+                            "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
 
-        replayAll();
+            assertEquals(IOException.class, thrown.getCause().getClass());
+        }
 
-        final ProcessorStateException thrown = assertThrows(
-            ProcessorStateException.class, () -> StateManagerUtil.closeStateManager(logger,
-                "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
-
-        assertEquals(IOException.class, thrown.getCause().getClass());
-
-        ctrl.verify();
+        inOrder.verify(stateManager).close();
+        inOrder.verify(stateDirectory).unlock(taskId);
+        verifyNoMoreInteractions(stateManager, stateDirectory);
     }
 
     @Test
     public void shouldNotCloseStateManagerIfUnableToLockTaskDirectory() {
-        expect(stateManager.taskId()).andReturn(taskId);
-
-        expect(stateDirectory.lock(taskId)).andReturn(false);
-
-        stateManager.close();
-        expectLastCall().andThrow(new AssertionError("Should not be trying to close state you don't own!"));
-
-        ctrl.checkOrder(true);
-        ctrl.replay();
-
-        replayAll();
+        final InOrder inOrder = inOrder(stateManager, stateDirectory);
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateDirectory.lock(taskId)).thenReturn(false);
 
         StateManagerUtil.closeStateManager(
-            logger, "logPrefix:", true, false, stateManager, stateDirectory, TaskType.ACTIVE);
+                logger, "logPrefix:", true, false, stateManager, stateDirectory, TaskType.ACTIVE);
+
+        inOrder.verify(stateManager, never()).close();
+        inOrder.verify(stateManager, never()).baseDir();
+        inOrder.verify(stateDirectory, never()).unlock(taskId);

Review Comment:
   This is a fair point. I used `inOrder` because the original test wanted to be strict. This being said it wanted to be strict on the actual calls rather than the calls which weren't made. Does
   ```
       @Test
       public void shouldNotCloseStateManagerIfUnableToLockTaskDirectory() {
           final InOrder inOrder = inOrder(stateManager, stateDirectory);
           when(stateManager.taskId()).thenReturn(taskId);
           when(stateDirectory.lock(taskId)).thenReturn(false);
   
           StateManagerUtil.closeStateManager(
                   logger, "logPrefix:", true, false, stateManager, stateDirectory, TaskType.ACTIVE);
   
           inOrder.verify(stateManager).taskId(); <- ENSURE ORDER
           inOrder.verify(stateDirectory).lock(taskId); <- ENSURE ORDER
           verify(stateManager, never()).close();
           verify(stateManager, never()).baseDir();
           verify(stateDirectory, never()).unlock(taskId);
           verifyNoMoreInteractions(stateManager, stateDirectory);
       }
   
       @Test
       public void shouldNotWipeStateStoresIfUnableToLockTaskDirectory() {
           final InOrder inOrder = inOrder(stateManager, stateDirectory);
           when(stateManager.taskId()).thenReturn(taskId);
           when(stateDirectory.lock(taskId)).thenReturn(false);
   
           StateManagerUtil.closeStateManager(
                   logger, "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE);
   
           inOrder.verify(stateManager).taskId(); <- ENSURE ORDER
           inOrder.verify(stateDirectory).lock(taskId); <- ENSURE ORDER
           verify(stateManager, never()).close(); 
           verify(stateManager, never()).baseDir();
           verify(stateDirectory, never()).unlock(taskId);
           verifyNoMoreInteractions(stateManager, stateDirectory);
       }
   ```
   look better to you?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org