You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ce...@apache.org on 2022/09/14 15:00:56 UTC

[kafka] branch trunk updated: KAFKA-14132: Migrate some Connect tests from EasyMock/PowerMock to Mockito (#12615)

This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new bdf2cdb27fa KAFKA-14132: Migrate some Connect tests from EasyMock/PowerMock to Mockito (#12615)
bdf2cdb27fa is described below

commit bdf2cdb27fa39c30171a0e63641cd8e8b851fe0c
Author: Yash Mayya <ya...@gmail.com>
AuthorDate: Wed Sep 14 20:30:32 2022 +0530

    KAFKA-14132: Migrate some Connect tests from EasyMock/PowerMock to Mockito (#12615)
    
    Reviewers: Chris Egerton <ch...@aiven.io>
---
 .../org/apache/kafka/connect/runtime/Worker.java   |   3 +-
 .../runtime/errors/RetryWithToleranceOperator.java |  13 +-
 .../connect/runtime/ErrorHandlingTaskTest.java     |   8 +-
 .../kafka/connect/runtime/WorkerTaskTest.java      | 251 +++++++--------------
 .../connect/runtime/errors/ErrorReporterTest.java  |  56 ++---
 .../errors/RetryWithToleranceOperatorTest.java     | 200 +++++++---------
 .../errors/WorkerErrantRecordReporterTest.java     |  90 +++++---
 7 files changed, 252 insertions(+), 369 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index 55e445ca8e1..293eb0f79d2 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -1248,8 +1248,7 @@ public class Worker {
             final Class<? extends Connector> connectorClass = plugins.connectorClass(
                     connectorConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG));
             RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(connectorConfig.errorRetryTimeout(),
-                    connectorConfig.errorMaxDelayInMillis(), connectorConfig.errorToleranceType(), Time.SYSTEM);
-            retryWithToleranceOperator.metrics(errorHandlingMetrics);
+                    connectorConfig.errorMaxDelayInMillis(), connectorConfig.errorToleranceType(), Time.SYSTEM, errorHandlingMetrics);
 
             return doBuild(task, id, configState, statusListener, initialState,
                     connectorConfig, keyConverter, valueConverter, headerConverter, classLoader,
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java
index 947fabf3d0c..6262f19e190 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java
@@ -77,24 +77,25 @@ public class RetryWithToleranceOperator implements AutoCloseable {
 
     private long totalFailures = 0;
     private final Time time;
-    private ErrorHandlingMetrics errorHandlingMetrics;
+    private final ErrorHandlingMetrics errorHandlingMetrics;
     private final CountDownLatch stopRequestedLatch;
     private volatile boolean stopping;   // indicates whether the operator has been asked to stop retrying
 
     protected final ProcessingContext context;
 
     public RetryWithToleranceOperator(long errorRetryTimeout, long errorMaxDelayInMillis,
-                                      ToleranceType toleranceType, Time time) {
-        this(errorRetryTimeout, errorMaxDelayInMillis, toleranceType, time, new ProcessingContext(), new CountDownLatch(1));
+                                      ToleranceType toleranceType, Time time, ErrorHandlingMetrics errorHandlingMetrics) {
+        this(errorRetryTimeout, errorMaxDelayInMillis, toleranceType, time, errorHandlingMetrics, new ProcessingContext(), new CountDownLatch(1));
     }
 
     RetryWithToleranceOperator(long errorRetryTimeout, long errorMaxDelayInMillis,
-                               ToleranceType toleranceType, Time time,
+                               ToleranceType toleranceType, Time time, ErrorHandlingMetrics errorHandlingMetrics,
                                ProcessingContext context, CountDownLatch stopRequestedLatch) {
         this.errorRetryTimeout = errorRetryTimeout;
         this.errorMaxDelayInMillis = errorMaxDelayInMillis;
         this.errorToleranceType = toleranceType;
         this.time = time;
+        this.errorHandlingMetrics = errorHandlingMetrics;
         this.context = context;
         this.stopRequestedLatch = stopRequestedLatch;
         this.stopping = false;
@@ -284,10 +285,6 @@ public class RetryWithToleranceOperator implements AutoCloseable {
         }
     }
 
-    public synchronized void metrics(ErrorHandlingMetrics errorHandlingMetrics) {
-        this.errorHandlingMetrics = errorHandlingMetrics;
-    }
-
     @Override
     public String toString() {
         return "RetryWithToleranceOperator{" +
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
index b2ba4178805..f1913d9848e 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
@@ -231,7 +231,6 @@ public class ErrorHandlingTaskTest {
         ErrorReporter reporter = EasyMock.mock(ErrorReporter.class);
 
         RetryWithToleranceOperator retryWithToleranceOperator = operator();
-        retryWithToleranceOperator.metrics(errorHandlingMetrics);
         retryWithToleranceOperator.reporters(singletonList(reporter));
 
         createSinkTask(initialState, retryWithToleranceOperator);
@@ -262,7 +261,6 @@ public class ErrorHandlingTaskTest {
         ErrorReporter reporter = EasyMock.mock(ErrorReporter.class);
 
         RetryWithToleranceOperator retryWithToleranceOperator = operator();
-        retryWithToleranceOperator.metrics(errorHandlingMetrics);
         retryWithToleranceOperator.reporters(singletonList(reporter));
 
         createSourceTask(initialState, retryWithToleranceOperator);
@@ -286,7 +284,6 @@ public class ErrorHandlingTaskTest {
         ErrorReporter reporterB = EasyMock.mock(ErrorReporter.class);
 
         RetryWithToleranceOperator retryWithToleranceOperator = operator();
-        retryWithToleranceOperator.metrics(errorHandlingMetrics);
         retryWithToleranceOperator.reporters(Arrays.asList(reporterA, reporterB));
 
         createSourceTask(initialState, retryWithToleranceOperator);
@@ -316,7 +313,6 @@ public class ErrorHandlingTaskTest {
         LogReporter reporter = new LogReporter(taskId, connConfig(reportProps), errorHandlingMetrics);
 
         RetryWithToleranceOperator retryWithToleranceOperator = operator();
-        retryWithToleranceOperator.metrics(errorHandlingMetrics);
         retryWithToleranceOperator.reporters(singletonList(reporter));
         createSinkTask(initialState, retryWithToleranceOperator);
 
@@ -357,7 +353,7 @@ public class ErrorHandlingTaskTest {
     }
 
     private RetryWithToleranceOperator operator() {
-        return new RetryWithToleranceOperator(OPERATOR_RETRY_TIMEOUT_MILLIS, OPERATOR_RETRY_MAX_DELAY_MILLIS, OPERATOR_TOLERANCE_TYPE, SYSTEM);
+        return new RetryWithToleranceOperator(OPERATOR_RETRY_TIMEOUT_MILLIS, OPERATOR_RETRY_MAX_DELAY_MILLIS, OPERATOR_TOLERANCE_TYPE, SYSTEM, errorHandlingMetrics);
     }
 
     @Test
@@ -368,7 +364,6 @@ public class ErrorHandlingTaskTest {
         LogReporter reporter = new LogReporter(taskId, connConfig(reportProps), errorHandlingMetrics);
 
         RetryWithToleranceOperator retryWithToleranceOperator = operator();
-        retryWithToleranceOperator.metrics(errorHandlingMetrics);
         retryWithToleranceOperator.reporters(singletonList(reporter));
         createSourceTask(initialState, retryWithToleranceOperator);
 
@@ -433,7 +428,6 @@ public class ErrorHandlingTaskTest {
         LogReporter reporter = new LogReporter(taskId, connConfig(reportProps), errorHandlingMetrics);
 
         RetryWithToleranceOperator retryWithToleranceOperator = operator();
-        retryWithToleranceOperator.metrics(errorHandlingMetrics);
         retryWithToleranceOperator.reporters(singletonList(reporter));
         createSourceTask(initialState, retryWithToleranceOperator, badConverter());
 
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
index 890c0f7399c..ae9c75d5253 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.connect.runtime;
 
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.runtime.TaskStatus.Listener;
 import org.apache.kafka.connect.runtime.WorkerTask.TaskMetricsGroup;
 import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
 import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperatorTest;
@@ -25,29 +26,24 @@ import org.apache.kafka.connect.sink.SinkTask;
 import org.apache.kafka.connect.storage.StatusBackingStore;
 import org.apache.kafka.connect.util.ConnectorTaskId;
 import org.apache.kafka.common.utils.MockTime;
-import org.easymock.EasyMock;
-import org.easymock.Mock;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
 
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 
-import static org.easymock.EasyMock.expectLastCall;
-import static org.easymock.EasyMock.partialMockBuilder;
-import static org.easymock.EasyMock.replay;
-import static org.easymock.EasyMock.verify;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
 
-@RunWith(PowerMockRunner.class)
-@PrepareForTest({WorkerTask.class})
-@PowerMockIgnore("javax.management.*")
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
 public class WorkerTaskTest {
 
     private static final Map<String, String> TASK_PROPS = new HashMap<>();
@@ -56,12 +52,11 @@ public class WorkerTaskTest {
     }
     private static final TaskConfig TASK_CONFIG = new TaskConfig(TASK_PROPS);
 
-    private ConnectMetrics metrics;
     @Mock private TaskStatus.Listener statusListener;
     @Mock private ClassLoader loader;
-    RetryWithToleranceOperator retryWithToleranceOperator;
-    @Mock
-    StatusBackingStore statusBackingStore;
+    @Mock private StatusBackingStore statusBackingStore;
+    private ConnectMetrics metrics;
+    private RetryWithToleranceOperator retryWithToleranceOperator;
 
     @Before
     public void setup() {
@@ -78,82 +73,35 @@ public class WorkerTaskTest {
     public void standardStartup() {
         ConnectorTaskId taskId = new ConnectorTaskId("foo", 0);
 
-        WorkerTask workerTask = partialMockBuilder(WorkerTask.class)
-                .withConstructor(
-                        ConnectorTaskId.class,
-                        TaskStatus.Listener.class,
-                        TargetState.class,
-                        ClassLoader.class,
-                        ConnectMetrics.class,
-                        RetryWithToleranceOperator.class,
-                        Time.class,
-                        StatusBackingStore.class
-                )
-                .withArgs(taskId, statusListener, TargetState.STARTED, loader, metrics,
-                        retryWithToleranceOperator, Time.SYSTEM, statusBackingStore)
-                .addMockedMethod("initialize")
-                .addMockedMethod("initializeAndStart")
-                .addMockedMethod("execute")
-                .addMockedMethod("close")
-                .createStrictMock();
-
-        workerTask.initialize(TASK_CONFIG);
-        expectLastCall();
-
-        workerTask.initializeAndStart();
-        expectLastCall();
-
-        workerTask.execute();
-        expectLastCall();
-
-        statusListener.onStartup(taskId);
-        expectLastCall();
-
-        workerTask.close();
-        expectLastCall();
-
-        statusListener.onShutdown(taskId);
-        expectLastCall();
-
-        replay(workerTask);
+        WorkerTask workerTask = new TestWorkerTask(taskId, statusListener, TargetState.STARTED, loader, metrics,
+                retryWithToleranceOperator, Time.SYSTEM, statusBackingStore);
 
         workerTask.initialize(TASK_CONFIG);
         workerTask.run();
         workerTask.stop();
         workerTask.awaitStop(1000L);
 
-        verify(workerTask);
+        verify(statusListener).onStartup(taskId);
+        verify(statusListener).onShutdown(taskId);
     }
 
     @Test
     public void stopBeforeStarting() {
         ConnectorTaskId taskId = new ConnectorTaskId("foo", 0);
 
-        WorkerTask workerTask = partialMockBuilder(WorkerTask.class)
-                .withConstructor(
-                        ConnectorTaskId.class,
-                        TaskStatus.Listener.class,
-                        TargetState.class,
-                        ClassLoader.class,
-                        ConnectMetrics.class,
-                        RetryWithToleranceOperator.class,
-                        Time.class,
-                        StatusBackingStore.class
-                )
-                .withArgs(taskId, statusListener, TargetState.STARTED, loader, metrics,
-                        retryWithToleranceOperator, Time.SYSTEM, statusBackingStore)
-                .addMockedMethod("initialize")
-                .addMockedMethod("execute")
-                .addMockedMethod("close")
-                .createStrictMock();
-
-        workerTask.initialize(TASK_CONFIG);
-        EasyMock.expectLastCall();
+        WorkerTask workerTask = new TestWorkerTask(taskId, statusListener, TargetState.STARTED, loader, metrics,
+                retryWithToleranceOperator, Time.SYSTEM, statusBackingStore) {
 
-        workerTask.close();
-        EasyMock.expectLastCall();
+            @Override
+            public void initializeAndStart() {
+                fail("This method is expected to not be invoked");
+            }
 
-        replay(workerTask);
+            @Override
+            public void execute() {
+                fail("This method is expected to not be invoked");
+            }
+        };
 
         workerTask.initialize(TASK_CONFIG);
         workerTask.stop();
@@ -161,72 +109,44 @@ public class WorkerTaskTest {
 
         // now run should not do anything
         workerTask.run();
-
-        verify(workerTask);
     }
 
     @Test
     public void cancelBeforeStopping() throws Exception {
         ConnectorTaskId taskId = new ConnectorTaskId("foo", 0);
-
-        WorkerTask workerTask = partialMockBuilder(WorkerTask.class)
-                .withConstructor(
-                        ConnectorTaskId.class,
-                        TaskStatus.Listener.class,
-                        TargetState.class,
-                        ClassLoader.class,
-                        ConnectMetrics.class,
-                        RetryWithToleranceOperator.class,
-                        Time.class,
-                        StatusBackingStore.class
-                )
-                .withArgs(taskId, statusListener, TargetState.STARTED, loader, metrics,
-                        retryWithToleranceOperator, Time.SYSTEM, statusBackingStore)
-                .addMockedMethod("initialize")
-                .addMockedMethod("initializeAndStart")
-                .addMockedMethod("execute")
-                .addMockedMethod("close")
-                .createStrictMock();
-
         final CountDownLatch stopped = new CountDownLatch(1);
-        final Thread thread = new Thread(() -> {
-            try {
-                stopped.await();
-            } catch (Exception e) {
-            }
-        });
 
-        workerTask.initialize(TASK_CONFIG);
-        EasyMock.expectLastCall();
-
-        workerTask.initializeAndStart();
-        EasyMock.expectLastCall();
-
-        workerTask.execute();
-        expectLastCall().andAnswer(() -> {
-            thread.start();
-            return null;
-        });
+        WorkerTask workerTask = new TestWorkerTask(taskId, statusListener, TargetState.STARTED, loader, metrics,
+                retryWithToleranceOperator, Time.SYSTEM, statusBackingStore) {
 
-        statusListener.onStartup(taskId);
-        expectLastCall();
-
-        workerTask.close();
-        expectLastCall();
-
-        // there should be no call to onShutdown()
+            @Override
+            public void execute() {
+                try {
+                    stopped.await();
+                } catch (InterruptedException e) {
+                    fail("Unexpected interrupt");
+                }
+            }
 
-        replay(workerTask);
+            // Trigger task shutdown immediately after start. The task will block in its execute() method
+            // until the stopped latch is counted down (i.e. it doesn't actually stop after stop is triggered).
+            @Override
+            public void initializeAndStart() {
+                stop();
+            }
+        };
 
         workerTask.initialize(TASK_CONFIG);
-        workerTask.run();
+        Thread t = new Thread(workerTask);
+        t.start();
 
-        workerTask.stop();
         workerTask.cancel();
         stopped.countDown();
-        thread.join();
+        t.join();
 
-        verify(workerTask);
+        verify(statusListener).onStartup(taskId);
+        // there should be no other status updates, including shutdown
+        verifyNoMoreInteractions(statusListener);
     }
 
     @Test
@@ -235,20 +155,6 @@ public class WorkerTaskTest {
         ConnectMetrics metrics = new MockConnectMetrics();
         TaskMetricsGroup group = new TaskMetricsGroup(taskId, metrics, statusListener);
 
-        statusListener.onStartup(taskId);
-        expectLastCall();
-
-        statusListener.onPause(taskId);
-        expectLastCall();
-
-        statusListener.onResume(taskId);
-        expectLastCall();
-
-        statusListener.onShutdown(taskId);
-        expectLastCall();
-
-        replay(statusListener);
-
         group.onStartup(taskId);
         assertRunningMetric(group);
         group.onPause(taskId);
@@ -258,7 +164,10 @@ public class WorkerTaskTest {
         group.onShutdown(taskId);
         assertStoppedMetric(group);
 
-        verify(statusListener);
+        verify(statusListener).onStartup(taskId);
+        verify(statusListener).onPause(taskId);
+        verify(statusListener).onResume(taskId);
+        verify(statusListener).onShutdown(taskId);
     }
 
     @Test
@@ -269,29 +178,6 @@ public class WorkerTaskTest {
         ConnectException error = new ConnectException("error");
         TaskMetricsGroup group = new TaskMetricsGroup(taskId, metrics, statusListener);
 
-        statusListener.onStartup(taskId);
-        expectLastCall();
-
-        statusListener.onPause(taskId);
-        expectLastCall();
-
-        statusListener.onResume(taskId);
-        expectLastCall();
-
-        statusListener.onPause(taskId);
-        expectLastCall();
-
-        statusListener.onResume(taskId);
-        expectLastCall();
-
-        statusListener.onFailure(taskId, error);
-        expectLastCall();
-
-        statusListener.onShutdown(taskId);
-        expectLastCall();
-
-        replay(statusListener);
-
         time.sleep(1000L);
         group.onStartup(taskId);
         assertRunningMetric(group);
@@ -320,7 +206,11 @@ public class WorkerTaskTest {
         group.onShutdown(taskId);
         assertStoppedMetric(group);
 
-        verify(statusListener);
+        verify(statusListener).onStartup(taskId);
+        verify(statusListener, times(2)).onPause(taskId);
+        verify(statusListener, times(2)).onResume(taskId);
+        verify(statusListener).onFailure(taskId, error);
+        verify(statusListener).onShutdown(taskId);
 
         long totalTime = 27000L;
         double pauseTimeRatio = (double) (3000L + 5000L) / totalTime;
@@ -332,6 +222,31 @@ public class WorkerTaskTest {
     private static abstract class TestSinkTask extends SinkTask {
     }
 
+    private static class TestWorkerTask extends WorkerTask {
+
+        public TestWorkerTask(ConnectorTaskId id, Listener statusListener, TargetState initialState, ClassLoader loader,
+                              ConnectMetrics connectMetrics, RetryWithToleranceOperator retryWithToleranceOperator, Time time,
+                              StatusBackingStore statusBackingStore) {
+            super(id, statusListener, initialState, loader, connectMetrics, retryWithToleranceOperator, time, statusBackingStore);
+        }
+
+        @Override
+        public void initialize(TaskConfig taskConfig) {
+        }
+
+        @Override
+        protected void initializeAndStart() {
+        }
+
+        @Override
+        protected void execute() {
+        }
+
+        @Override
+        protected void close() {
+        }
+    }
+
     protected void assertFailedMetric(TaskMetricsGroup metricsGroup) {
         assertEquals(AbstractStatus.State.FAILED, metricsGroup.state());
     }
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ErrorReporterTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ErrorReporterTest.java
index 11e72c2ef4a..a46dc3d5d2e 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ErrorReporterTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ErrorReporterTest.java
@@ -31,15 +31,12 @@ import org.apache.kafka.connect.runtime.isolation.Plugins;
 import org.apache.kafka.connect.sink.SinkTask;
 import org.apache.kafka.connect.transforms.Transformation;
 import org.apache.kafka.connect.util.ConnectorTaskId;
-import org.easymock.EasyMock;
-import org.easymock.Mock;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
-import org.powermock.api.easymock.PowerMock;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.modules.junit4.PowerMockRunner;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -58,14 +55,17 @@ import static org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.ER
 import static org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.ERROR_HEADER_ORIG_TOPIC;
 import static org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.ERROR_HEADER_STAGE;
 import static org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.ERROR_HEADER_TASK_ID;
-import static org.easymock.EasyMock.replay;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
 
-@RunWith(PowerMockRunner.class)
-@PowerMockIgnore("javax.management.*")
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
 public class ErrorReporterTest {
 
     private static final String TOPIC = "test-topic";
@@ -109,12 +109,10 @@ public class ErrorReporterTest {
 
         ProcessingContext context = processingContext();
 
-        EasyMock.expect(producer.send(EasyMock.anyObject(), EasyMock.anyObject())).andThrow(new RuntimeException());
-        replay(producer);
-
-        // since topic name is empty, this method should be a NOOP.
-        // if it attempts to log to the DLQ via the producer, the send mock will throw a RuntimeException.
+        // since topic name is empty, this method should be a NOOP and producer.send() should
+        // not be called.
         deadLetterQueueReporter.report(context);
+        verifyNoMoreInteractions(producer);
     }
 
     @Test
@@ -124,12 +122,11 @@ public class ErrorReporterTest {
 
         ProcessingContext context = processingContext();
 
-        EasyMock.expect(producer.send(EasyMock.anyObject(), EasyMock.anyObject())).andReturn(metadata);
-        replay(producer);
+        when(producer.send(any(), any())).thenReturn(metadata);
 
         deadLetterQueueReporter.report(context);
 
-        PowerMock.verifyAll();
+        verify(producer, times(1)).send(any(), any());
     }
 
     @Test
@@ -139,28 +136,12 @@ public class ErrorReporterTest {
 
         ProcessingContext context = processingContext();
 
-        EasyMock.expect(producer.send(EasyMock.anyObject(), EasyMock.anyObject())).andReturn(metadata).times(2);
-        replay(producer);
+        when(producer.send(any(), any())).thenReturn(metadata);
 
         deadLetterQueueReporter.report(context);
         deadLetterQueueReporter.report(context);
 
-        PowerMock.verifyAll();
-    }
-
-    @Test
-    public void testDLQReportAndReturnFuture() {
-        DeadLetterQueueReporter deadLetterQueueReporter = new DeadLetterQueueReporter(
-            producer, config(singletonMap(SinkConnectorConfig.DLQ_TOPIC_NAME_CONFIG, DLQ_TOPIC)), TASK_ID, errorHandlingMetrics);
-
-        ProcessingContext context = processingContext();
-
-        EasyMock.expect(producer.send(EasyMock.anyObject(), EasyMock.anyObject())).andReturn(metadata);
-        replay(producer);
-
-        deadLetterQueueReporter.report(context);
-
-        PowerMock.verifyAll();
+        verify(producer, times(2)).send(any(), any());
     }
 
     @Test
@@ -168,13 +149,8 @@ public class ErrorReporterTest {
         DeadLetterQueueReporter deadLetterQueueReporter = new DeadLetterQueueReporter(
             producer, config(singletonMap(SinkConnectorConfig.DLQ_TOPIC_NAME_CONFIG, DLQ_TOPIC)), TASK_ID, errorHandlingMetrics);
 
-        producer.close();
-        EasyMock.expectLastCall();
-        replay(producer);
-
         deadLetterQueueReporter.close();
-
-        PowerMock.verifyAll();
+        verify(producer).close();
     }
 
     @Test
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java
index e8b0c7a9553..6abb6221b55 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java
@@ -29,16 +29,14 @@ import org.apache.kafka.connect.runtime.WorkerConfig;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
 import org.apache.kafka.connect.runtime.isolation.PluginsTest.TestConverter;
 import org.apache.kafka.connect.runtime.isolation.PluginsTest.TestableWorkerConfig;
+import org.apache.kafka.connect.sink.SinkConnector;
 import org.apache.kafka.connect.sink.SinkTask;
 import org.apache.kafka.connect.util.ConnectorTaskId;
-import org.easymock.EasyMock;
-import org.easymock.Mock;
 import org.junit.Test;
 import org.junit.runner.RunWith;
-import org.powermock.api.easymock.PowerMock;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+import org.mockito.stubbing.OngoingStubbing;
 
 import java.util.Arrays;
 import java.util.Collections;
@@ -67,44 +65,48 @@ import static org.apache.kafka.connect.runtime.ConnectorConfig.ERRORS_TOLERANCE_
 import static org.apache.kafka.connect.runtime.ConnectorConfig.ERRORS_TOLERANCE_DEFAULT;
 import static org.apache.kafka.connect.runtime.errors.ToleranceType.ALL;
 import static org.apache.kafka.connect.runtime.errors.ToleranceType.NONE;
-import static org.easymock.EasyMock.replay;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
-
-@RunWith(PowerMockRunner.class)
-@PrepareForTest({ProcessingContext.class})
-@PowerMockIgnore("javax.management.*")
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
 public class RetryWithToleranceOperatorTest {
 
+    private static final Map<String, String> PROPERTIES = new HashMap<String, String>() {{
+            put(CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG, Objects.toString(2));
+            put(CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG, Objects.toString(3000));
+            put(CommonClientConfigs.METRICS_RECORDING_LEVEL_CONFIG, Sensor.RecordingLevel.INFO.toString());
+
+            // define required properties
+            put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, TestConverter.class.getName());
+            put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, TestConverter.class.getName());
+        }};
+
     public static final RetryWithToleranceOperator NOOP_OPERATOR = new RetryWithToleranceOperator(
-            ERRORS_RETRY_TIMEOUT_DEFAULT, ERRORS_RETRY_MAX_DELAY_DEFAULT, NONE, SYSTEM);
+            ERRORS_RETRY_TIMEOUT_DEFAULT, ERRORS_RETRY_MAX_DELAY_DEFAULT, NONE, SYSTEM,
+            new ErrorHandlingMetrics(
+                    new ConnectorTaskId("noop-connector", -1),
+                    new ConnectMetrics("noop-worker", new TestableWorkerConfig(PROPERTIES),
+                            new SystemTime(), "test-cluster"))
+    );
     public static final RetryWithToleranceOperator ALL_OPERATOR = new RetryWithToleranceOperator(
-            ERRORS_RETRY_TIMEOUT_DEFAULT, ERRORS_RETRY_MAX_DELAY_DEFAULT, ALL, SYSTEM);
-    static {
-        Map<String, String> properties = new HashMap<>();
-        properties.put(CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG, Objects.toString(2));
-        properties.put(CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG, Objects.toString(3000));
-        properties.put(CommonClientConfigs.METRICS_RECORDING_LEVEL_CONFIG, Sensor.RecordingLevel.INFO.toString());
-
-        // define required properties
-        properties.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, TestConverter.class.getName());
-        properties.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, TestConverter.class.getName());
-
-        NOOP_OPERATOR.metrics(new ErrorHandlingMetrics(
-            new ConnectorTaskId("noop-connector", -1),
-            new ConnectMetrics("noop-worker", new TestableWorkerConfig(properties), new SystemTime(), "test-cluster"))
-        );
-        ALL_OPERATOR.metrics(new ErrorHandlingMetrics(
-                new ConnectorTaskId("errors-all-tolerate-connector", -1),
-                new ConnectMetrics("errors-all-tolerate-worker", new TestableWorkerConfig(properties),
-                        new SystemTime(), "test-cluster"))
-        );
-    }
+            ERRORS_RETRY_TIMEOUT_DEFAULT, ERRORS_RETRY_MAX_DELAY_DEFAULT, ALL, SYSTEM,
+            new ErrorHandlingMetrics(
+                    new ConnectorTaskId("errors-all-tolerate-connector", -1),
+                    new ConnectMetrics("errors-all-tolerate-worker", new TestableWorkerConfig(PROPERTIES),
+                            new SystemTime(), "test-cluster"))
+    );
 
-    @SuppressWarnings("unused")
     @Mock
     private Operation<String> mockOperation;
 
@@ -120,8 +122,7 @@ public class RetryWithToleranceOperatorTest {
     @Test
     public void testExecuteFailed() {
         RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(0,
-            ERRORS_RETRY_MAX_DELAY_DEFAULT, ALL, SYSTEM);
-        retryWithToleranceOperator.metrics(errorHandlingMetrics);
+            ERRORS_RETRY_MAX_DELAY_DEFAULT, ALL, SYSTEM, errorHandlingMetrics);
 
         retryWithToleranceOperator.executeFailed(Stage.TASK_PUT,
             SinkTask.class, consumerRecord, new Throwable());
@@ -130,8 +131,7 @@ public class RetryWithToleranceOperatorTest {
     @Test
     public void testExecuteFailedNoTolerance() {
         RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(0,
-            ERRORS_RETRY_MAX_DELAY_DEFAULT, NONE, SYSTEM);
-        retryWithToleranceOperator.metrics(errorHandlingMetrics);
+            ERRORS_RETRY_MAX_DELAY_DEFAULT, NONE, SYSTEM, errorHandlingMetrics);
 
         assertThrows(ConnectException.class, () -> retryWithToleranceOperator.executeFailed(Stage.TASK_PUT,
             SinkTask.class, consumerRecord, new Throwable()));
@@ -189,14 +189,15 @@ public class RetryWithToleranceOperatorTest {
 
     private void testHandleExceptionInStage(Stage type, Exception ex) {
         RetryWithToleranceOperator retryWithToleranceOperator = setupExecutor();
-        retryWithToleranceOperator.execute(new ExceptionThrower(ex), type, ExceptionThrower.class);
+        Operation<?> exceptionThrower = () -> {
+            throw ex;
+        };
+        retryWithToleranceOperator.execute(exceptionThrower, type, RetryWithToleranceOperator.class);
         assertTrue(retryWithToleranceOperator.failed());
-        PowerMock.verifyAll();
     }
 
     private RetryWithToleranceOperator setupExecutor() {
-        RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(0, ERRORS_RETRY_MAX_DELAY_DEFAULT, ALL, SYSTEM);
-        retryWithToleranceOperator.metrics(errorHandlingMetrics);
+        RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(0, ERRORS_RETRY_MAX_DELAY_DEFAULT, ALL, SYSTEM, errorHandlingMetrics);
         return retryWithToleranceOperator;
     }
 
@@ -222,25 +223,24 @@ public class RetryWithToleranceOperatorTest {
 
     public void execAndHandleRetriableError(long errorRetryTimeout, int numRetriableExceptionsThrown, List<Long> expectedWaits, Exception e, boolean successExpected) throws Exception {
         MockTime time = new MockTime(0, 0, 0);
-        CountDownLatch exitLatch = PowerMock.createStrictMock(CountDownLatch.class);
-        RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(errorRetryTimeout, ERRORS_RETRY_MAX_DELAY_DEFAULT, ALL, time, new ProcessingContext(), exitLatch);
-        retryWithToleranceOperator.metrics(errorHandlingMetrics);
-
-        EasyMock.expect(mockOperation.call()).andThrow(e).times(numRetriableExceptionsThrown);
+        CountDownLatch exitLatch = mock(CountDownLatch.class);
+        RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(errorRetryTimeout, ERRORS_RETRY_MAX_DELAY_DEFAULT, ALL, time, errorHandlingMetrics, new ProcessingContext(), exitLatch);
 
+        OngoingStubbing<String> mockOperationCall = when(mockOperation.call());
+        for (int i = 0; i < numRetriableExceptionsThrown; i++) {
+            mockOperationCall = mockOperationCall.thenThrow(e);
+        }
         if (successExpected) {
-            EasyMock.expect(mockOperation.call()).andReturn("Success");
+            mockOperationCall.thenReturn("Success");
         }
 
         for (Long expectedWait : expectedWaits) {
-            EasyMock.expect(exitLatch.await(expectedWait, TimeUnit.MILLISECONDS)).andAnswer(() -> {
+            when(exitLatch.await(expectedWait, TimeUnit.MILLISECONDS)).thenAnswer(i -> {
                 time.sleep(expectedWait);
                 return false;
             });
         }
 
-        replay(mockOperation, exitLatch);
-
         String result = retryWithToleranceOperator.execAndHandleError(mockOperation, Exception.class);
 
         if (successExpected) {
@@ -250,51 +250,47 @@ public class RetryWithToleranceOperatorTest {
             assertTrue(retryWithToleranceOperator.failed());
         }
 
-        EasyMock.verify(mockOperation);
-        PowerMock.verifyAll();
+        verifyNoMoreInteractions(exitLatch);
+        verify(mockOperation, times(successExpected ? numRetriableExceptionsThrown + 1 : numRetriableExceptionsThrown)).call();
     }
 
     @Test
     public void testExecAndHandleNonRetriableError() throws Exception {
         MockTime time = new MockTime(0, 0, 0);
-        CountDownLatch exitLatch = PowerMock.createStrictMock(CountDownLatch.class);
-        RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(6000, ERRORS_RETRY_MAX_DELAY_DEFAULT, ALL, time, new ProcessingContext(), exitLatch);
-        retryWithToleranceOperator.metrics(errorHandlingMetrics);
-
-        EasyMock.expect(mockOperation.call()).andThrow(new Exception("Test")).times(1);
-
-        // expect no call to exitLatch.await() which is only called during the retry backoff
+        CountDownLatch exitLatch = mock(CountDownLatch.class);
+        RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(6000, ERRORS_RETRY_MAX_DELAY_DEFAULT, ALL, time, errorHandlingMetrics, new ProcessingContext(), exitLatch);
 
-        replay(mockOperation, exitLatch);
+        when(mockOperation.call()).thenThrow(new Exception("Test"));
 
         String result = retryWithToleranceOperator.execAndHandleError(mockOperation, Exception.class);
         assertTrue(retryWithToleranceOperator.failed());
         assertNull(result);
 
-        EasyMock.verify(mockOperation);
-        PowerMock.verifyAll();
+        // expect no call to exitLatch.await() which is only called during the retry backoff
+        verify(mockOperation).call();
+        verify(exitLatch, never()).await(anyLong(), any());
     }
 
     @Test
     public void testExitLatch() throws Exception {
         MockTime time = new MockTime(0, 0, 0);
-        CountDownLatch exitLatch = PowerMock.createStrictMock(CountDownLatch.class);
-        RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(-1, ERRORS_RETRY_MAX_DELAY_DEFAULT, ALL, time, new ProcessingContext(), exitLatch);
-        retryWithToleranceOperator.metrics(errorHandlingMetrics);
-        EasyMock.expect(mockOperation.call()).andThrow(new RetriableException("test")).anyTimes();
-        EasyMock.expect(exitLatch.await(300, TimeUnit.MILLISECONDS)).andAnswer(() -> {
+        CountDownLatch exitLatch = mock(CountDownLatch.class);
+        RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(-1, ERRORS_RETRY_MAX_DELAY_DEFAULT, ALL, time, errorHandlingMetrics, new ProcessingContext(), exitLatch);
+        when(mockOperation.call()).thenThrow(new RetriableException("test"));
+
+        when(exitLatch.await(300L, TimeUnit.MILLISECONDS)).thenAnswer(i -> {
             time.sleep(300);
             return false;
         });
-        EasyMock.expect(exitLatch.await(600, TimeUnit.MILLISECONDS)).andAnswer(() -> {
+        when(exitLatch.await(600L, TimeUnit.MILLISECONDS)).thenAnswer(i -> {
             time.sleep(600);
             return false;
         });
-        EasyMock.expect(exitLatch.await(1200, TimeUnit.MILLISECONDS)).andAnswer(() -> {
+        when(exitLatch.await(1200L, TimeUnit.MILLISECONDS)).thenAnswer(i -> {
             time.sleep(1200);
             return false;
         });
-        EasyMock.expect(exitLatch.await(2400, TimeUnit.MILLISECONDS)).andAnswer(() -> {
+        when(exitLatch.await(2400L, TimeUnit.MILLISECONDS)).thenAnswer(i -> {
             time.sleep(2400);
             retryWithToleranceOperator.triggerStop();
             return false;
@@ -302,45 +298,40 @@ public class RetryWithToleranceOperatorTest {
 
         // expect no more calls to exitLatch.await() after retryWithToleranceOperator.triggerStop() is called
 
-        exitLatch.countDown();
-        EasyMock.expectLastCall().once();
-
-        replay(mockOperation, exitLatch);
         retryWithToleranceOperator.execAndHandleError(mockOperation, Exception.class);
         assertTrue(retryWithToleranceOperator.failed());
         assertEquals(4500L, time.milliseconds());
-        PowerMock.verifyAll();
+        verify(exitLatch).countDown();
+        verifyNoMoreInteractions(exitLatch);
     }
 
     @Test
     public void testBackoffLimit() throws Exception {
         MockTime time = new MockTime(0, 0, 0);
-        CountDownLatch exitLatch = PowerMock.createStrictMock(CountDownLatch.class);
-        RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(5, 5000, NONE, time, new ProcessingContext(), exitLatch);
+        CountDownLatch exitLatch = mock(CountDownLatch.class);
+        RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(5, 5000, NONE, time, errorHandlingMetrics, new ProcessingContext(), exitLatch);
 
-        EasyMock.expect(exitLatch.await(300, TimeUnit.MILLISECONDS)).andAnswer(() -> {
+        when(exitLatch.await(300, TimeUnit.MILLISECONDS)).thenAnswer(i -> {
             time.sleep(300);
             return false;
         });
-        EasyMock.expect(exitLatch.await(600, TimeUnit.MILLISECONDS)).andAnswer(() -> {
+        when(exitLatch.await(600, TimeUnit.MILLISECONDS)).thenAnswer(i -> {
             time.sleep(600);
             return false;
         });
-        EasyMock.expect(exitLatch.await(1200, TimeUnit.MILLISECONDS)).andAnswer(() -> {
+        when(exitLatch.await(1200, TimeUnit.MILLISECONDS)).thenAnswer(i -> {
             time.sleep(1200);
             return false;
         });
-        EasyMock.expect(exitLatch.await(2400, TimeUnit.MILLISECONDS)).andAnswer(() -> {
+        when(exitLatch.await(2400, TimeUnit.MILLISECONDS)).thenAnswer(i -> {
             time.sleep(2400);
             return false;
         });
-        EasyMock.expect(exitLatch.await(500, TimeUnit.MILLISECONDS)).andAnswer(() -> {
+        when(exitLatch.await(500, TimeUnit.MILLISECONDS)).thenAnswer(i -> {
             time.sleep(500);
             return false;
         });
-        EasyMock.expect(exitLatch.await(0, TimeUnit.MILLISECONDS)).andReturn(false);
-
-        replay(exitLatch);
+        when(exitLatch.await(0, TimeUnit.MILLISECONDS)).thenReturn(false);
 
         retryWithToleranceOperator.backoff(1, 5000);
         retryWithToleranceOperator.backoff(2, 5000);
@@ -353,23 +344,21 @@ public class RetryWithToleranceOperatorTest {
         // that we don't wait with a negative timeout
         retryWithToleranceOperator.backoff(6, 5000);
 
-        PowerMock.verifyAll();
+        verifyNoMoreInteractions(exitLatch);
     }
 
     @Test
     public void testToleranceLimit() {
-        RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(ERRORS_RETRY_TIMEOUT_DEFAULT, ERRORS_RETRY_MAX_DELAY_DEFAULT, NONE, SYSTEM);
-        retryWithToleranceOperator.metrics(errorHandlingMetrics);
+        RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(ERRORS_RETRY_TIMEOUT_DEFAULT, ERRORS_RETRY_MAX_DELAY_DEFAULT, NONE, SYSTEM, errorHandlingMetrics);
         retryWithToleranceOperator.markAsFailed();
         assertFalse("should not tolerate any errors", retryWithToleranceOperator.withinToleranceLimits());
 
-        retryWithToleranceOperator = new RetryWithToleranceOperator(ERRORS_RETRY_TIMEOUT_DEFAULT, ERRORS_RETRY_MAX_DELAY_DEFAULT, ALL, SYSTEM);
-        retryWithToleranceOperator.metrics(errorHandlingMetrics);
+        retryWithToleranceOperator = new RetryWithToleranceOperator(ERRORS_RETRY_TIMEOUT_DEFAULT, ERRORS_RETRY_MAX_DELAY_DEFAULT, ALL, SYSTEM, errorHandlingMetrics);
         retryWithToleranceOperator.markAsFailed();
         retryWithToleranceOperator.markAsFailed();
         assertTrue("should tolerate all errors", retryWithToleranceOperator.withinToleranceLimits());
 
-        retryWithToleranceOperator = new RetryWithToleranceOperator(ERRORS_RETRY_TIMEOUT_DEFAULT, ERRORS_RETRY_MAX_DELAY_DEFAULT, NONE, SYSTEM);
+        retryWithToleranceOperator = new RetryWithToleranceOperator(ERRORS_RETRY_TIMEOUT_DEFAULT, ERRORS_RETRY_MAX_DELAY_DEFAULT, NONE, SYSTEM, errorHandlingMetrics);
         assertTrue("no tolerance is within limits if no failures", retryWithToleranceOperator.withinToleranceLimits());
     }
 
@@ -379,14 +368,12 @@ public class RetryWithToleranceOperatorTest {
         assertEquals(configuration.errorRetryTimeout(), ERRORS_RETRY_TIMEOUT_DEFAULT);
         assertEquals(configuration.errorMaxDelayInMillis(), ERRORS_RETRY_MAX_DELAY_DEFAULT);
         assertEquals(configuration.errorToleranceType(), ERRORS_TOLERANCE_DEFAULT);
-
-        PowerMock.verifyAll();
     }
 
     ConnectorConfig config(Map<String, String> connProps) {
         Map<String, String> props = new HashMap<>();
         props.put(ConnectorConfig.NAME_CONFIG, "test");
-        props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, SinkTask.class.getName());
+        props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, SinkConnector.class.getName());
         props.putAll(connProps);
         return new ConnectorConfig(plugins, props);
     }
@@ -402,8 +389,6 @@ public class RetryWithToleranceOperatorTest {
 
         configuration = config(singletonMap(ERRORS_TOLERANCE_CONFIG, "none"));
         assertEquals(configuration.errorToleranceType(), ToleranceType.NONE);
-
-        PowerMock.verifyAll();
     }
 
     @Test
@@ -414,9 +399,9 @@ public class RetryWithToleranceOperatorTest {
         // can't corrupt the state of the ProcessingContext
         AtomicReference<Throwable> failed = new AtomicReference<>(null);
         RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(0,
-                ERRORS_RETRY_MAX_DELAY_DEFAULT, ALL, SYSTEM, new ProcessingContext() {
-                    private AtomicInteger count = new AtomicInteger();
-                    private AtomicInteger attempt = new AtomicInteger();
+                ERRORS_RETRY_MAX_DELAY_DEFAULT, ALL, SYSTEM, errorHandlingMetrics, new ProcessingContext() {
+                    private final AtomicInteger count = new AtomicInteger();
+                    private final AtomicInteger attempt = new AtomicInteger();
 
                     @Override
                     public void error(Throwable error) {
@@ -451,7 +436,6 @@ public class RetryWithToleranceOperatorTest {
                         super.attempt(attempt);
                     }
                 }, new CountDownLatch(1));
-        retryWithToleranceOperator.metrics(errorHandlingMetrics);
 
         ExecutorService pool = Executors.newFixedThreadPool(numThreads);
         List<? extends Future<?>> futures = IntStream.range(0, numThreads).boxed()
@@ -494,18 +478,4 @@ public class RetryWithToleranceOperatorTest {
             throw exception;
         }
     }
-
-
-    private static class ExceptionThrower implements Operation<Object> {
-        private Exception e;
-
-        public ExceptionThrower(Exception e) {
-            this.e = e;
-        }
-
-        @Override
-        public Object call() throws Exception {
-            throw e;
-        }
-    }
 }
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/WorkerErrantRecordReporterTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/WorkerErrantRecordReporterTest.java
index 2d78297db5d..084bf9c57f4 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/WorkerErrantRecordReporterTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/WorkerErrantRecordReporterTest.java
@@ -17,16 +17,18 @@
 
 package org.apache.kafka.connect.runtime.errors;
 
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.runtime.ConnectorConfig;
+import org.apache.kafka.connect.runtime.InternalSinkRecord;
 import org.apache.kafka.connect.storage.Converter;
 import org.apache.kafka.connect.storage.HeaderConverter;
-import org.easymock.Mock;
-import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.modules.junit4.PowerMockRunner;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -34,39 +36,27 @@ import java.util.Collections;
 import java.util.concurrent.CompletableFuture;
 
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
-
-@RunWith(PowerMockRunner.class)
-@PowerMockIgnore("javax.management.*")
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
 public class WorkerErrantRecordReporterTest {
 
     private WorkerErrantRecordReporter reporter;
 
-    @Mock
-    private RetryWithToleranceOperator retryWithToleranceOperator;
-
-    @Mock
-    private Converter converter;
-
-    @Mock
-    private HeaderConverter headerConverter;
-
-    @Mock
-    private SinkRecord record;
-
-    @Before
-    public void setup() {
-        reporter = new WorkerErrantRecordReporter(
-            retryWithToleranceOperator,
-            converter,
-            converter,
-            headerConverter
-        );
-    }
+    @Mock private Converter converter;
+    @Mock private HeaderConverter headerConverter;
+    @Mock private InternalSinkRecord record;
+    @Mock private ErrorHandlingMetrics errorHandlingMetrics;
+    @Mock private ErrorReporter errorReporter;
 
     @Test
     public void testGetFutures() {
+        initializeReporter(true);
         Collection<TopicPartition> topicPartitions = new ArrayList<>();
         assertTrue(reporter.futures.isEmpty());
         for (int i = 0; i < 4; i++) {
@@ -78,4 +68,46 @@ public class WorkerErrantRecordReporterTest {
         reporter.awaitFutures(topicPartitions);
         assertTrue(reporter.futures.isEmpty());
     }
+
+    @Test
+    public void testReportErrorsTolerated() {
+        testReport(true);
+    }
+
+    @Test
+    public void testReportNoToleratedErrors() {
+        testReport(false);
+    }
+
+    private void testReport(boolean errorsTolerated) {
+        initializeReporter(errorsTolerated);
+        when(errorReporter.report(any())).thenReturn(CompletableFuture.completedFuture(null));
+        @SuppressWarnings("unchecked") ConsumerRecord<byte[], byte[]> consumerRecord = mock(ConsumerRecord.class);
+        when(record.originalRecord()).thenReturn(consumerRecord);
+
+        if (errorsTolerated) {
+            reporter.report(record, new Throwable());
+        } else {
+            assertThrows(ConnectException.class, () -> reporter.report(record, new Throwable()));
+        }
+
+        verify(errorReporter).report(any());
+    }
+
+    private void initializeReporter(boolean errorsTolerated) {
+        RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(
+                5000,
+                ConnectorConfig.ERRORS_RETRY_MAX_DELAY_DEFAULT,
+                errorsTolerated ? ToleranceType.ALL : ToleranceType.NONE,
+                Time.SYSTEM,
+                errorHandlingMetrics
+        );
+        retryWithToleranceOperator.reporters(Collections.singletonList(errorReporter));
+        reporter = new WorkerErrantRecordReporter(
+                retryWithToleranceOperator,
+                converter,
+                converter,
+                headerConverter
+        );
+    }
 }