You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/12/12 17:18:37 UTC

[GitHub] [kafka] cadonna commented on a diff in pull request #12821: KAFKA-14132: Replace PowerMock and EasyMock with Mockito in streams tests

cadonna commented on code in PR #12821:
URL: https://github.com/apache/kafka/pull/12821#discussion_r1046120121


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -42,77 +40,56 @@
 
 import static java.util.Collections.emptyList;
 import static java.util.Collections.singletonList;
-import static org.easymock.EasyMock.createStrictControl;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.expectLastCall;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThrows;
-import static org.powermock.api.easymock.PowerMock.mockStatic;
-import static org.powermock.api.easymock.PowerMock.replayAll;
-
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(Utils.class)
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
 public class StateManagerUtilTest {
 
-    @Mock(type = MockType.NICE)
+    @Mock
     private ProcessorStateManager stateManager;
 
-    @Mock(type = MockType.NICE)
+    @Mock
     private StateDirectory stateDirectory;
 
-    @Mock(type = MockType.NICE)
+    @Mock
     private ProcessorTopology topology;
 
-    @Mock(type = MockType.NICE)
+    @Mock
     private InternalProcessorContext processorContext;
 
-    private IMocksControl ctrl;
-
     private Logger logger = new LogContext("test").logger(AbstractTask.class);
 
     private final TaskId taskId = new TaskId(0, 0);
 
-    @Before
-    public void setup() {
-        ctrl = createStrictControl();
-        topology = ctrl.createMock(ProcessorTopology.class);
-        processorContext = ctrl.createMock(InternalProcessorContext.class);
-
-        stateManager = ctrl.createMock(ProcessorStateManager.class);
-        stateDirectory = ctrl.createMock(StateDirectory.class);
-    }
-
     @Test
     public void testRegisterStateStoreWhenTopologyEmpty() {
-        expect(topology.stateStores()).andReturn(emptyList());
-
-        ctrl.checkOrder(true);
-        ctrl.replay();
+        when(topology.stateStores()).thenReturn(emptyList());
 
         StateManagerUtil.registerStateStores(logger,
             "logPrefix:", topology, stateManager, stateDirectory, processorContext);
-
-        ctrl.verify();
     }
 
     @Test
     public void testRegisterStateStoreFailToLockStateDirectory() {
-        expect(topology.stateStores()).andReturn(singletonList(new MockKeyValueStore("store", false)));
-
-        expect(stateManager.taskId()).andReturn(taskId);
+        when(topology.stateStores()).thenReturn(singletonList(new MockKeyValueStore("store", false)));
 

Review Comment:
   ```suggestion
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -42,77 +40,56 @@
 
 import static java.util.Collections.emptyList;
 import static java.util.Collections.singletonList;
-import static org.easymock.EasyMock.createStrictControl;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.expectLastCall;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThrows;
-import static org.powermock.api.easymock.PowerMock.mockStatic;
-import static org.powermock.api.easymock.PowerMock.replayAll;
-
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(Utils.class)
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
 public class StateManagerUtilTest {
 
-    @Mock(type = MockType.NICE)
+    @Mock
     private ProcessorStateManager stateManager;
 
-    @Mock(type = MockType.NICE)
+    @Mock
     private StateDirectory stateDirectory;
 
-    @Mock(type = MockType.NICE)
+    @Mock
     private ProcessorTopology topology;
 
-    @Mock(type = MockType.NICE)
+    @Mock
     private InternalProcessorContext processorContext;
 
-    private IMocksControl ctrl;
-
     private Logger logger = new LogContext("test").logger(AbstractTask.class);
 
     private final TaskId taskId = new TaskId(0, 0);
 
-    @Before
-    public void setup() {
-        ctrl = createStrictControl();
-        topology = ctrl.createMock(ProcessorTopology.class);
-        processorContext = ctrl.createMock(InternalProcessorContext.class);
-
-        stateManager = ctrl.createMock(ProcessorStateManager.class);
-        stateDirectory = ctrl.createMock(StateDirectory.class);
-    }
-
     @Test
     public void testRegisterStateStoreWhenTopologyEmpty() {
-        expect(topology.stateStores()).andReturn(emptyList());
-
-        ctrl.checkOrder(true);
-        ctrl.replay();
+        when(topology.stateStores()).thenReturn(emptyList());
 
         StateManagerUtil.registerStateStores(logger,
             "logPrefix:", topology, stateManager, stateDirectory, processorContext);
-
-        ctrl.verify();
     }
 
     @Test
     public void testRegisterStateStoreFailToLockStateDirectory() {
-        expect(topology.stateStores()).andReturn(singletonList(new MockKeyValueStore("store", false)));
-
-        expect(stateManager.taskId()).andReturn(taskId);
+        when(topology.stateStores()).thenReturn(singletonList(new MockKeyValueStore("store", false)));
 
-        expect(stateDirectory.lock(taskId)).andReturn(false);
+        when(stateManager.taskId()).thenReturn(taskId);
 

Review Comment:
   ```suggestion
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -121,64 +98,48 @@ public void testRegisterStateStores() {
         final MockKeyValueStore store2 = new MockKeyValueStore("store2", false);
         final List<StateStore> stateStores = Arrays.asList(store1, store2);
 
-        expect(topology.stateStores()).andReturn(stateStores);
-
-        expect(stateManager.taskId()).andReturn(taskId);
-
-        expect(stateDirectory.lock(taskId)).andReturn(true);
-        expect(stateDirectory.directoryForTaskIsEmpty(taskId)).andReturn(true);
+        final InOrder inOrder = inOrder(stateManager);
 

Review Comment:
   ```suggestion
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -121,64 +98,48 @@ public void testRegisterStateStores() {
         final MockKeyValueStore store2 = new MockKeyValueStore("store2", false);
         final List<StateStore> stateStores = Arrays.asList(store1, store2);
 

Review Comment:
   ```suggestion
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -121,64 +98,48 @@ public void testRegisterStateStores() {
         final MockKeyValueStore store2 = new MockKeyValueStore("store2", false);
         final List<StateStore> stateStores = Arrays.asList(store1, store2);
 
-        expect(topology.stateStores()).andReturn(stateStores);
-
-        expect(stateManager.taskId()).andReturn(taskId);
-
-        expect(stateDirectory.lock(taskId)).andReturn(true);
-        expect(stateDirectory.directoryForTaskIsEmpty(taskId)).andReturn(true);
+        final InOrder inOrder = inOrder(stateManager);
 
-        expect(topology.stateStores()).andReturn(stateStores);
+        when(topology.stateStores()).thenReturn(stateStores);
 
-        stateManager.registerStateStores(stateStores, processorContext);
+        when(stateManager.taskId()).thenReturn(taskId);
 

Review Comment:
   ```suggestion
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -121,64 +98,48 @@ public void testRegisterStateStores() {
         final MockKeyValueStore store2 = new MockKeyValueStore("store2", false);
         final List<StateStore> stateStores = Arrays.asList(store1, store2);
 
-        expect(topology.stateStores()).andReturn(stateStores);
-
-        expect(stateManager.taskId()).andReturn(taskId);
-
-        expect(stateDirectory.lock(taskId)).andReturn(true);
-        expect(stateDirectory.directoryForTaskIsEmpty(taskId)).andReturn(true);
+        final InOrder inOrder = inOrder(stateManager);
 
-        expect(topology.stateStores()).andReturn(stateStores);
+        when(topology.stateStores()).thenReturn(stateStores);
 
-        stateManager.registerStateStores(stateStores, processorContext);
+        when(stateManager.taskId()).thenReturn(taskId);
 
-        stateManager.initializeStoreOffsetsFromCheckpoint(true);
-        expectLastCall();
+        when(stateDirectory.lock(taskId)).thenReturn(true);
+        when(stateDirectory.directoryForTaskIsEmpty(taskId)).thenReturn(true);
 
-        ctrl.checkOrder(true);
-        ctrl.replay();
+        when(topology.stateStores()).thenReturn(stateStores);
 
         StateManagerUtil.registerStateStores(logger, "logPrefix:",
             topology, stateManager, stateDirectory, processorContext);
 
-        ctrl.verify();
+        inOrder.verify(stateManager).registerStateStores(stateStores, processorContext);
+        inOrder.verify(stateManager).initializeStoreOffsetsFromCheckpoint(true);
+        verifyNoMoreInteractions(stateManager);
     }
 
     @Test
     public void testCloseStateManagerClean() {
-        expect(stateManager.taskId()).andReturn(taskId);
+        final InOrder inOrder = inOrder(stateManager, stateDirectory);
 
-        expect(stateDirectory.lock(taskId)).andReturn(true);
+        when(stateManager.taskId()).thenReturn(taskId);
 
-        stateManager.close();
-        expectLastCall();
-
-        stateDirectory.unlock(taskId);
-        expectLastCall();
-
-        ctrl.checkOrder(true);
-        ctrl.replay();
+        when(stateDirectory.lock(taskId)).thenReturn(true);
 
         StateManagerUtil.closeStateManager(logger,
             "logPrefix:", true, false, stateManager, stateDirectory, TaskType.ACTIVE);
 
-        ctrl.verify();
+        inOrder.verify(stateManager).close();
+        inOrder.verify(stateDirectory).unlock(taskId);
+        verifyNoMoreInteractions(stateManager, stateDirectory);
     }
 
     @Test
     public void testCloseStateManagerThrowsExceptionWhenClean() {
-        expect(stateManager.taskId()).andReturn(taskId);
-
-        expect(stateDirectory.lock(taskId)).andReturn(true);
+        when(stateManager.taskId()).thenReturn(taskId);
 

Review Comment:
   ```suggestion
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -121,64 +98,48 @@ public void testRegisterStateStores() {
         final MockKeyValueStore store2 = new MockKeyValueStore("store2", false);
         final List<StateStore> stateStores = Arrays.asList(store1, store2);
 
-        expect(topology.stateStores()).andReturn(stateStores);
-
-        expect(stateManager.taskId()).andReturn(taskId);
-
-        expect(stateDirectory.lock(taskId)).andReturn(true);
-        expect(stateDirectory.directoryForTaskIsEmpty(taskId)).andReturn(true);
+        final InOrder inOrder = inOrder(stateManager);
 
-        expect(topology.stateStores()).andReturn(stateStores);
+        when(topology.stateStores()).thenReturn(stateStores);
 
-        stateManager.registerStateStores(stateStores, processorContext);
+        when(stateManager.taskId()).thenReturn(taskId);
 
-        stateManager.initializeStoreOffsetsFromCheckpoint(true);
-        expectLastCall();
+        when(stateDirectory.lock(taskId)).thenReturn(true);
+        when(stateDirectory.directoryForTaskIsEmpty(taskId)).thenReturn(true);
 
-        ctrl.checkOrder(true);
-        ctrl.replay();
+        when(topology.stateStores()).thenReturn(stateStores);
 
         StateManagerUtil.registerStateStores(logger, "logPrefix:",
             topology, stateManager, stateDirectory, processorContext);
 
-        ctrl.verify();
+        inOrder.verify(stateManager).registerStateStores(stateStores, processorContext);
+        inOrder.verify(stateManager).initializeStoreOffsetsFromCheckpoint(true);
+        verifyNoMoreInteractions(stateManager);
     }
 
     @Test
     public void testCloseStateManagerClean() {
-        expect(stateManager.taskId()).andReturn(taskId);
+        final InOrder inOrder = inOrder(stateManager, stateDirectory);
 
-        expect(stateDirectory.lock(taskId)).andReturn(true);
+        when(stateManager.taskId()).thenReturn(taskId);
 
-        stateManager.close();
-        expectLastCall();
-
-        stateDirectory.unlock(taskId);
-        expectLastCall();
-
-        ctrl.checkOrder(true);
-        ctrl.replay();
+        when(stateDirectory.lock(taskId)).thenReturn(true);
 
         StateManagerUtil.closeStateManager(logger,
             "logPrefix:", true, false, stateManager, stateDirectory, TaskType.ACTIVE);
 
-        ctrl.verify();
+        inOrder.verify(stateManager).close();
+        inOrder.verify(stateDirectory).unlock(taskId);
+        verifyNoMoreInteractions(stateManager, stateDirectory);
     }
 
     @Test
     public void testCloseStateManagerThrowsExceptionWhenClean() {
-        expect(stateManager.taskId()).andReturn(taskId);
-
-        expect(stateDirectory.lock(taskId)).andReturn(true);
+        when(stateManager.taskId()).thenReturn(taskId);
 
-        stateManager.close();
-        expectLastCall().andThrow(new ProcessorStateException("state manager failed to close"));
+        when(stateDirectory.lock(taskId)).thenReturn(true);
 

Review Comment:
   ```suggestion
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -187,151 +148,101 @@ public void testCloseStateManagerThrowsExceptionWhenClean() {
         // Thrown stateMgr exception will not be wrapped.
         assertEquals("state manager failed to close", thrown.getMessage());
 
-        ctrl.verify();
+        // The unlock logic should still be executed.
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
     public void testCloseStateManagerThrowsExceptionWhenDirty() {
-        expect(stateManager.taskId()).andReturn(taskId);
+        when(stateManager.taskId()).thenReturn(taskId);
 

Review Comment:
   ```suggestion
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -187,151 +148,101 @@ public void testCloseStateManagerThrowsExceptionWhenClean() {
         // Thrown stateMgr exception will not be wrapped.
         assertEquals("state manager failed to close", thrown.getMessage());
 
-        ctrl.verify();
+        // The unlock logic should still be executed.
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
     public void testCloseStateManagerThrowsExceptionWhenDirty() {
-        expect(stateManager.taskId()).andReturn(taskId);
+        when(stateManager.taskId()).thenReturn(taskId);
 
-        expect(stateDirectory.lock(taskId)).andReturn(true);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
 
-        stateManager.close();
-        expectLastCall().andThrow(new ProcessorStateException("state manager failed to close"));
-
-        stateDirectory.unlock(taskId);
-
-        ctrl.checkOrder(true);
-        ctrl.replay();
+        doThrow(new ProcessorStateException("state manager failed to close")).when(stateManager).close();
 
         assertThrows(
             ProcessorStateException.class,
             () -> StateManagerUtil.closeStateManager(
                 logger, "logPrefix:", false, false, stateManager, stateDirectory, TaskType.ACTIVE));
 
-        ctrl.verify();
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
     public void testCloseStateManagerWithStateStoreWipeOut() {
-        expect(stateManager.taskId()).andReturn(taskId);
-        expect(stateDirectory.lock(taskId)).andReturn(true);
+        final InOrder inOrder = inOrder(stateManager, stateDirectory);
 

Review Comment:
   ```suggestion
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -187,151 +148,101 @@ public void testCloseStateManagerThrowsExceptionWhenClean() {
         // Thrown stateMgr exception will not be wrapped.
         assertEquals("state manager failed to close", thrown.getMessage());
 
-        ctrl.verify();
+        // The unlock logic should still be executed.
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
     public void testCloseStateManagerThrowsExceptionWhenDirty() {
-        expect(stateManager.taskId()).andReturn(taskId);
+        when(stateManager.taskId()).thenReturn(taskId);
 
-        expect(stateDirectory.lock(taskId)).andReturn(true);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
 
-        stateManager.close();
-        expectLastCall().andThrow(new ProcessorStateException("state manager failed to close"));
-
-        stateDirectory.unlock(taskId);
-
-        ctrl.checkOrder(true);
-        ctrl.replay();
+        doThrow(new ProcessorStateException("state manager failed to close")).when(stateManager).close();
 
         assertThrows(
             ProcessorStateException.class,
             () -> StateManagerUtil.closeStateManager(
                 logger, "logPrefix:", false, false, stateManager, stateDirectory, TaskType.ACTIVE));
 
-        ctrl.verify();
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
     public void testCloseStateManagerWithStateStoreWipeOut() {
-        expect(stateManager.taskId()).andReturn(taskId);
-        expect(stateDirectory.lock(taskId)).andReturn(true);
+        final InOrder inOrder = inOrder(stateManager, stateDirectory);
 
-        stateManager.close();
-        expectLastCall();
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
 
         // The `baseDir` will be accessed when attempting to delete the state store.
-        expect(stateManager.baseDir()).andReturn(TestUtils.tempDirectory("state_store"));
-
-        stateDirectory.unlock(taskId);
-        expectLastCall();
-
-        ctrl.checkOrder(true);
-        ctrl.replay();
+        when(stateManager.baseDir()).thenReturn(TestUtils.tempDirectory("state_store"));
 
         StateManagerUtil.closeStateManager(logger,
             "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE);
 
-        ctrl.verify();
+        inOrder.verify(stateManager).close();
+        inOrder.verify(stateDirectory).unlock(taskId);
+        verifyNoMoreInteractions(stateManager, stateDirectory);
     }
 
     @Test
-    public void  shouldStillWipeStateStoresIfCloseThrowsException() throws IOException {
+    public void  shouldStillWipeStateStoresIfCloseThrowsException() {
         final File randomFile = new File("/random/path");
-        mockStatic(Utils.class);
-
-        expect(stateManager.taskId()).andReturn(taskId);
-        expect(stateDirectory.lock(taskId)).andReturn(true);
 
-        stateManager.close();
-        expectLastCall().andThrow(new ProcessorStateException("Close failed"));
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
 
-        expect(stateManager.baseDir()).andReturn(randomFile);
+        doThrow(new ProcessorStateException("Close failed")).when(stateManager).close();
 
-        Utils.delete(randomFile);
+        when(stateManager.baseDir()).thenReturn(randomFile);
 
-        stateDirectory.unlock(taskId);
-        expectLastCall();
+        try (MockedStatic<Utils> utils = mockStatic(Utils.class)) {
+            assertThrows(ProcessorStateException.class, () ->
+                    StateManagerUtil.closeStateManager(logger, "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
+        }
 
-        ctrl.checkOrder(true);
-        ctrl.replay();
-
-        replayAll();
-
-        assertThrows(ProcessorStateException.class, () ->
-            StateManagerUtil.closeStateManager(logger, "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
-
-        ctrl.verify();
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
-    public void testCloseStateManagerWithStateStoreWipeOutRethrowWrappedIOException() throws IOException {
+    public void testCloseStateManagerWithStateStoreWipeOutRethrowWrappedIOException() {
         final File unknownFile = new File("/unknown/path");
-        mockStatic(Utils.class);
 

Review Comment:
   ```suggestion
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -187,151 +148,101 @@ public void testCloseStateManagerThrowsExceptionWhenClean() {
         // Thrown stateMgr exception will not be wrapped.
         assertEquals("state manager failed to close", thrown.getMessage());
 
-        ctrl.verify();
+        // The unlock logic should still be executed.
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
     public void testCloseStateManagerThrowsExceptionWhenDirty() {
-        expect(stateManager.taskId()).andReturn(taskId);
+        when(stateManager.taskId()).thenReturn(taskId);
 
-        expect(stateDirectory.lock(taskId)).andReturn(true);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
 
-        stateManager.close();
-        expectLastCall().andThrow(new ProcessorStateException("state manager failed to close"));
-
-        stateDirectory.unlock(taskId);
-
-        ctrl.checkOrder(true);
-        ctrl.replay();
+        doThrow(new ProcessorStateException("state manager failed to close")).when(stateManager).close();
 
         assertThrows(
             ProcessorStateException.class,
             () -> StateManagerUtil.closeStateManager(
                 logger, "logPrefix:", false, false, stateManager, stateDirectory, TaskType.ACTIVE));
 
-        ctrl.verify();
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
     public void testCloseStateManagerWithStateStoreWipeOut() {
-        expect(stateManager.taskId()).andReturn(taskId);
-        expect(stateDirectory.lock(taskId)).andReturn(true);
+        final InOrder inOrder = inOrder(stateManager, stateDirectory);
 
-        stateManager.close();
-        expectLastCall();
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
 
         // The `baseDir` will be accessed when attempting to delete the state store.
-        expect(stateManager.baseDir()).andReturn(TestUtils.tempDirectory("state_store"));
-
-        stateDirectory.unlock(taskId);
-        expectLastCall();
-
-        ctrl.checkOrder(true);
-        ctrl.replay();
+        when(stateManager.baseDir()).thenReturn(TestUtils.tempDirectory("state_store"));
 
         StateManagerUtil.closeStateManager(logger,
             "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE);
 
-        ctrl.verify();
+        inOrder.verify(stateManager).close();
+        inOrder.verify(stateDirectory).unlock(taskId);
+        verifyNoMoreInteractions(stateManager, stateDirectory);
     }
 
     @Test
-    public void  shouldStillWipeStateStoresIfCloseThrowsException() throws IOException {
+    public void  shouldStillWipeStateStoresIfCloseThrowsException() {
         final File randomFile = new File("/random/path");
-        mockStatic(Utils.class);
-
-        expect(stateManager.taskId()).andReturn(taskId);
-        expect(stateDirectory.lock(taskId)).andReturn(true);
 
-        stateManager.close();
-        expectLastCall().andThrow(new ProcessorStateException("Close failed"));
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
 
-        expect(stateManager.baseDir()).andReturn(randomFile);
+        doThrow(new ProcessorStateException("Close failed")).when(stateManager).close();
 

Review Comment:
   ```suggestion
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -187,151 +148,101 @@ public void testCloseStateManagerThrowsExceptionWhenClean() {
         // Thrown stateMgr exception will not be wrapped.
         assertEquals("state manager failed to close", thrown.getMessage());
 
-        ctrl.verify();
+        // The unlock logic should still be executed.
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
     public void testCloseStateManagerThrowsExceptionWhenDirty() {
-        expect(stateManager.taskId()).andReturn(taskId);
+        when(stateManager.taskId()).thenReturn(taskId);
 
-        expect(stateDirectory.lock(taskId)).andReturn(true);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
 
-        stateManager.close();
-        expectLastCall().andThrow(new ProcessorStateException("state manager failed to close"));
-
-        stateDirectory.unlock(taskId);
-
-        ctrl.checkOrder(true);
-        ctrl.replay();
+        doThrow(new ProcessorStateException("state manager failed to close")).when(stateManager).close();
 
         assertThrows(
             ProcessorStateException.class,
             () -> StateManagerUtil.closeStateManager(
                 logger, "logPrefix:", false, false, stateManager, stateDirectory, TaskType.ACTIVE));
 
-        ctrl.verify();
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
     public void testCloseStateManagerWithStateStoreWipeOut() {
-        expect(stateManager.taskId()).andReturn(taskId);
-        expect(stateDirectory.lock(taskId)).andReturn(true);
+        final InOrder inOrder = inOrder(stateManager, stateDirectory);
 
-        stateManager.close();
-        expectLastCall();
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
 
         // The `baseDir` will be accessed when attempting to delete the state store.
-        expect(stateManager.baseDir()).andReturn(TestUtils.tempDirectory("state_store"));
-
-        stateDirectory.unlock(taskId);
-        expectLastCall();
-
-        ctrl.checkOrder(true);
-        ctrl.replay();
+        when(stateManager.baseDir()).thenReturn(TestUtils.tempDirectory("state_store"));
 
         StateManagerUtil.closeStateManager(logger,
             "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE);
 
-        ctrl.verify();
+        inOrder.verify(stateManager).close();
+        inOrder.verify(stateDirectory).unlock(taskId);
+        verifyNoMoreInteractions(stateManager, stateDirectory);
     }
 
     @Test
-    public void  shouldStillWipeStateStoresIfCloseThrowsException() throws IOException {
+    public void  shouldStillWipeStateStoresIfCloseThrowsException() {
         final File randomFile = new File("/random/path");
-        mockStatic(Utils.class);
-
-        expect(stateManager.taskId()).andReturn(taskId);
-        expect(stateDirectory.lock(taskId)).andReturn(true);
 
-        stateManager.close();
-        expectLastCall().andThrow(new ProcessorStateException("Close failed"));
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
 
-        expect(stateManager.baseDir()).andReturn(randomFile);
+        doThrow(new ProcessorStateException("Close failed")).when(stateManager).close();
 
-        Utils.delete(randomFile);
+        when(stateManager.baseDir()).thenReturn(randomFile);
 
-        stateDirectory.unlock(taskId);
-        expectLastCall();
+        try (MockedStatic<Utils> utils = mockStatic(Utils.class)) {
+            assertThrows(ProcessorStateException.class, () ->
+                    StateManagerUtil.closeStateManager(logger, "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
+        }
 
-        ctrl.checkOrder(true);
-        ctrl.replay();
-
-        replayAll();
-
-        assertThrows(ProcessorStateException.class, () ->
-            StateManagerUtil.closeStateManager(logger, "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
-
-        ctrl.verify();
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
-    public void testCloseStateManagerWithStateStoreWipeOutRethrowWrappedIOException() throws IOException {
+    public void testCloseStateManagerWithStateStoreWipeOutRethrowWrappedIOException() {
         final File unknownFile = new File("/unknown/path");
-        mockStatic(Utils.class);
 
-        expect(stateManager.taskId()).andReturn(taskId);
-        expect(stateDirectory.lock(taskId)).andReturn(true);
+        final InOrder inOrder = inOrder(stateManager, stateDirectory);
 
-        stateManager.close();
-        expectLastCall();
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
+        when(stateManager.baseDir()).thenReturn(unknownFile);
 
-        expect(stateManager.baseDir()).andReturn(unknownFile);
+        try (MockedStatic<Utils> utils = mockStatic(Utils.class)) {
+            utils.when(() -> Utils.delete(unknownFile)).thenThrow(new IOException("Deletion failed"));
 
-        Utils.delete(unknownFile);
-        expectLastCall().andThrow(new IOException("Deletion failed"));
+            final ProcessorStateException thrown = assertThrows(
+                    ProcessorStateException.class, () -> StateManagerUtil.closeStateManager(logger,
+                            "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
 
-        stateDirectory.unlock(taskId);
-        expectLastCall();
+            assertEquals(IOException.class, thrown.getCause().getClass());
+        }
 
-        ctrl.checkOrder(true);
-        ctrl.replay();
-
-        replayAll();
-
-        final ProcessorStateException thrown = assertThrows(
-            ProcessorStateException.class, () -> StateManagerUtil.closeStateManager(logger,
-                "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
-
-        assertEquals(IOException.class, thrown.getCause().getClass());
-
-        ctrl.verify();
+        inOrder.verify(stateManager).close();
+        inOrder.verify(stateDirectory).unlock(taskId);
+        verifyNoMoreInteractions(stateManager, stateDirectory);
     }
 
     @Test
-    public void shouldNotCloseStateManagerIfUnableToLockTaskDirectory() {
-        expect(stateManager.taskId()).andReturn(taskId);
-
-        expect(stateDirectory.lock(taskId)).andReturn(false);
-
-        stateManager.close();
-        expectLastCall().andThrow(new AssertionError("Should not be trying to close state you don't own!"));
+    public void shouldNotWipeStateStoresIfUnableToLockTaskDirectory() {
+        final InOrder inOrder = inOrder(stateManager, stateDirectory);
 

Review Comment:
   ```suggestion
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -187,151 +139,92 @@ public void testCloseStateManagerThrowsExceptionWhenClean() {
         // Thrown stateMgr exception will not be wrapped.
         assertEquals("state manager failed to close", thrown.getMessage());
 
-        ctrl.verify();
+        // The unlock logic should still be executed.
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
     public void testCloseStateManagerThrowsExceptionWhenDirty() {
-        expect(stateManager.taskId()).andReturn(taskId);
+        when(stateManager.taskId()).thenReturn(taskId);
 
-        expect(stateDirectory.lock(taskId)).andReturn(true);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
 
-        stateManager.close();
-        expectLastCall().andThrow(new ProcessorStateException("state manager failed to close"));
-
-        stateDirectory.unlock(taskId);
-
-        ctrl.checkOrder(true);
-        ctrl.replay();
+        doThrow(new ProcessorStateException("state manager failed to close")).when(stateManager).close();
 
         assertThrows(
             ProcessorStateException.class,
             () -> StateManagerUtil.closeStateManager(
                 logger, "logPrefix:", false, false, stateManager, stateDirectory, TaskType.ACTIVE));
 
-        ctrl.verify();
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
     public void testCloseStateManagerWithStateStoreWipeOut() {
-        expect(stateManager.taskId()).andReturn(taskId);
-        expect(stateDirectory.lock(taskId)).andReturn(true);
-
-        stateManager.close();
-        expectLastCall();
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
 
         // The `baseDir` will be accessed when attempting to delete the state store.
-        expect(stateManager.baseDir()).andReturn(TestUtils.tempDirectory("state_store"));
-
-        stateDirectory.unlock(taskId);
-        expectLastCall();
-
-        ctrl.checkOrder(true);
-        ctrl.replay();
+        when(stateManager.baseDir()).thenReturn(TestUtils.tempDirectory("state_store"));
 
         StateManagerUtil.closeStateManager(logger,
             "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE);
 
-        ctrl.verify();
+        verify(stateManager).close();
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
     public void  shouldStillWipeStateStoresIfCloseThrowsException() throws IOException {
         final File randomFile = new File("/random/path");
-        mockStatic(Utils.class);
-
-        expect(stateManager.taskId()).andReturn(taskId);
-        expect(stateDirectory.lock(taskId)).andReturn(true);
 
-        stateManager.close();
-        expectLastCall().andThrow(new ProcessorStateException("Close failed"));
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
 
-        expect(stateManager.baseDir()).andReturn(randomFile);
+        doThrow(new ProcessorStateException("Close failed")).when(stateManager).close();
 
-        Utils.delete(randomFile);
+        when(stateManager.baseDir()).thenReturn(randomFile);
 
-        stateDirectory.unlock(taskId);
-        expectLastCall();
+        try (MockedStatic<Utils> utils = mockStatic(Utils.class)) {
+            assertThrows(ProcessorStateException.class, () ->
+                    StateManagerUtil.closeStateManager(logger, "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
+        }
 
-        ctrl.checkOrder(true);
-        ctrl.replay();
-
-        replayAll();
-
-        assertThrows(ProcessorStateException.class, () ->
-            StateManagerUtil.closeStateManager(logger, "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
-
-        ctrl.verify();
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
     public void testCloseStateManagerWithStateStoreWipeOutRethrowWrappedIOException() throws IOException {
         final File unknownFile = new File("/unknown/path");
-        mockStatic(Utils.class);
-
-        expect(stateManager.taskId()).andReturn(taskId);
-        expect(stateDirectory.lock(taskId)).andReturn(true);
-
-        stateManager.close();
-        expectLastCall();
-
-        expect(stateManager.baseDir()).andReturn(unknownFile);
 
-        Utils.delete(unknownFile);
-        expectLastCall().andThrow(new IOException("Deletion failed"));
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
+        when(stateManager.baseDir()).thenReturn(unknownFile);
 
-        stateDirectory.unlock(taskId);
-        expectLastCall();
+        try (MockedStatic<Utils> utils = mockStatic(Utils.class)) {
+            utils.when(() -> Utils.delete(unknownFile)).thenThrow(new IOException("Deletion failed"));
 
-        ctrl.checkOrder(true);
-        ctrl.replay();
+            final ProcessorStateException thrown = assertThrows(
+                    ProcessorStateException.class, () -> StateManagerUtil.closeStateManager(logger,
+                            "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
 
-        replayAll();
+            assertEquals(IOException.class, thrown.getCause().getClass());
+        }
 
-        final ProcessorStateException thrown = assertThrows(
-            ProcessorStateException.class, () -> StateManagerUtil.closeStateManager(logger,
-                "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
-
-        assertEquals(IOException.class, thrown.getCause().getClass());
-
-        ctrl.verify();
+        verify(stateManager).close();
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
-    public void shouldNotCloseStateManagerIfUnableToLockTaskDirectory() {
-        expect(stateManager.taskId()).andReturn(taskId);
-
-        expect(stateDirectory.lock(taskId)).andReturn(false);
-
-        stateManager.close();
-        expectLastCall().andThrow(new AssertionError("Should not be trying to close state you don't own!"));
-
-        ctrl.checkOrder(true);
-        ctrl.replay();
-
-        replayAll();
+    public void shouldNotWipeStateStoresIfUnableToLockTaskDirectory() {
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateDirectory.lock(taskId)).thenReturn(false);
 
         StateManagerUtil.closeStateManager(
-            logger, "logPrefix:", true, false, stateManager, stateDirectory, TaskType.ACTIVE);
-    }
-
-    @Test
-    public void shouldNotWipeStateStoresIfUnableToLockTaskDirectory() throws IOException {

Review Comment:
   I see what you mean, but unit tests should also document the behavior of production code and be robust against code changes. In this case, we have indeed to separate cases:
   
   1. When exactly-once is enabled and the state manager is closed dirty and the lock is not owned then the state manager should not be closed, the directory should not be wiped and the directory should not be unlocked.
   2. When exactly-once is disabled and the lock is not owned then the state manager should not be closed and the directory should not be unlocked.
   
   So, I think what you did is fine, although would not verify that `stateManager.baseDir()` is never called but that `Utils.delete()` is never called. However, you need to run that test once with:
   
   ```java
   StateManagerUtil.closeStateManager(
               logger, "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE);
   ``` 
   
   and once with
   
   ```java
   StateManagerUtil.closeStateManager(
               logger, "logPrefix:", true, false, stateManager, stateDirectory, TaskType.ACTIVE);
   ``` 
   



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -121,64 +98,48 @@ public void testRegisterStateStores() {
         final MockKeyValueStore store2 = new MockKeyValueStore("store2", false);
         final List<StateStore> stateStores = Arrays.asList(store1, store2);
 
-        expect(topology.stateStores()).andReturn(stateStores);
-
-        expect(stateManager.taskId()).andReturn(taskId);
-
-        expect(stateDirectory.lock(taskId)).andReturn(true);
-        expect(stateDirectory.directoryForTaskIsEmpty(taskId)).andReturn(true);
+        final InOrder inOrder = inOrder(stateManager);
 
-        expect(topology.stateStores()).andReturn(stateStores);
+        when(topology.stateStores()).thenReturn(stateStores);
 

Review Comment:
   ```suggestion
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -121,64 +98,48 @@ public void testRegisterStateStores() {
         final MockKeyValueStore store2 = new MockKeyValueStore("store2", false);
         final List<StateStore> stateStores = Arrays.asList(store1, store2);
 
-        expect(topology.stateStores()).andReturn(stateStores);
-
-        expect(stateManager.taskId()).andReturn(taskId);
-
-        expect(stateDirectory.lock(taskId)).andReturn(true);
-        expect(stateDirectory.directoryForTaskIsEmpty(taskId)).andReturn(true);
+        final InOrder inOrder = inOrder(stateManager);
 
-        expect(topology.stateStores()).andReturn(stateStores);
+        when(topology.stateStores()).thenReturn(stateStores);
 
-        stateManager.registerStateStores(stateStores, processorContext);
+        when(stateManager.taskId()).thenReturn(taskId);
 
-        stateManager.initializeStoreOffsetsFromCheckpoint(true);
-        expectLastCall();
+        when(stateDirectory.lock(taskId)).thenReturn(true);
+        when(stateDirectory.directoryForTaskIsEmpty(taskId)).thenReturn(true);
 

Review Comment:
   ```suggestion
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -121,64 +98,48 @@ public void testRegisterStateStores() {
         final MockKeyValueStore store2 = new MockKeyValueStore("store2", false);
         final List<StateStore> stateStores = Arrays.asList(store1, store2);
 
-        expect(topology.stateStores()).andReturn(stateStores);
-
-        expect(stateManager.taskId()).andReturn(taskId);
-
-        expect(stateDirectory.lock(taskId)).andReturn(true);
-        expect(stateDirectory.directoryForTaskIsEmpty(taskId)).andReturn(true);
+        final InOrder inOrder = inOrder(stateManager);
 
-        expect(topology.stateStores()).andReturn(stateStores);
+        when(topology.stateStores()).thenReturn(stateStores);
 
-        stateManager.registerStateStores(stateStores, processorContext);
+        when(stateManager.taskId()).thenReturn(taskId);
 
-        stateManager.initializeStoreOffsetsFromCheckpoint(true);
-        expectLastCall();
+        when(stateDirectory.lock(taskId)).thenReturn(true);
+        when(stateDirectory.directoryForTaskIsEmpty(taskId)).thenReturn(true);
 
-        ctrl.checkOrder(true);
-        ctrl.replay();
+        when(topology.stateStores()).thenReturn(stateStores);
 
         StateManagerUtil.registerStateStores(logger, "logPrefix:",
             topology, stateManager, stateDirectory, processorContext);
 
-        ctrl.verify();
+        inOrder.verify(stateManager).registerStateStores(stateStores, processorContext);
+        inOrder.verify(stateManager).initializeStoreOffsetsFromCheckpoint(true);
+        verifyNoMoreInteractions(stateManager);
     }
 
     @Test
     public void testCloseStateManagerClean() {
-        expect(stateManager.taskId()).andReturn(taskId);
+        final InOrder inOrder = inOrder(stateManager, stateDirectory);
 

Review Comment:
   ```suggestion
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -187,151 +148,101 @@ public void testCloseStateManagerThrowsExceptionWhenClean() {
         // Thrown stateMgr exception will not be wrapped.
         assertEquals("state manager failed to close", thrown.getMessage());
 
-        ctrl.verify();
+        // The unlock logic should still be executed.
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
     public void testCloseStateManagerThrowsExceptionWhenDirty() {
-        expect(stateManager.taskId()).andReturn(taskId);
+        when(stateManager.taskId()).thenReturn(taskId);
 
-        expect(stateDirectory.lock(taskId)).andReturn(true);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
 
-        stateManager.close();
-        expectLastCall().andThrow(new ProcessorStateException("state manager failed to close"));
-
-        stateDirectory.unlock(taskId);
-
-        ctrl.checkOrder(true);
-        ctrl.replay();
+        doThrow(new ProcessorStateException("state manager failed to close")).when(stateManager).close();
 
         assertThrows(
             ProcessorStateException.class,
             () -> StateManagerUtil.closeStateManager(
                 logger, "logPrefix:", false, false, stateManager, stateDirectory, TaskType.ACTIVE));
 
-        ctrl.verify();
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
     public void testCloseStateManagerWithStateStoreWipeOut() {
-        expect(stateManager.taskId()).andReturn(taskId);
-        expect(stateDirectory.lock(taskId)).andReturn(true);
+        final InOrder inOrder = inOrder(stateManager, stateDirectory);
 
-        stateManager.close();
-        expectLastCall();
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
 
         // The `baseDir` will be accessed when attempting to delete the state store.
-        expect(stateManager.baseDir()).andReturn(TestUtils.tempDirectory("state_store"));
-
-        stateDirectory.unlock(taskId);
-        expectLastCall();
-
-        ctrl.checkOrder(true);
-        ctrl.replay();
+        when(stateManager.baseDir()).thenReturn(TestUtils.tempDirectory("state_store"));
 
         StateManagerUtil.closeStateManager(logger,
             "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE);
 
-        ctrl.verify();
+        inOrder.verify(stateManager).close();
+        inOrder.verify(stateDirectory).unlock(taskId);
+        verifyNoMoreInteractions(stateManager, stateDirectory);
     }
 
     @Test
-    public void  shouldStillWipeStateStoresIfCloseThrowsException() throws IOException {
+    public void  shouldStillWipeStateStoresIfCloseThrowsException() {
         final File randomFile = new File("/random/path");
-        mockStatic(Utils.class);
-
-        expect(stateManager.taskId()).andReturn(taskId);
-        expect(stateDirectory.lock(taskId)).andReturn(true);
 
-        stateManager.close();
-        expectLastCall().andThrow(new ProcessorStateException("Close failed"));
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
 

Review Comment:
   ```suggestion
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -187,151 +148,101 @@ public void testCloseStateManagerThrowsExceptionWhenClean() {
         // Thrown stateMgr exception will not be wrapped.
         assertEquals("state manager failed to close", thrown.getMessage());
 
-        ctrl.verify();
+        // The unlock logic should still be executed.
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
     public void testCloseStateManagerThrowsExceptionWhenDirty() {
-        expect(stateManager.taskId()).andReturn(taskId);
+        when(stateManager.taskId()).thenReturn(taskId);
 
-        expect(stateDirectory.lock(taskId)).andReturn(true);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
 
-        stateManager.close();
-        expectLastCall().andThrow(new ProcessorStateException("state manager failed to close"));
-
-        stateDirectory.unlock(taskId);
-
-        ctrl.checkOrder(true);
-        ctrl.replay();
+        doThrow(new ProcessorStateException("state manager failed to close")).when(stateManager).close();
 
         assertThrows(
             ProcessorStateException.class,
             () -> StateManagerUtil.closeStateManager(
                 logger, "logPrefix:", false, false, stateManager, stateDirectory, TaskType.ACTIVE));
 
-        ctrl.verify();
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
     public void testCloseStateManagerWithStateStoreWipeOut() {
-        expect(stateManager.taskId()).andReturn(taskId);
-        expect(stateDirectory.lock(taskId)).andReturn(true);
+        final InOrder inOrder = inOrder(stateManager, stateDirectory);
 
-        stateManager.close();
-        expectLastCall();
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
 

Review Comment:
   ```suggestion
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -121,64 +98,48 @@ public void testRegisterStateStores() {
         final MockKeyValueStore store2 = new MockKeyValueStore("store2", false);
         final List<StateStore> stateStores = Arrays.asList(store1, store2);
 
-        expect(topology.stateStores()).andReturn(stateStores);
-
-        expect(stateManager.taskId()).andReturn(taskId);
-
-        expect(stateDirectory.lock(taskId)).andReturn(true);
-        expect(stateDirectory.directoryForTaskIsEmpty(taskId)).andReturn(true);
+        final InOrder inOrder = inOrder(stateManager);
 
-        expect(topology.stateStores()).andReturn(stateStores);
+        when(topology.stateStores()).thenReturn(stateStores);
 
-        stateManager.registerStateStores(stateStores, processorContext);
+        when(stateManager.taskId()).thenReturn(taskId);
 
-        stateManager.initializeStoreOffsetsFromCheckpoint(true);
-        expectLastCall();
+        when(stateDirectory.lock(taskId)).thenReturn(true);
+        when(stateDirectory.directoryForTaskIsEmpty(taskId)).thenReturn(true);
 
-        ctrl.checkOrder(true);
-        ctrl.replay();
+        when(topology.stateStores()).thenReturn(stateStores);
 
         StateManagerUtil.registerStateStores(logger, "logPrefix:",
             topology, stateManager, stateDirectory, processorContext);
 
-        ctrl.verify();
+        inOrder.verify(stateManager).registerStateStores(stateStores, processorContext);
+        inOrder.verify(stateManager).initializeStoreOffsetsFromCheckpoint(true);
+        verifyNoMoreInteractions(stateManager);
     }
 
     @Test
     public void testCloseStateManagerClean() {
-        expect(stateManager.taskId()).andReturn(taskId);
+        final InOrder inOrder = inOrder(stateManager, stateDirectory);
 
-        expect(stateDirectory.lock(taskId)).andReturn(true);
+        when(stateManager.taskId()).thenReturn(taskId);
 

Review Comment:
   ```suggestion
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -187,151 +148,101 @@ public void testCloseStateManagerThrowsExceptionWhenClean() {
         // Thrown stateMgr exception will not be wrapped.
         assertEquals("state manager failed to close", thrown.getMessage());
 
-        ctrl.verify();
+        // The unlock logic should still be executed.
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
     public void testCloseStateManagerThrowsExceptionWhenDirty() {
-        expect(stateManager.taskId()).andReturn(taskId);
+        when(stateManager.taskId()).thenReturn(taskId);
 
-        expect(stateDirectory.lock(taskId)).andReturn(true);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
 
-        stateManager.close();
-        expectLastCall().andThrow(new ProcessorStateException("state manager failed to close"));
-
-        stateDirectory.unlock(taskId);
-
-        ctrl.checkOrder(true);
-        ctrl.replay();
+        doThrow(new ProcessorStateException("state manager failed to close")).when(stateManager).close();
 
         assertThrows(
             ProcessorStateException.class,
             () -> StateManagerUtil.closeStateManager(
                 logger, "logPrefix:", false, false, stateManager, stateDirectory, TaskType.ACTIVE));
 
-        ctrl.verify();
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
     public void testCloseStateManagerWithStateStoreWipeOut() {
-        expect(stateManager.taskId()).andReturn(taskId);
-        expect(stateDirectory.lock(taskId)).andReturn(true);
+        final InOrder inOrder = inOrder(stateManager, stateDirectory);
 
-        stateManager.close();
-        expectLastCall();
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
 
         // The `baseDir` will be accessed when attempting to delete the state store.
-        expect(stateManager.baseDir()).andReturn(TestUtils.tempDirectory("state_store"));
-
-        stateDirectory.unlock(taskId);
-        expectLastCall();
-
-        ctrl.checkOrder(true);
-        ctrl.replay();
+        when(stateManager.baseDir()).thenReturn(TestUtils.tempDirectory("state_store"));
 
         StateManagerUtil.closeStateManager(logger,
             "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE);
 
-        ctrl.verify();
+        inOrder.verify(stateManager).close();
+        inOrder.verify(stateDirectory).unlock(taskId);
+        verifyNoMoreInteractions(stateManager, stateDirectory);
     }
 
     @Test
-    public void  shouldStillWipeStateStoresIfCloseThrowsException() throws IOException {
+    public void  shouldStillWipeStateStoresIfCloseThrowsException() {
         final File randomFile = new File("/random/path");
-        mockStatic(Utils.class);
-
-        expect(stateManager.taskId()).andReturn(taskId);
-        expect(stateDirectory.lock(taskId)).andReturn(true);
 
-        stateManager.close();
-        expectLastCall().andThrow(new ProcessorStateException("Close failed"));
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
 
-        expect(stateManager.baseDir()).andReturn(randomFile);
+        doThrow(new ProcessorStateException("Close failed")).when(stateManager).close();
 
-        Utils.delete(randomFile);
+        when(stateManager.baseDir()).thenReturn(randomFile);
 
-        stateDirectory.unlock(taskId);
-        expectLastCall();
+        try (MockedStatic<Utils> utils = mockStatic(Utils.class)) {
+            assertThrows(ProcessorStateException.class, () ->
+                    StateManagerUtil.closeStateManager(logger, "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
+        }
 
-        ctrl.checkOrder(true);
-        ctrl.replay();
-
-        replayAll();
-
-        assertThrows(ProcessorStateException.class, () ->
-            StateManagerUtil.closeStateManager(logger, "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE));
-
-        ctrl.verify();
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
-    public void testCloseStateManagerWithStateStoreWipeOutRethrowWrappedIOException() throws IOException {
+    public void testCloseStateManagerWithStateStoreWipeOutRethrowWrappedIOException() {
         final File unknownFile = new File("/unknown/path");
-        mockStatic(Utils.class);
 
-        expect(stateManager.taskId()).andReturn(taskId);
-        expect(stateDirectory.lock(taskId)).andReturn(true);
+        final InOrder inOrder = inOrder(stateManager, stateDirectory);
 

Review Comment:
   ```suggestion
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -187,151 +148,101 @@ public void testCloseStateManagerThrowsExceptionWhenClean() {
         // Thrown stateMgr exception will not be wrapped.
         assertEquals("state manager failed to close", thrown.getMessage());
 
-        ctrl.verify();
+        // The unlock logic should still be executed.
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
     public void testCloseStateManagerThrowsExceptionWhenDirty() {
-        expect(stateManager.taskId()).andReturn(taskId);
+        when(stateManager.taskId()).thenReturn(taskId);
 
-        expect(stateDirectory.lock(taskId)).andReturn(true);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
 

Review Comment:
   ```suggestion
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org