Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/11/04 14:16:24 UTC

[GitHub] [kafka] clolov opened a new pull request, #12823: KAFKA-14132: Replace PowerMock and EasyMock with Mockito in streams tests

clolov opened a new pull request, #12823:
URL: https://github.com/apache/kafka/pull/12823

   Batch 2 of the tests detailed in https://issues.apache.org/jira/browse/KAFKA-14132 which use PowerMock/EasyMock and need to be moved to Mockito.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] C0urante merged pull request #12823: KAFKA-14132: Replace PowerMock and EasyMock with Mockito in connect tests

Posted by GitBox <gi...@apache.org>.
C0urante merged PR #12823:
URL: https://github.com/apache/kafka/pull/12823




[GitHub] [kafka] C0urante commented on a diff in pull request #12823: KAFKA-14132: Replace PowerMock and EasyMock with Mockito in streams tests

Posted by GitBox <gi...@apache.org>.
C0urante commented on code in PR #12823:
URL: https://github.com/apache/kafka/pull/12823#discussion_r1014154097


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitterTest.java:
##########
@@ -71,122 +74,74 @@ public void setup() {
                 Long.toString(DEFAULT_OFFSET_COMMIT_INTERVAL_MS));
         WorkerConfig config = new StandaloneConfig(workerProps);
         committer = new SourceTaskOffsetCommitter(config, executor, committers);
-        Whitebox.setInternalState(SourceTaskOffsetCommitter.class, "log", mockLog);
     }
 
     @SuppressWarnings("unchecked")
     @Test
     public void testSchedule() {
-        Capture<Runnable> taskWrapper = EasyMock.newCapture();
+        ArgumentCaptor<Runnable> taskWrapper = ArgumentCaptor.forClass(Runnable.class);
 
-        EasyMock.expect(executor.scheduleWithFixedDelay(
-                EasyMock.capture(taskWrapper), eq(DEFAULT_OFFSET_COMMIT_INTERVAL_MS),
+        when(executor.scheduleWithFixedDelay(
+                taskWrapper.capture(), eq(DEFAULT_OFFSET_COMMIT_INTERVAL_MS),
                 eq(DEFAULT_OFFSET_COMMIT_INTERVAL_MS), eq(TimeUnit.MILLISECONDS))
-        ).andReturn((ScheduledFuture) commitFuture);
-
-        PowerMock.replayAll();
+        ).thenReturn((ScheduledFuture) commitFuture);
 
         committer.schedule(taskId, task);
-        assertTrue(taskWrapper.hasCaptured());
         assertNotNull(taskWrapper.getValue());
         assertEquals(singletonMap(taskId, commitFuture), committers);
-
-        PowerMock.verifyAll();
     }
 
     @Test
     public void testClose() throws Exception {
         long timeoutMs = 1000;
 
         // Normal termination, where termination times out.
-        executor.shutdown();
-        PowerMock.expectLastCall();
-
-        EasyMock.expect(executor.awaitTermination(eq(timeoutMs), eq(TimeUnit.MILLISECONDS)))
-                .andReturn(false);
-        mockLog.error(EasyMock.anyString());

Review Comment:
   We want to verify here that at least one `ERROR`-level log message was emitted by the `SourceTaskOffsetCommitter` instance during this part of the test.
   
   We can use the `LogCaptureAppender` class to do that exact thing, without installing a mock logger for the class:
   ```java
           try (LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister(SourceTaskOffsetCommitter.class)) {
               committer.close(timeoutMs);
               assertTrue(logCaptureAppender.getEvents().stream().anyMatch(e -> e.getLevel().equals("ERROR")));
           }
   ```
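
For readers unfamiliar with the capture-and-assert pattern in the suggestion above, here is a minimal stand-alone sketch of the same idea using only `java.util.logging`. It is not Kafka's `LogCaptureAppender`: the names `LogCaptureSketch`, `CapturingHandler`, and `emitsSevere` are hypothetical, and `Level.SEVERE` stands in for `ERROR`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;

// Illustrative only: a JDK-logging analogue of "register an appender,
// run the code under test, then inspect the captured events".
final class LogCaptureSketch {

    /** Records everything published to one logger until it is closed. */
    public static final class CapturingHandler extends Handler implements AutoCloseable {
        private final Logger logger;
        private final List<LogRecord> records = new ArrayList<>();

        CapturingHandler(Logger logger) {
            this.logger = logger;
            logger.addHandler(this); // start capturing
        }

        @Override public void publish(LogRecord record) { records.add(record); }
        @Override public void flush() { }
        @Override public void close() { logger.removeHandler(this); } // stop capturing

        List<LogRecord> records() { return records; }
    }

    /** Runs the action and reports whether it logged at least one SEVERE record. */
    public static boolean emitsSevere(Logger logger, Runnable action) {
        try (CapturingHandler handler = new CapturingHandler(logger)) {
            action.run();
            return handler.records().stream()
                    .anyMatch(r -> Level.SEVERE.equals(r.getLevel()));
        }
    }
}
```

The try-with-resources block detaches the handler even if the action throws, so one test's capture cannot leak into the next. No logger field has to be swapped out by reflection.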



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitterTest.java:
##########
@@ -71,122 +74,74 @@ public void setup() {
                 Long.toString(DEFAULT_OFFSET_COMMIT_INTERVAL_MS));
         WorkerConfig config = new StandaloneConfig(workerProps);
         committer = new SourceTaskOffsetCommitter(config, executor, committers);
-        Whitebox.setInternalState(SourceTaskOffsetCommitter.class, "log", mockLog);
     }
 
     @SuppressWarnings("unchecked")
     @Test
     public void testSchedule() {
-        Capture<Runnable> taskWrapper = EasyMock.newCapture();
+        ArgumentCaptor<Runnable> taskWrapper = ArgumentCaptor.forClass(Runnable.class);
 
-        EasyMock.expect(executor.scheduleWithFixedDelay(
-                EasyMock.capture(taskWrapper), eq(DEFAULT_OFFSET_COMMIT_INTERVAL_MS),
+        when(executor.scheduleWithFixedDelay(
+                taskWrapper.capture(), eq(DEFAULT_OFFSET_COMMIT_INTERVAL_MS),
                 eq(DEFAULT_OFFSET_COMMIT_INTERVAL_MS), eq(TimeUnit.MILLISECONDS))
-        ).andReturn((ScheduledFuture) commitFuture);
-
-        PowerMock.replayAll();
+        ).thenReturn((ScheduledFuture) commitFuture);
 
         committer.schedule(taskId, task);
-        assertTrue(taskWrapper.hasCaptured());
         assertNotNull(taskWrapper.getValue());
         assertEquals(singletonMap(taskId, commitFuture), committers);
-
-        PowerMock.verifyAll();
     }
 
     @Test
     public void testClose() throws Exception {
         long timeoutMs = 1000;
 
         // Normal termination, where termination times out.
-        executor.shutdown();
-        PowerMock.expectLastCall();
-
-        EasyMock.expect(executor.awaitTermination(eq(timeoutMs), eq(TimeUnit.MILLISECONDS)))
-                .andReturn(false);
-        mockLog.error(EasyMock.anyString());
-        PowerMock.expectLastCall();
-        PowerMock.replayAll();
+        when(executor.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)).thenReturn(false);
 
         committer.close(timeoutMs);
 
-        PowerMock.verifyAll();
-        PowerMock.resetAll();
-
         // Termination interrupted
-        executor.shutdown();
-        PowerMock.expectLastCall();
-
-        EasyMock.expect(executor.awaitTermination(eq(timeoutMs), eq(TimeUnit.MILLISECONDS)))
-                .andThrow(new InterruptedException());
-        PowerMock.replayAll();
+        when(executor.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)).thenThrow(new InterruptedException());
 
         committer.close(timeoutMs);
 
-        PowerMock.verifyAll();
+        verify(executor, times(2)).shutdown();
     }
 
     @Test
     public void testRemove() throws Exception {

Review Comment:
   We're losing some coverage guarantees by restructuring the tests like this. Right now we know for each sub-case (removing a task successfully, removing a cancelled one, and being interrupted while removing one) that we call `cancel` and `get` on the future. With the changes here, we do get guarantees that those methods are invoked at least once at some point in the `testRemove` method (thanks to strict stubbing), but we don't get finer-grained guarantees than that. And ensuring that those methods are invoked exactly when we expect is fairly valuable, so it'd be nice if we could keep that coverage.
   
   I think we can break this test case out into four separate cases:
   1. Removing a non-existent task
   2. Removing a task successfully
   3. Removing a task that gets cancelled
   4. Removing a task and getting interrupted while doing so
   
   And we can use a helper method to reduce duplication for the last three:
   ```java
   private void testRemove() {
       // Try to remove an existing task
       when(taskFuture.cancel(false)).thenReturn(false);
       when(taskFuture.isDone()).thenReturn(false);
       when(taskId.connector()).thenReturn("MyConnector");
       when(taskId.task()).thenReturn(1);
   
       committers.put(taskId, taskFuture);
       committer.remove(taskId);
   }
   ```
   
   Which would make each test case fairly brief. For example, case 2 might look like this:
   
   ```java
   @Test
   public void testRemoveSuccess() throws Exception {
       when(taskFuture.get()).thenReturn(null);
       testRemove();
       assertEquals(Collections.emptyMap(), committers);
   }
   ```
   
   This way, after each test case completes, strict stubbing will ensure that the expected methods were invoked on the future at least once. We could go further and explicitly verify that they were invoked exactly once, but it's up to you whether you'd like to add that coverage.
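
To illustrate why per-case checks are finer-grained than one check over a combined test, here is a small sketch that uses a hand-rolled counting fake instead of Mockito. Everything in it is hypothetical: `remove` is a simplified stand-in for the removal logic (cancel, then wait on the future), not the actual `SourceTaskOffsetCommitter` code, and `CountingFuture` and `runCase` exist only for this example.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

final class PerCaseVerificationSketch {

    /** Fake future that counts calls; what get() does is chosen per case. */
    public static final class CountingFuture implements Future<Void> {
        public int cancelCalls;
        public int getCalls;
        private final RuntimeException onGet; // null means get() returns normally

        CountingFuture(RuntimeException onGet) { this.onGet = onGet; }

        @Override public boolean cancel(boolean mayInterruptIfRunning) { cancelCalls++; return false; }
        @Override public boolean isCancelled() { return false; }
        @Override public boolean isDone() { return false; }
        @Override public Void get() {
            getCalls++;
            if (onGet != null) throw onGet;
            return null;
        }
        @Override public Void get(long timeout, TimeUnit unit) { return get(); }
    }

    /** Hypothetical stand-in for the removal logic under test. */
    static void remove(Map<String, Future<Void>> tasks, String id) {
        Future<Void> future = tasks.remove(id);
        if (future == null) return; // nothing scheduled for this id
        future.cancel(false);
        try {
            future.get();
        } catch (CancellationException e) {
            // a cancelled task is an acceptable outcome
        } catch (ExecutionException | InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    /** One sub-case: fresh fake, one removal, counts returned for inspection. */
    public static CountingFuture runCase(RuntimeException onGet) {
        CountingFuture future = new CountingFuture(onGet);
        Map<String, Future<Void>> tasks = new HashMap<>();
        tasks.put("task-1", future);
        remove(tasks, "task-1");
        if (!tasks.isEmpty()) throw new AssertionError("task should have been removed");
        return future;
    }
}
```

Because each case builds its own fake, the call counts can be checked exactly for that case. A combined test that reuses one fake can only establish that the calls happened somewhere across all sub-cases.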



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitterTest.java:
##########
@@ -71,122 +74,74 @@ public void setup() {
                 Long.toString(DEFAULT_OFFSET_COMMIT_INTERVAL_MS));
         WorkerConfig config = new StandaloneConfig(workerProps);
         committer = new SourceTaskOffsetCommitter(config, executor, committers);
-        Whitebox.setInternalState(SourceTaskOffsetCommitter.class, "log", mockLog);
     }
 
     @SuppressWarnings("unchecked")
     @Test
     public void testSchedule() {
-        Capture<Runnable> taskWrapper = EasyMock.newCapture();
+        ArgumentCaptor<Runnable> taskWrapper = ArgumentCaptor.forClass(Runnable.class);
 
-        EasyMock.expect(executor.scheduleWithFixedDelay(
-                EasyMock.capture(taskWrapper), eq(DEFAULT_OFFSET_COMMIT_INTERVAL_MS),
+        when(executor.scheduleWithFixedDelay(
+                taskWrapper.capture(), eq(DEFAULT_OFFSET_COMMIT_INTERVAL_MS),
                 eq(DEFAULT_OFFSET_COMMIT_INTERVAL_MS), eq(TimeUnit.MILLISECONDS))
-        ).andReturn((ScheduledFuture) commitFuture);
-
-        PowerMock.replayAll();
+        ).thenReturn((ScheduledFuture) commitFuture);
 
         committer.schedule(taskId, task);
-        assertTrue(taskWrapper.hasCaptured());
         assertNotNull(taskWrapper.getValue());
         assertEquals(singletonMap(taskId, commitFuture), committers);
-
-        PowerMock.verifyAll();
     }
 
     @Test
     public void testClose() throws Exception {
         long timeoutMs = 1000;
 
         // Normal termination, where termination times out.
-        executor.shutdown();
-        PowerMock.expectLastCall();
-
-        EasyMock.expect(executor.awaitTermination(eq(timeoutMs), eq(TimeUnit.MILLISECONDS)))
-                .andReturn(false);
-        mockLog.error(EasyMock.anyString());
-        PowerMock.expectLastCall();
-        PowerMock.replayAll();
+        when(executor.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)).thenReturn(false);
 
         committer.close(timeoutMs);

Review Comment:
   Can we add a `verify(executor, times(1)).shutdown();` after this line, to ensure that the executor has actually been shut down by this point?



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitterTest.java:
##########
@@ -71,122 +74,74 @@ public void setup() {
                 Long.toString(DEFAULT_OFFSET_COMMIT_INTERVAL_MS));
         WorkerConfig config = new StandaloneConfig(workerProps);
         committer = new SourceTaskOffsetCommitter(config, executor, committers);
-        Whitebox.setInternalState(SourceTaskOffsetCommitter.class, "log", mockLog);
     }
 
     @SuppressWarnings("unchecked")
     @Test
     public void testSchedule() {
-        Capture<Runnable> taskWrapper = EasyMock.newCapture();
+        ArgumentCaptor<Runnable> taskWrapper = ArgumentCaptor.forClass(Runnable.class);
 
-        EasyMock.expect(executor.scheduleWithFixedDelay(
-                EasyMock.capture(taskWrapper), eq(DEFAULT_OFFSET_COMMIT_INTERVAL_MS),
+        when(executor.scheduleWithFixedDelay(
+                taskWrapper.capture(), eq(DEFAULT_OFFSET_COMMIT_INTERVAL_MS),
                 eq(DEFAULT_OFFSET_COMMIT_INTERVAL_MS), eq(TimeUnit.MILLISECONDS))
-        ).andReturn((ScheduledFuture) commitFuture);
-
-        PowerMock.replayAll();
+        ).thenReturn((ScheduledFuture) commitFuture);
 
         committer.schedule(taskId, task);
-        assertTrue(taskWrapper.hasCaptured());
         assertNotNull(taskWrapper.getValue());
         assertEquals(singletonMap(taskId, commitFuture), committers);
-
-        PowerMock.verifyAll();
     }
 
     @Test
     public void testClose() throws Exception {
         long timeoutMs = 1000;
 
         // Normal termination, where termination times out.
-        executor.shutdown();
-        PowerMock.expectLastCall();
-
-        EasyMock.expect(executor.awaitTermination(eq(timeoutMs), eq(TimeUnit.MILLISECONDS)))
-                .andReturn(false);
-        mockLog.error(EasyMock.anyString());
-        PowerMock.expectLastCall();
-        PowerMock.replayAll();
+        when(executor.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)).thenReturn(false);
 
         committer.close(timeoutMs);
 
-        PowerMock.verifyAll();
-        PowerMock.resetAll();
-
         // Termination interrupted
-        executor.shutdown();
-        PowerMock.expectLastCall();
-
-        EasyMock.expect(executor.awaitTermination(eq(timeoutMs), eq(TimeUnit.MILLISECONDS)))
-                .andThrow(new InterruptedException());
-        PowerMock.replayAll();
+        when(executor.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)).thenThrow(new InterruptedException());
 
         committer.close(timeoutMs);
 
-        PowerMock.verifyAll();
+        verify(executor, times(2)).shutdown();
     }
 
     @Test
     public void testRemove() throws Exception {
         // Try to remove a non-existing task
-        PowerMock.replayAll();
-
         assertTrue(committers.isEmpty());
         committer.remove(taskId);
         assertTrue(committers.isEmpty());
 
-        PowerMock.verifyAll();
-        PowerMock.resetAll();
-
         // Try to remove an existing task
-        EasyMock.expect(taskFuture.cancel(eq(false))).andReturn(false);
-        EasyMock.expect(taskFuture.isDone()).andReturn(false);
-        EasyMock.expect(taskFuture.get()).andReturn(null);
-        EasyMock.expect(taskId.connector()).andReturn("MyConnector");
-        EasyMock.expect(taskId.task()).andReturn(1);
-        PowerMock.replayAll();
+        when(taskFuture.cancel(false)).thenReturn(false);
+        when(taskFuture.isDone()).thenReturn(false);
+        when(taskFuture.get())
+                .thenReturn(null)
+                .thenThrow(new CancellationException())
+                .thenThrow(new InterruptedException());
+        when(taskId.connector()).thenReturn("MyConnector");
+        when(taskId.task()).thenReturn(1);
 
         committers.put(taskId, taskFuture);
         committer.remove(taskId);
         assertTrue(committers.isEmpty());
 
-        PowerMock.verifyAll();
-        PowerMock.resetAll();
-
         // Try to remove a cancelled task
-        EasyMock.expect(taskFuture.cancel(eq(false))).andReturn(false);
-        EasyMock.expect(taskFuture.isDone()).andReturn(false);
-        EasyMock.expect(taskFuture.get()).andThrow(new CancellationException());
-        EasyMock.expect(taskId.connector()).andReturn("MyConnector");
-        EasyMock.expect(taskId.task()).andReturn(1);
-        mockLog.trace(EasyMock.anyString(), EasyMock.<Object>anyObject());

Review Comment:
   We can also use the `LogCaptureAppender` class here. We will probably have to invoke `LogCaptureAppender.setClassLoggerToTrace(SourceTaskOffsetCommitter.class);` first, since the message we want to capture is logged at `TRACE` level.



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitterTest.java:
##########
@@ -71,122 +74,74 @@ public void setup() {
                 Long.toString(DEFAULT_OFFSET_COMMIT_INTERVAL_MS));
         WorkerConfig config = new StandaloneConfig(workerProps);
         committer = new SourceTaskOffsetCommitter(config, executor, committers);
-        Whitebox.setInternalState(SourceTaskOffsetCommitter.class, "log", mockLog);

Review Comment:
   I'm not a fan of using Whitebox to mess around with private fields, but we should find a way to preserve the coverage guarantees that these tests give us with respect to logging. I left a comment below with a suggestion on how we can do that.





[GitHub] [kafka] C0urante commented on pull request #12823: KAFKA-14132: Replace PowerMock and EasyMock with Mockito in streams tests

Posted by GitBox <gi...@apache.org>.
C0urante commented on PR #12823:
URL: https://github.com/apache/kafka/pull/12823#issuecomment-1303685374

   Hi @clolov - thanks for the PR! The title mentions streams tests; should that be updated?
   
   I plan to take a look sometime within the next week.




[GitHub] [kafka] clolov commented on a diff in pull request #12823: KAFKA-14132: Replace PowerMock and EasyMock with Mockito in connect tests

Posted by GitBox <gi...@apache.org>.
clolov commented on code in PR #12823:
URL: https://github.com/apache/kafka/pull/12823#discussion_r1015611950


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitterTest.java:
##########
@@ -185,8 +160,13 @@ public void testRemove() throws Exception {
         } catch (ConnectException e) {
             //ignore
         }
+    }
 
-        PowerMock.verifyAll();
+    private void removeSetup() {

Review Comment:
   Yup, makes a lot of sense. Changed in the next commit :)





[GitHub] [kafka] clolov commented on a diff in pull request #12823: KAFKA-14132: Replace PowerMock and EasyMock with Mockito in connect tests

Posted by GitBox <gi...@apache.org>.
clolov commented on code in PR #12823:
URL: https://github.com/apache/kafka/pull/12823#discussion_r1015611423


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitterTest.java:
##########
@@ -185,8 +160,13 @@ public void testRemove() throws Exception {
         } catch (ConnectException e) {
             //ignore
         }

Review Comment:
   Nice catch, changed in the subsequent commit.





[GitHub] [kafka] clolov commented on a diff in pull request #12823: KAFKA-14132: Replace PowerMock and EasyMock with Mockito in connect tests

Posted by GitBox <gi...@apache.org>.
clolov commented on code in PR #12823:
URL: https://github.com/apache/kafka/pull/12823#discussion_r1015276765


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitterTest.java:
##########
@@ -71,122 +74,74 @@ public void setup() {
                 Long.toString(DEFAULT_OFFSET_COMMIT_INTERVAL_MS));
         WorkerConfig config = new StandaloneConfig(workerProps);
         committer = new SourceTaskOffsetCommitter(config, executor, committers);
-        Whitebox.setInternalState(SourceTaskOffsetCommitter.class, "log", mockLog);
     }
 
     @SuppressWarnings("unchecked")
     @Test
     public void testSchedule() {
-        Capture<Runnable> taskWrapper = EasyMock.newCapture();
+        ArgumentCaptor<Runnable> taskWrapper = ArgumentCaptor.forClass(Runnable.class);
 
-        EasyMock.expect(executor.scheduleWithFixedDelay(
-                EasyMock.capture(taskWrapper), eq(DEFAULT_OFFSET_COMMIT_INTERVAL_MS),
+        when(executor.scheduleWithFixedDelay(
+                taskWrapper.capture(), eq(DEFAULT_OFFSET_COMMIT_INTERVAL_MS),
                 eq(DEFAULT_OFFSET_COMMIT_INTERVAL_MS), eq(TimeUnit.MILLISECONDS))
-        ).andReturn((ScheduledFuture) commitFuture);
-
-        PowerMock.replayAll();
+        ).thenReturn((ScheduledFuture) commitFuture);
 
         committer.schedule(taskId, task);
-        assertTrue(taskWrapper.hasCaptured());
         assertNotNull(taskWrapper.getValue());
         assertEquals(singletonMap(taskId, commitFuture), committers);
-
-        PowerMock.verifyAll();
     }
 
     @Test
     public void testClose() throws Exception {
         long timeoutMs = 1000;
 
         // Normal termination, where termination times out.
-        executor.shutdown();
-        PowerMock.expectLastCall();
-
-        EasyMock.expect(executor.awaitTermination(eq(timeoutMs), eq(TimeUnit.MILLISECONDS)))
-                .andReturn(false);
-        mockLog.error(EasyMock.anyString());
-        PowerMock.expectLastCall();
-        PowerMock.replayAll();
+        when(executor.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)).thenReturn(false);
 
         committer.close(timeoutMs);
 
-        PowerMock.verifyAll();
-        PowerMock.resetAll();
-
         // Termination interrupted
-        executor.shutdown();
-        PowerMock.expectLastCall();
-
-        EasyMock.expect(executor.awaitTermination(eq(timeoutMs), eq(TimeUnit.MILLISECONDS)))
-                .andThrow(new InterruptedException());
-        PowerMock.replayAll();
+        when(executor.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)).thenThrow(new InterruptedException());
 
         committer.close(timeoutMs);
 
-        PowerMock.verifyAll();
+        verify(executor, times(2)).shutdown();
     }
 
     @Test
     public void testRemove() throws Exception {

Review Comment:
   I have split this into 4 tests as you suggested. I have a slight preference for only abstracting the setup of the mocks, but if you would prefer to have as little duplication as possible I will change it.





[GitHub] [kafka] C0urante commented on a diff in pull request #12823: KAFKA-14132: Replace PowerMock and EasyMock with Mockito in connect tests

Posted by GitBox <gi...@apache.org>.
C0urante commented on code in PR #12823:
URL: https://github.com/apache/kafka/pull/12823#discussion_r1015522678


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitterTest.java:
##########
@@ -185,8 +160,13 @@ public void testRemove() throws Exception {
         } catch (ConnectException e) {
             //ignore
         }
+    }
 
-        PowerMock.verifyAll();
+    private void removeSetup() {

Review Comment:
   Nit: "remove setup" kind of sounds like "remove" is a verb and we're removing something. Maybe `expectRemove`, which would also line up nicely with other utility functions in other tests?



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitterTest.java:
##########
@@ -71,122 +74,74 @@ public void setup() {
                 Long.toString(DEFAULT_OFFSET_COMMIT_INTERVAL_MS));
         WorkerConfig config = new StandaloneConfig(workerProps);
         committer = new SourceTaskOffsetCommitter(config, executor, committers);
-        Whitebox.setInternalState(SourceTaskOffsetCommitter.class, "log", mockLog);
     }
 
     @SuppressWarnings("unchecked")
     @Test
     public void testSchedule() {
-        Capture<Runnable> taskWrapper = EasyMock.newCapture();
+        ArgumentCaptor<Runnable> taskWrapper = ArgumentCaptor.forClass(Runnable.class);
 
-        EasyMock.expect(executor.scheduleWithFixedDelay(
-                EasyMock.capture(taskWrapper), eq(DEFAULT_OFFSET_COMMIT_INTERVAL_MS),
+        when(executor.scheduleWithFixedDelay(
+                taskWrapper.capture(), eq(DEFAULT_OFFSET_COMMIT_INTERVAL_MS),
                 eq(DEFAULT_OFFSET_COMMIT_INTERVAL_MS), eq(TimeUnit.MILLISECONDS))
-        ).andReturn((ScheduledFuture) commitFuture);
-
-        PowerMock.replayAll();
+        ).thenReturn((ScheduledFuture) commitFuture);
 
         committer.schedule(taskId, task);
-        assertTrue(taskWrapper.hasCaptured());
         assertNotNull(taskWrapper.getValue());
         assertEquals(singletonMap(taskId, commitFuture), committers);
-
-        PowerMock.verifyAll();
     }
 
     @Test
     public void testClose() throws Exception {
         long timeoutMs = 1000;
 
         // Normal termination, where termination times out.
-        executor.shutdown();
-        PowerMock.expectLastCall();
-
-        EasyMock.expect(executor.awaitTermination(eq(timeoutMs), eq(TimeUnit.MILLISECONDS)))
-                .andReturn(false);
-        mockLog.error(EasyMock.anyString());
-        PowerMock.expectLastCall();
-        PowerMock.replayAll();
+        when(executor.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)).thenReturn(false);
 
         committer.close(timeoutMs);
 
-        PowerMock.verifyAll();
-        PowerMock.resetAll();
-
         // Termination interrupted
-        executor.shutdown();
-        PowerMock.expectLastCall();
-
-        EasyMock.expect(executor.awaitTermination(eq(timeoutMs), eq(TimeUnit.MILLISECONDS)))
-                .andThrow(new InterruptedException());
-        PowerMock.replayAll();
+        when(executor.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)).thenThrow(new InterruptedException());
 
         committer.close(timeoutMs);
 
-        PowerMock.verifyAll();
+        verify(executor, times(2)).shutdown();
     }
 
     @Test
     public void testRemove() throws Exception {

Review Comment:
   Looks great!



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitterTest.java:
##########
@@ -71,112 +75,83 @@ public void setup() {
                 Long.toString(DEFAULT_OFFSET_COMMIT_INTERVAL_MS));
         WorkerConfig config = new StandaloneConfig(workerProps);
         committer = new SourceTaskOffsetCommitter(config, executor, committers);
-        Whitebox.setInternalState(SourceTaskOffsetCommitter.class, "log", mockLog);
     }
 
     @SuppressWarnings("unchecked")
     @Test
     public void testSchedule() {
-        Capture<Runnable> taskWrapper = EasyMock.newCapture();
+        ArgumentCaptor<Runnable> taskWrapper = ArgumentCaptor.forClass(Runnable.class);
 
-        EasyMock.expect(executor.scheduleWithFixedDelay(
-                EasyMock.capture(taskWrapper), eq(DEFAULT_OFFSET_COMMIT_INTERVAL_MS),
+        when(executor.scheduleWithFixedDelay(
+                taskWrapper.capture(), eq(DEFAULT_OFFSET_COMMIT_INTERVAL_MS),
                 eq(DEFAULT_OFFSET_COMMIT_INTERVAL_MS), eq(TimeUnit.MILLISECONDS))
-        ).andReturn((ScheduledFuture) commitFuture);
-
-        PowerMock.replayAll();
+        ).thenReturn((ScheduledFuture) commitFuture);
 
         committer.schedule(taskId, task);
-        assertTrue(taskWrapper.hasCaptured());
         assertNotNull(taskWrapper.getValue());
         assertEquals(singletonMap(taskId, commitFuture), committers);
-
-        PowerMock.verifyAll();
     }
 
     @Test
-    public void testClose() throws Exception {
+    public void testCloseWithinTimeout() throws Exception {

Review Comment:
   Nit: this name seems to go against the case that we're covering here (where `close` fails to complete within the expected timeout). Maybe rename to `testCloseTimeout`?



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitterTest.java:
##########
@@ -185,8 +160,13 @@ public void testRemove() throws Exception {
         } catch (ConnectException e) {
             //ignore
         }

Review Comment:
   We can replace this with `assertThrows`:
   ```java
           committers.put(taskId, taskFuture);
           assertThrows(ConnectException.class, () -> committer.remove(taskId));
   ```
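
One reason this swap matters: a `try`/`catch` that ignores the exception also passes when nothing is thrown at all. The sketch below is a hand-rolled approximation of what an `assertThrows`-style helper does, written against the JDK only. `AssertThrowsSketch`, `ThrowingRunnable`, and `expectThrows` are hypothetical names, not JUnit's API.

```java
// Illustrative only: fails when no exception, or the wrong type, is thrown.
final class AssertThrowsSketch {

    /** Like Runnable, but the body may throw anything. */
    public interface ThrowingRunnable {
        void run() throws Throwable;
    }

    /** Returns the thrown exception if it has the expected type; otherwise fails. */
    public static <T extends Throwable> T expectThrows(Class<T> expected, ThrowingRunnable body) {
        try {
            body.run();
        } catch (Throwable t) {
            if (expected.isInstance(t)) {
                return expected.cast(t);
            }
            throw new AssertionError("unexpected exception type: " + t.getClass().getName(), t);
        }
        // Reaching this point means the body completed normally.
        throw new AssertionError("expected " + expected.getName() + " but nothing was thrown");
    }
}
```

Returning the caught exception lets a test make further assertions on its message or cause.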



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitterTest.java:
##########
@@ -71,112 +75,83 @@ public void setup() {
                 Long.toString(DEFAULT_OFFSET_COMMIT_INTERVAL_MS));
         WorkerConfig config = new StandaloneConfig(workerProps);
         committer = new SourceTaskOffsetCommitter(config, executor, committers);
-        Whitebox.setInternalState(SourceTaskOffsetCommitter.class, "log", mockLog);
     }
 
     @SuppressWarnings("unchecked")
     @Test
     public void testSchedule() {
-        Capture<Runnable> taskWrapper = EasyMock.newCapture();
+        ArgumentCaptor<Runnable> taskWrapper = ArgumentCaptor.forClass(Runnable.class);
 
-        EasyMock.expect(executor.scheduleWithFixedDelay(
-                EasyMock.capture(taskWrapper), eq(DEFAULT_OFFSET_COMMIT_INTERVAL_MS),
+        when(executor.scheduleWithFixedDelay(
+                taskWrapper.capture(), eq(DEFAULT_OFFSET_COMMIT_INTERVAL_MS),
                 eq(DEFAULT_OFFSET_COMMIT_INTERVAL_MS), eq(TimeUnit.MILLISECONDS))
-        ).andReturn((ScheduledFuture) commitFuture);
-
-        PowerMock.replayAll();
+        ).thenReturn((ScheduledFuture) commitFuture);
 
         committer.schedule(taskId, task);
-        assertTrue(taskWrapper.hasCaptured());
         assertNotNull(taskWrapper.getValue());
         assertEquals(singletonMap(taskId, commitFuture), committers);
-
-        PowerMock.verifyAll();
     }
 
     @Test
-    public void testClose() throws Exception {
+    public void testCloseWithinTimeout() throws Exception {
         long timeoutMs = 1000;
 
         // Normal termination, where termination times out.
-        executor.shutdown();
-        PowerMock.expectLastCall();
+        when(executor.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)).thenReturn(false);
 
-        EasyMock.expect(executor.awaitTermination(eq(timeoutMs), eq(TimeUnit.MILLISECONDS)))
-                .andReturn(false);
-        mockLog.error(EasyMock.anyString());
-        PowerMock.expectLastCall();
-        PowerMock.replayAll();
+        try (LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister(SourceTaskOffsetCommitter.class)) {
+            committer.close(timeoutMs);
+            assertTrue(logCaptureAppender.getEvents().stream().anyMatch(e -> e.getLevel().equals("ERROR")));
+        }
 
-        committer.close(timeoutMs);
+        verify(executor).shutdown();
+    }
 
-        PowerMock.verifyAll();
-        PowerMock.resetAll();
+    @Test
+    public void testCloseOutsideOfTimeout() throws InterruptedException {

Review Comment:
   Nit: "outside of timeout" is a little unclear; maybe just `testCloseInterrupted`?





[GitHub] [kafka] clolov commented on a diff in pull request #12823: KAFKA-14132: Replace PowerMock and EasyMock with Mockito in connect tests

Posted by GitBox <gi...@apache.org>.
clolov commented on code in PR #12823:
URL: https://github.com/apache/kafka/pull/12823#discussion_r1015261223


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitterTest.java:
##########
@@ -71,122 +74,74 @@ public void setup() {
                 Long.toString(DEFAULT_OFFSET_COMMIT_INTERVAL_MS));
         WorkerConfig config = new StandaloneConfig(workerProps);
         committer = new SourceTaskOffsetCommitter(config, executor, committers);
-        Whitebox.setInternalState(SourceTaskOffsetCommitter.class, "log", mockLog);
     }
 
     @SuppressWarnings("unchecked")
     @Test
     public void testSchedule() {
-        Capture<Runnable> taskWrapper = EasyMock.newCapture();
+        ArgumentCaptor<Runnable> taskWrapper = ArgumentCaptor.forClass(Runnable.class);
 
-        EasyMock.expect(executor.scheduleWithFixedDelay(
-                EasyMock.capture(taskWrapper), eq(DEFAULT_OFFSET_COMMIT_INTERVAL_MS),
+        when(executor.scheduleWithFixedDelay(
+                taskWrapper.capture(), eq(DEFAULT_OFFSET_COMMIT_INTERVAL_MS),
                 eq(DEFAULT_OFFSET_COMMIT_INTERVAL_MS), eq(TimeUnit.MILLISECONDS))
-        ).andReturn((ScheduledFuture) commitFuture);
-
-        PowerMock.replayAll();
+        ).thenReturn((ScheduledFuture) commitFuture);
 
         committer.schedule(taskId, task);
-        assertTrue(taskWrapper.hasCaptured());
         assertNotNull(taskWrapper.getValue());
         assertEquals(singletonMap(taskId, commitFuture), committers);
-
-        PowerMock.verifyAll();
     }
 
     @Test
     public void testClose() throws Exception {
         long timeoutMs = 1000;
 
         // Normal termination, where termination times out.
-        executor.shutdown();
-        PowerMock.expectLastCall();
-
-        EasyMock.expect(executor.awaitTermination(eq(timeoutMs), eq(TimeUnit.MILLISECONDS)))
-                .andReturn(false);
-        mockLog.error(EasyMock.anyString());
-        PowerMock.expectLastCall();
-        PowerMock.replayAll();
+        when(executor.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)).thenReturn(false);
 
         committer.close(timeoutMs);

Review Comment:
   I have opted to split this test case into two. A `verify` call should now be present in both.
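
   For illustration only, a minimal sketch of the "one scenario per test, one shutdown check per test" shape described above. To stay runnable with only the JDK it uses a hand-rolled fake instead of a Mockito mock; `FakeExecutor`, `HypotheticalCloser` and `SplitCloseSketch` are hypothetical names and are not part of Kafka.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.TimeUnit;

// Hand-rolled fake (an assumption of this sketch, standing in for a Mockito mock).
class FakeExecutor extends AbstractExecutorService {
    private final boolean interrupt; // if true, awaitTermination throws InterruptedException
    int shutdownCalls = 0;           // recorded so each scenario can check it separately

    FakeExecutor(boolean interrupt) { this.interrupt = interrupt; }

    @Override public void shutdown() { shutdownCalls++; }
    @Override public List<Runnable> shutdownNow() { return Collections.emptyList(); }
    @Override public boolean isShutdown() { return shutdownCalls > 0; }
    @Override public boolean isTerminated() { return false; }
    @Override public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
        if (interrupt) throw new InterruptedException();
        return false; // simulate termination timing out
    }
    @Override public void execute(Runnable command) { command.run(); }
}

// Hypothetical code under test: shuts the executor down, then waits for termination.
class HypotheticalCloser {
    static void close(FakeExecutor executor, long timeoutMs) {
        executor.shutdown();
        try {
            executor.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Ignored for brevity in this sketch; real code may handle this differently.
        }
    }
}

class SplitCloseSketch {
    // Scenario 1: termination times out. A fresh fake keeps the call count isolated.
    static int shutdownCallsAfterTimeout() {
        FakeExecutor executor = new FakeExecutor(false);
        HypotheticalCloser.close(executor, 1000);
        return executor.shutdownCalls;
    }

    // Scenario 2: termination is interrupted. Again a fresh fake, so no times(2) bookkeeping.
    static int shutdownCallsAfterInterrupt() {
        FakeExecutor executor = new FakeExecutor(true);
        HypotheticalCloser.close(executor, 1000);
        return executor.shutdownCalls;
    }

    public static void main(String[] args) {
        System.out.println("timeout scenario shutdown calls: " + shutdownCallsAfterTimeout());
        System.out.println("interrupt scenario shutdown calls: " + shutdownCallsAfterInterrupt());
    }
}
```

   With one scenario per method, each check expects exactly one `shutdown()` call, which is easier to read than a combined count across both scenarios.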





[GitHub] [kafka] clolov commented on a diff in pull request #12823: KAFKA-14132: Replace PowerMock and EasyMock with Mockito in connect tests

Posted by GitBox <gi...@apache.org>.
clolov commented on code in PR #12823:
URL: https://github.com/apache/kafka/pull/12823#discussion_r1015279648


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitterTest.java:
##########
@@ -71,122 +74,74 @@ public void setup() {
                 Long.toString(DEFAULT_OFFSET_COMMIT_INTERVAL_MS));
         WorkerConfig config = new StandaloneConfig(workerProps);
         committer = new SourceTaskOffsetCommitter(config, executor, committers);
-        Whitebox.setInternalState(SourceTaskOffsetCommitter.class, "log", mockLog);
     }
 
     @SuppressWarnings("unchecked")
     @Test
     public void testSchedule() {
-        Capture<Runnable> taskWrapper = EasyMock.newCapture();
+        ArgumentCaptor<Runnable> taskWrapper = ArgumentCaptor.forClass(Runnable.class);
 
-        EasyMock.expect(executor.scheduleWithFixedDelay(
-                EasyMock.capture(taskWrapper), eq(DEFAULT_OFFSET_COMMIT_INTERVAL_MS),
+        when(executor.scheduleWithFixedDelay(
+                taskWrapper.capture(), eq(DEFAULT_OFFSET_COMMIT_INTERVAL_MS),
                 eq(DEFAULT_OFFSET_COMMIT_INTERVAL_MS), eq(TimeUnit.MILLISECONDS))
-        ).andReturn((ScheduledFuture) commitFuture);
-
-        PowerMock.replayAll();
+        ).thenReturn((ScheduledFuture) commitFuture);
 
         committer.schedule(taskId, task);
-        assertTrue(taskWrapper.hasCaptured());
         assertNotNull(taskWrapper.getValue());
         assertEquals(singletonMap(taskId, commitFuture), committers);
-
-        PowerMock.verifyAll();
     }
 
     @Test
     public void testClose() throws Exception {
         long timeoutMs = 1000;
 
         // Normal termination, where termination times out.
-        executor.shutdown();
-        PowerMock.expectLastCall();
-
-        EasyMock.expect(executor.awaitTermination(eq(timeoutMs), eq(TimeUnit.MILLISECONDS)))
-                .andReturn(false);
-        mockLog.error(EasyMock.anyString());
-        PowerMock.expectLastCall();
-        PowerMock.replayAll();
+        when(executor.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)).thenReturn(false);
 
         committer.close(timeoutMs);
 
-        PowerMock.verifyAll();
-        PowerMock.resetAll();
-
         // Termination interrupted
-        executor.shutdown();
-        PowerMock.expectLastCall();
-
-        EasyMock.expect(executor.awaitTermination(eq(timeoutMs), eq(TimeUnit.MILLISECONDS)))
-                .andThrow(new InterruptedException());
-        PowerMock.replayAll();
+        when(executor.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)).thenThrow(new InterruptedException());
 
         committer.close(timeoutMs);
 
-        PowerMock.verifyAll();
+        verify(executor, times(2)).shutdown();
     }
 
     @Test
     public void testRemove() throws Exception {
         // Try to remove a non-existing task
-        PowerMock.replayAll();
-
         assertTrue(committers.isEmpty());
         committer.remove(taskId);
         assertTrue(committers.isEmpty());
 
-        PowerMock.verifyAll();
-        PowerMock.resetAll();
-
         // Try to remove an existing task
-        EasyMock.expect(taskFuture.cancel(eq(false))).andReturn(false);
-        EasyMock.expect(taskFuture.isDone()).andReturn(false);
-        EasyMock.expect(taskFuture.get()).andReturn(null);
-        EasyMock.expect(taskId.connector()).andReturn("MyConnector");
-        EasyMock.expect(taskId.task()).andReturn(1);
-        PowerMock.replayAll();
+        when(taskFuture.cancel(false)).thenReturn(false);
+        when(taskFuture.isDone()).thenReturn(false);
+        when(taskFuture.get())
+                .thenReturn(null)
+                .thenThrow(new CancellationException())
+                .thenThrow(new InterruptedException());
+        when(taskId.connector()).thenReturn("MyConnector");
+        when(taskId.task()).thenReturn(1);
 
         committers.put(taskId, taskFuture);
         committer.remove(taskId);
         assertTrue(committers.isEmpty());
 
-        PowerMock.verifyAll();
-        PowerMock.resetAll();
-
         // Try to remove a cancelled task
-        EasyMock.expect(taskFuture.cancel(eq(false))).andReturn(false);
-        EasyMock.expect(taskFuture.isDone()).andReturn(false);
-        EasyMock.expect(taskFuture.get()).andThrow(new CancellationException());
-        EasyMock.expect(taskId.connector()).andReturn("MyConnector");
-        EasyMock.expect(taskId.task()).andReturn(1);
-        mockLog.trace(EasyMock.anyString(), EasyMock.<Object>anyObject());

Review Comment:
   Amended in the subsequent commit!





[GitHub] [kafka] clolov commented on pull request #12823: KAFKA-14132: Replace PowerMock and EasyMock with Mockito in connect tests

Posted by GitBox <gi...@apache.org>.
clolov commented on PR #12823:
URL: https://github.com/apache/kafka/pull/12823#issuecomment-1305432745

   Hey @C0urante! Thanks a lot for the review. Hopefully I have addressed the first batch of comments :)




[GitHub] [kafka] clolov commented on pull request #12823: KAFKA-14132: Replace PowerMock and EasyMock with Mockito in streams tests

Posted by GitBox <gi...@apache.org>.
clolov commented on PR #12823:
URL: https://github.com/apache/kafka/pull/12823#issuecomment-1303625752

   Hey @C0urante! Tagging you for visibility as this is a Connect-related test.




[GitHub] [kafka] clolov commented on pull request #12823: KAFKA-14132: Replace PowerMock and EasyMock with Mockito in connect tests

Posted by GitBox <gi...@apache.org>.
clolov commented on PR #12823:
URL: https://github.com/apache/kafka/pull/12823#issuecomment-1305894872

   Thank you very much for the quick turnaround @C0urante!




[GitHub] [kafka] clolov commented on a diff in pull request #12823: KAFKA-14132: Replace PowerMock and EasyMock with Mockito in streams tests

Posted by GitBox <gi...@apache.org>.
clolov commented on code in PR #12823:
URL: https://github.com/apache/kafka/pull/12823#discussion_r1014077019


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitterTest.java:
##########
@@ -71,122 +74,74 @@ public void setup() {
                 Long.toString(DEFAULT_OFFSET_COMMIT_INTERVAL_MS));
         WorkerConfig config = new StandaloneConfig(workerProps);
         committer = new SourceTaskOffsetCommitter(config, executor, committers);
-        Whitebox.setInternalState(SourceTaskOffsetCommitter.class, "log", mockLog);

Review Comment:
   I could not find an equivalent of `Whitebox.setInternalState` in Mockito, so I decided to remove the mock logger.
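
   For illustration only: the later revision quoted in this thread checks logging by capturing events with `LogCaptureAppender` rather than injecting a mock logger. The sketch below shows the same idea using only `java.util.logging`, so it runs without Kafka or Mockito on the classpath. `HypotheticalCommitter`, `CapturingHandler` and `LogCaptureSketch` are hypothetical names, and this is not how `LogCaptureAppender` itself is implemented.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;

// Hypothetical stand-in for the class under test: logs at SEVERE when shutdown times out.
class HypotheticalCommitter {
    static final Logger LOG = Logger.getLogger(HypotheticalCommitter.class.getName());

    void close(boolean terminatedInTime) {
        if (!terminatedInTime) {
            LOG.severe("shutdown timed out"); // placeholder message, not Kafka's wording
        }
    }
}

// Collects every record published to the logger it is attached to; detaches on close().
class CapturingHandler extends Handler implements AutoCloseable {
    private final Logger logger;
    private final List<LogRecord> records = new CopyOnWriteArrayList<>();

    private CapturingHandler(Logger logger) { this.logger = logger; }

    static CapturingHandler attach(Logger logger) {
        CapturingHandler handler = new CapturingHandler(logger);
        logger.addHandler(handler);
        return handler;
    }

    List<LogRecord> records() { return records; }

    @Override public void publish(LogRecord record) { records.add(record); }
    @Override public void flush() { }
    @Override public void close() { logger.removeHandler(this); }
}

class LogCaptureSketch {
    // Returns true if closing after a timeout produced at least one SEVERE record.
    static boolean timedOutCloseLogsError() {
        // try-with-resources detaches the handler even if the assertion logic throws.
        try (CapturingHandler capture = CapturingHandler.attach(HypotheticalCommitter.LOG)) {
            new HypotheticalCommitter().close(false);
            return capture.records().stream()
                    .anyMatch(r -> r.getLevel().equals(Level.SEVERE));
        }
    }

    public static void main(String[] args) {
        System.out.println("error logged: " + timedOutCloseLogsError());
    }
}
```

   Capturing output this way leaves the production logger field untouched, so no reflection-based state injection is needed.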





[GitHub] [kafka] clolov commented on a diff in pull request #12823: KAFKA-14132: Replace PowerMock and EasyMock with Mockito in connect tests

Posted by GitBox <gi...@apache.org>.
clolov commented on code in PR #12823:
URL: https://github.com/apache/kafka/pull/12823#discussion_r1015259138


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitterTest.java:
##########
@@ -71,122 +74,74 @@ public void setup() {
                 Long.toString(DEFAULT_OFFSET_COMMIT_INTERVAL_MS));
         WorkerConfig config = new StandaloneConfig(workerProps);
         committer = new SourceTaskOffsetCommitter(config, executor, committers);
-        Whitebox.setInternalState(SourceTaskOffsetCommitter.class, "log", mockLog);
     }
 
     @SuppressWarnings("unchecked")
     @Test
     public void testSchedule() {
-        Capture<Runnable> taskWrapper = EasyMock.newCapture();
+        ArgumentCaptor<Runnable> taskWrapper = ArgumentCaptor.forClass(Runnable.class);
 
-        EasyMock.expect(executor.scheduleWithFixedDelay(
-                EasyMock.capture(taskWrapper), eq(DEFAULT_OFFSET_COMMIT_INTERVAL_MS),
+        when(executor.scheduleWithFixedDelay(
+                taskWrapper.capture(), eq(DEFAULT_OFFSET_COMMIT_INTERVAL_MS),
                 eq(DEFAULT_OFFSET_COMMIT_INTERVAL_MS), eq(TimeUnit.MILLISECONDS))
-        ).andReturn((ScheduledFuture) commitFuture);
-
-        PowerMock.replayAll();
+        ).thenReturn((ScheduledFuture) commitFuture);
 
         committer.schedule(taskId, task);
-        assertTrue(taskWrapper.hasCaptured());
         assertNotNull(taskWrapper.getValue());
         assertEquals(singletonMap(taskId, commitFuture), committers);
-
-        PowerMock.verifyAll();
     }
 
     @Test
     public void testClose() throws Exception {
         long timeoutMs = 1000;
 
         // Normal termination, where termination times out.
-        executor.shutdown();
-        PowerMock.expectLastCall();
-
-        EasyMock.expect(executor.awaitTermination(eq(timeoutMs), eq(TimeUnit.MILLISECONDS)))
-                .andReturn(false);
-        mockLog.error(EasyMock.anyString());

Review Comment:
   This is a very fair and clean suggestion. I have added it to the subsequent commits.





[GitHub] [kafka] C0urante commented on pull request #12823: KAFKA-14132: Replace PowerMock and EasyMock with Mockito in connect tests

Posted by GitBox <gi...@apache.org>.
C0urante commented on PR #12823:
URL: https://github.com/apache/kafka/pull/12823#issuecomment-1305875235

   Changes in the last commit were trivially verifiable, and the CI run for the commit before that did not contain any relevant test failures. Merging...




[GitHub] [kafka] clolov commented on a diff in pull request #12823: KAFKA-14132: Replace PowerMock and EasyMock with Mockito in connect tests

Posted by GitBox <gi...@apache.org>.
clolov commented on code in PR #12823:
URL: https://github.com/apache/kafka/pull/12823#discussion_r1015610959


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitterTest.java:
##########
@@ -71,112 +75,83 @@ public void setup() {
                 Long.toString(DEFAULT_OFFSET_COMMIT_INTERVAL_MS));
         WorkerConfig config = new StandaloneConfig(workerProps);
         committer = new SourceTaskOffsetCommitter(config, executor, committers);
-        Whitebox.setInternalState(SourceTaskOffsetCommitter.class, "log", mockLog);
     }
 
     @SuppressWarnings("unchecked")
     @Test
     public void testSchedule() {
-        Capture<Runnable> taskWrapper = EasyMock.newCapture();
+        ArgumentCaptor<Runnable> taskWrapper = ArgumentCaptor.forClass(Runnable.class);
 
-        EasyMock.expect(executor.scheduleWithFixedDelay(
-                EasyMock.capture(taskWrapper), eq(DEFAULT_OFFSET_COMMIT_INTERVAL_MS),
+        when(executor.scheduleWithFixedDelay(
+                taskWrapper.capture(), eq(DEFAULT_OFFSET_COMMIT_INTERVAL_MS),
                 eq(DEFAULT_OFFSET_COMMIT_INTERVAL_MS), eq(TimeUnit.MILLISECONDS))
-        ).andReturn((ScheduledFuture) commitFuture);
-
-        PowerMock.replayAll();
+        ).thenReturn((ScheduledFuture) commitFuture);
 
         committer.schedule(taskId, task);
-        assertTrue(taskWrapper.hasCaptured());
         assertNotNull(taskWrapper.getValue());
         assertEquals(singletonMap(taskId, commitFuture), committers);
-
-        PowerMock.verifyAll();
     }
 
     @Test
-    public void testClose() throws Exception {
+    public void testCloseWithinTimeout() throws Exception {

Review Comment:
   Fair point, changed in the next commit.



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitterTest.java:
##########
@@ -71,112 +75,83 @@ public void setup() {
                 Long.toString(DEFAULT_OFFSET_COMMIT_INTERVAL_MS));
         WorkerConfig config = new StandaloneConfig(workerProps);
         committer = new SourceTaskOffsetCommitter(config, executor, committers);
-        Whitebox.setInternalState(SourceTaskOffsetCommitter.class, "log", mockLog);
     }
 
     @SuppressWarnings("unchecked")
     @Test
     public void testSchedule() {
-        Capture<Runnable> taskWrapper = EasyMock.newCapture();
+        ArgumentCaptor<Runnable> taskWrapper = ArgumentCaptor.forClass(Runnable.class);
 
-        EasyMock.expect(executor.scheduleWithFixedDelay(
-                EasyMock.capture(taskWrapper), eq(DEFAULT_OFFSET_COMMIT_INTERVAL_MS),
+        when(executor.scheduleWithFixedDelay(
+                taskWrapper.capture(), eq(DEFAULT_OFFSET_COMMIT_INTERVAL_MS),
                 eq(DEFAULT_OFFSET_COMMIT_INTERVAL_MS), eq(TimeUnit.MILLISECONDS))
-        ).andReturn((ScheduledFuture) commitFuture);
-
-        PowerMock.replayAll();
+        ).thenReturn((ScheduledFuture) commitFuture);
 
         committer.schedule(taskId, task);
-        assertTrue(taskWrapper.hasCaptured());
         assertNotNull(taskWrapper.getValue());
         assertEquals(singletonMap(taskId, commitFuture), committers);
-
-        PowerMock.verifyAll();
     }
 
     @Test
-    public void testClose() throws Exception {
+    public void testCloseWithinTimeout() throws Exception {
         long timeoutMs = 1000;
 
         // Normal termination, where termination times out.
-        executor.shutdown();
-        PowerMock.expectLastCall();
+        when(executor.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)).thenReturn(false);
 
-        EasyMock.expect(executor.awaitTermination(eq(timeoutMs), eq(TimeUnit.MILLISECONDS)))
-                .andReturn(false);
-        mockLog.error(EasyMock.anyString());
-        PowerMock.expectLastCall();
-        PowerMock.replayAll();
+        try (LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister(SourceTaskOffsetCommitter.class)) {
+            committer.close(timeoutMs);
+            assertTrue(logCaptureAppender.getEvents().stream().anyMatch(e -> e.getLevel().equals("ERROR")));
+        }
 
-        committer.close(timeoutMs);
+        verify(executor).shutdown();
+    }
 
-        PowerMock.verifyAll();
-        PowerMock.resetAll();
+    @Test
+    public void testCloseOutsideOfTimeout() throws InterruptedException {

Review Comment:
   I agree, I have changed it in the subsequent commit.


