You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text version.
Posted to commits@kafka.apache.org by mi...@apache.org on 2022/08/23 15:49:50 UTC

[kafka] branch trunk updated: KAFKA-13133: Replace EasyMock and PowerMock with Mockito for AbstractHerderTest (#12473)

This is an automated email from the ASF dual-hosted git repository.

mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4b310d1fe17 KAFKA-13133: Replace EasyMock and PowerMock with Mockito for AbstractHerderTest (#12473)
4b310d1fe17 is described below

commit 4b310d1fe171d6d5edda13bb1baf2cf5a0d8eb68
Author: Divij Vaidya <di...@amazon.com>
AuthorDate: Tue Aug 23 17:49:38 2022 +0200

    KAFKA-13133: Replace EasyMock and PowerMock with Mockito for AbstractHerderTest (#12473)
    
    
    Reviewers: Mickael Maison <mi...@gmail.com>, Yash Mayya <ya...@gmail.com>
    
    Co-authored-by: wycccccc <49...@qq.com>
    Co-authored-by: wycccccc <43...@users.noreply.github.com>
---
 build.gradle                                       |   2 +-
 .../kafka/connect/runtime/AbstractHerderTest.java  | 490 ++++++++-------------
 2 files changed, 189 insertions(+), 303 deletions(-)

diff --git a/build.gradle b/build.gradle
index 7c38d899a6e..16840d81b0f 100644
--- a/build.gradle
+++ b/build.gradle
@@ -404,7 +404,7 @@ subprojects {
   if (JavaVersion.current().isCompatibleWith(JavaVersion.VERSION_16)) {
     testsToExclude.addAll([
       // connect tests
-      "**/AbstractHerderTest.*", "**/ConnectorPluginsResourceTest.*",
+      "**/ConnectorPluginsResourceTest.*",
       "**/ConnectorsResourceTest.*", "**/DistributedHerderTest.*", "**/FileOffsetBakingStoreTest.*",
       "**/ErrorHandlingTaskTest.*", "**/KafkaConfigBackingStoreTest.*", "**/KafkaOffsetBackingStoreTest.*",
       "**/KafkaBasedLogTest.*", "**/OffsetStorageWriterTest.*", "**/StandaloneHerderTest.*",
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
index ada507f1eb3..50f3ac00671 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
@@ -25,6 +25,7 @@ import org.apache.kafka.common.config.ConfigValue;
 import org.apache.kafka.common.config.SaslConfigs;
 import org.apache.kafka.common.config.provider.DirectoryConfigProvider;
 import org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerUnsecuredLoginCallbackHandler;
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.connector.Connector;
 import org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy;
 import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
@@ -48,15 +49,13 @@ import org.apache.kafka.connect.storage.StatusBackingStore;
 import org.apache.kafka.connect.transforms.Transformation;
 import org.apache.kafka.connect.transforms.predicates.Predicate;
 import org.apache.kafka.connect.util.ConnectorTaskId;
-import org.easymock.Capture;
-import org.easymock.EasyMock;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
-import org.powermock.api.easymock.PowerMock;
-import org.powermock.api.easymock.annotation.Mock;
-import org.powermock.api.easymock.annotation.MockStrict;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -72,20 +71,22 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import static org.apache.kafka.connect.runtime.AbstractHerder.keysWithVariableValues;
-import static org.easymock.EasyMock.anyString;
-import static org.easymock.EasyMock.partialMockBuilder;
-import static org.easymock.EasyMock.strictMock;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.withSettings;
+import static org.mockito.Mockito.CALLS_REAL_METHODS;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.times;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
-import static org.powermock.api.easymock.PowerMock.replayAll;
-import static org.powermock.api.easymock.PowerMock.verifyAll;
 
-@RunWith(PowerMockRunner.class)
-@PrepareForTest({AbstractHerder.class})
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
 public class AbstractHerderTest {
 
     private static final String CONN1 = "sourceA";
@@ -138,96 +139,59 @@ public class AbstractHerderTest {
     private final String workerId = "workerId";
     private final String kafkaClusterId = "I4ZmrWqfT2e-upky_4fdPA";
     private final int generation = 5;
-    private final String connector = "connector";
+    private final String connectorName = "connector";
     private final ConnectorClientConfigOverridePolicy noneConnectorClientConfigOverridePolicy = new NoneConnectorClientConfigOverridePolicy();
 
-    @MockStrict private Worker worker;
-    @MockStrict private WorkerConfigTransformer transformer;
-    @MockStrict private ConfigBackingStore configStore;
-    @MockStrict private StatusBackingStore statusStore;
-    @MockStrict private ClassLoader classLoader;
+    @Mock private Worker worker;
+    @Mock private WorkerConfigTransformer transformer;
+    @Mock private ConfigBackingStore configStore;
+    @Mock private StatusBackingStore statusStore;
+    @Mock private ClassLoader classLoader;
     @Mock private Plugins plugins;
 
+    private ClassLoader loader;
+    private Connector connector;
+
+    @Before
+    public void before() {
+        loader = Utils.getContextOrKafkaClassLoader();
+    }
+
+    @After
+    public void tearDown() {
+        if (loader != null) Plugins.compareAndSwapLoaders(loader);
+    }
+
     @Test
     public void testConnectors() {
-        AbstractHerder herder = partialMockBuilder(AbstractHerder.class)
-            .withConstructor(
-                Worker.class,
-                String.class,
-                String.class,
-                StatusBackingStore.class,
-                ConfigBackingStore.class,
-                ConnectorClientConfigOverridePolicy.class
-            )
-            .withArgs(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy)
-            .addMockedMethod("generation")
-            .createMock();
-
-        EasyMock.expect(herder.generation()).andStubReturn(generation);
-        EasyMock.expect(herder.rawConfig(connector)).andReturn(null);
-        EasyMock.expect(configStore.snapshot()).andReturn(SNAPSHOT);
-        replayAll();
+        AbstractHerder herder = mock(AbstractHerder.class, withSettings()
+                .useConstructor(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy)
+                .defaultAnswer(CALLS_REAL_METHODS));
+
+        when(configStore.snapshot()).thenReturn(SNAPSHOT);
         assertEquals(Collections.singleton(CONN1), new HashSet<>(herder.connectors()));
-        PowerMock.verifyAll();
     }
 
     @Test
     public void testConnectorStatus() {
-        ConnectorTaskId taskId = new ConnectorTaskId(connector, 0);
-        AbstractHerder herder = partialMockBuilder(AbstractHerder.class)
-            .withConstructor(
-                Worker.class,
-                String.class,
-                String.class,
-                StatusBackingStore.class,
-                ConfigBackingStore.class,
-                ConnectorClientConfigOverridePolicy.class
-            )
-            .withArgs(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy)
-            .addMockedMethod("generation")
-            .createMock();
-
-        EasyMock.expect(herder.generation()).andStubReturn(generation);
-        EasyMock.expect(herder.rawConfig(connector)).andReturn(null);
-        EasyMock.expect(statusStore.get(connector))
-            .andReturn(new ConnectorStatus(connector, AbstractStatus.State.RUNNING, workerId, generation));
-        EasyMock.expect(statusStore.getAll(connector))
-            .andReturn(Collections.singletonList(
-                new TaskStatus(taskId, AbstractStatus.State.UNASSIGNED, workerId, generation)));
-
-        replayAll();
-        ConnectorStateInfo csi = herder.connectorStatus(connector);
-        PowerMock.verifyAll();
-    }
-
-    @Test
-    public void connectorStatus() {
-        ConnectorTaskId taskId = new ConnectorTaskId(connector, 0);
+        ConnectorTaskId taskId = new ConnectorTaskId(connectorName, 0);
 
-        AbstractHerder herder = partialMockBuilder(AbstractHerder.class)
-                .withConstructor(Worker.class, String.class, String.class, StatusBackingStore.class, ConfigBackingStore.class,
-                                 ConnectorClientConfigOverridePolicy.class)
-                .withArgs(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy)
-                .addMockedMethod("generation")
-                .createMock();
+        AbstractHerder herder = mock(AbstractHerder.class, withSettings()
+                .useConstructor(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy)
+                .defaultAnswer(CALLS_REAL_METHODS));
 
-        EasyMock.expect(herder.generation()).andStubReturn(generation);
-        EasyMock.expect(herder.rawConfig(connector)).andReturn(null);
+        when(herder.rawConfig(connectorName)).thenReturn(null);
 
-        EasyMock.expect(statusStore.get(connector))
-                .andReturn(new ConnectorStatus(connector, AbstractStatus.State.RUNNING, workerId, generation));
+        when(statusStore.get(connectorName))
+                .thenReturn(new ConnectorStatus(connectorName, AbstractStatus.State.RUNNING, workerId, generation));
 
-        EasyMock.expect(statusStore.getAll(connector))
-                .andReturn(Collections.singletonList(
+        when(statusStore.getAll(connectorName))
+                .thenReturn(Collections.singletonList(
                         new TaskStatus(taskId, AbstractStatus.State.UNASSIGNED, workerId, generation)));
-        EasyMock.expect(worker.getPlugins()).andStubReturn(plugins);
-
-        replayAll();
 
+        ConnectorStateInfo state = herder.connectorStatus(connectorName);
 
-        ConnectorStateInfo state = herder.connectorStatus(connector);
-
-        assertEquals(connector, state.name());
+        assertEquals(connectorName, state.name());
         assertEquals("RUNNING", state.connector().state());
         assertEquals(1, state.tasks().size());
         assertEquals(workerId, state.connector().workerId());
@@ -236,31 +200,21 @@ public class AbstractHerderTest {
         assertEquals(0, taskState.id());
         assertEquals("UNASSIGNED", taskState.state());
         assertEquals(workerId, taskState.workerId());
-
-        PowerMock.verifyAll();
     }
 
     @Test
-    public void taskStatus() {
-        ConnectorTaskId taskId = new ConnectorTaskId("connector", 0);
+    public void testTaskStatus() {
+        ConnectorTaskId taskId = new ConnectorTaskId(connectorName, 0);
         String workerId = "workerId";
 
-        AbstractHerder herder = partialMockBuilder(AbstractHerder.class)
-                .withConstructor(Worker.class, String.class, String.class, StatusBackingStore.class, ConfigBackingStore.class,
-                                 ConnectorClientConfigOverridePolicy.class)
-                .withArgs(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy)
-                .addMockedMethod("generation")
-                .createMock();
+        AbstractHerder herder = mock(AbstractHerder.class, withSettings()
+                .useConstructor(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy)
+                .defaultAnswer(CALLS_REAL_METHODS));
 
-        EasyMock.expect(herder.generation()).andStubReturn(5);
+        final ArgumentCaptor<TaskStatus> taskStatusArgumentCaptor = ArgumentCaptor.forClass(TaskStatus.class);
+        doNothing().when(statusStore).putSafe(taskStatusArgumentCaptor.capture());
 
-        final Capture<TaskStatus> statusCapture = EasyMock.newCapture();
-        statusStore.putSafe(EasyMock.capture(statusCapture));
-        EasyMock.expectLastCall();
-
-        EasyMock.expect(statusStore.get(taskId)).andAnswer(statusCapture::getValue);
-
-        replayAll();
+        when(statusStore.get(taskId)).thenAnswer(invocation -> taskStatusArgumentCaptor.getValue());
 
         herder.onFailure(taskId, new RuntimeException());
 
@@ -269,35 +223,26 @@ public class AbstractHerderTest {
         assertEquals("FAILED", taskState.state());
         assertEquals(0, taskState.id());
         assertNotNull(taskState.trace());
-
-        verifyAll();
     }
 
     @Test
     public void testBuildRestartPlanForUnknownConnector() {
         String connectorName = "UnknownConnector";
         RestartRequest restartRequest = new RestartRequest(connectorName, false, true);
-        AbstractHerder herder = partialMockBuilder(AbstractHerder.class)
-                .withConstructor(Worker.class, String.class, String.class, StatusBackingStore.class, ConfigBackingStore.class,
-                        ConnectorClientConfigOverridePolicy.class)
-                .withArgs(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy)
-                .addMockedMethod("generation")
-                .createMock();
-
-        EasyMock.expect(herder.generation()).andStubReturn(generation);
+        AbstractHerder herder = mock(AbstractHerder.class, withSettings()
+                .useConstructor(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy)
+                .defaultAnswer(CALLS_REAL_METHODS));
 
-        EasyMock.expect(statusStore.get(connectorName)).andReturn(null);
-        replayAll();
+        when(statusStore.get(connectorName)).thenReturn(null);
 
         Optional<RestartPlan> mayBeRestartPlan = herder.buildRestartPlan(restartRequest);
 
         assertFalse(mayBeRestartPlan.isPresent());
     }
 
-    @Test()
+    @Test
     public void testConfigValidationNullConfig() {
         AbstractHerder herder = createConfigValidationHerder(SampleSourceConnector.class, noneConnectorClientConfigOverridePolicy);
-        replayAll();
 
         Map<String, String> config = new HashMap<>();
         config.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, SampleSourceConnector.class.getName());
@@ -309,14 +254,11 @@ public class AbstractHerderTest {
 
         assertEquals(1, configInfos.errorCount());
         assertErrorForKey(configInfos, "testKey");
-
-        verifyAll();
     }
 
     @Test
     public void testConfigValidationMultipleNullConfig() {
         AbstractHerder herder = createConfigValidationHerder(SampleSourceConnector.class, noneConnectorClientConfigOverridePolicy);
-        replayAll();
 
         Map<String, String> config = new HashMap<>();
         config.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, SampleSourceConnector.class.getName());
@@ -330,38 +272,28 @@ public class AbstractHerderTest {
         assertEquals(2, configInfos.errorCount());
         assertErrorForKey(configInfos, "testKey");
         assertErrorForKey(configInfos, "secondTestKey");
-
-        verifyAll();
     }
 
     @Test
     public void testBuildRestartPlanForConnectorAndTasks() {
-        RestartRequest restartRequest = new RestartRequest(connector, false, true);
+        RestartRequest restartRequest = new RestartRequest(connectorName, false, true);
 
-        ConnectorTaskId taskId1 = new ConnectorTaskId(connector, 1);
-        ConnectorTaskId taskId2 = new ConnectorTaskId(connector, 2);
+        ConnectorTaskId taskId1 = new ConnectorTaskId(connectorName, 1);
+        ConnectorTaskId taskId2 = new ConnectorTaskId(connectorName, 2);
         List<TaskStatus> taskStatuses = new ArrayList<>();
         taskStatuses.add(new TaskStatus(taskId1, AbstractStatus.State.RUNNING, workerId, generation));
         taskStatuses.add(new TaskStatus(taskId2, AbstractStatus.State.FAILED, workerId, generation));
 
-        AbstractHerder herder = partialMockBuilder(AbstractHerder.class)
-                .withConstructor(Worker.class, String.class, String.class, StatusBackingStore.class, ConfigBackingStore.class,
-                        ConnectorClientConfigOverridePolicy.class)
-                .withArgs(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy)
-                .addMockedMethod("generation")
-                .createMock();
+        AbstractHerder herder = mock(AbstractHerder.class, withSettings()
+                .useConstructor(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy)
+                .defaultAnswer(CALLS_REAL_METHODS));
 
-        EasyMock.expect(herder.generation()).andStubReturn(generation);
-        EasyMock.expect(herder.rawConfig(connector)).andReturn(null);
+        when(herder.rawConfig(connectorName)).thenReturn(null);
 
-        EasyMock.expect(statusStore.get(connector))
-                .andReturn(new ConnectorStatus(connector, AbstractStatus.State.RUNNING, workerId, generation));
+        when(statusStore.get(connectorName))
+                .thenReturn(new ConnectorStatus(connectorName, AbstractStatus.State.RUNNING, workerId, generation));
 
-        EasyMock.expect(statusStore.getAll(connector))
-                .andReturn(taskStatuses);
-        EasyMock.expect(worker.getPlugins()).andStubReturn(plugins);
-
-        replayAll();
+        when(statusStore.getAll(connectorName)).thenReturn(taskStatuses);
 
         Optional<RestartPlan> mayBeRestartPlan = herder.buildRestartPlan(restartRequest);
 
@@ -372,38 +304,28 @@ public class AbstractHerderTest {
         assertEquals(2, restartPlan.taskIdsToRestart().size());
         assertTrue(restartPlan.taskIdsToRestart().contains(taskId1));
         assertTrue(restartPlan.taskIdsToRestart().contains(taskId2));
-
-        PowerMock.verifyAll();
     }
 
     @Test
     public void testBuildRestartPlanForNoRestart() {
-        RestartRequest restartRequest = new RestartRequest(connector, true, false);
+        RestartRequest restartRequest = new RestartRequest(connectorName, true, false);
 
-        ConnectorTaskId taskId1 = new ConnectorTaskId(connector, 1);
-        ConnectorTaskId taskId2 = new ConnectorTaskId(connector, 2);
+        ConnectorTaskId taskId1 = new ConnectorTaskId(connectorName, 1);
+        ConnectorTaskId taskId2 = new ConnectorTaskId(connectorName, 2);
         List<TaskStatus> taskStatuses = new ArrayList<>();
         taskStatuses.add(new TaskStatus(taskId1, AbstractStatus.State.RUNNING, workerId, generation));
         taskStatuses.add(new TaskStatus(taskId2, AbstractStatus.State.FAILED, workerId, generation));
 
-        AbstractHerder herder = partialMockBuilder(AbstractHerder.class)
-                .withConstructor(Worker.class, String.class, String.class, StatusBackingStore.class, ConfigBackingStore.class,
-                        ConnectorClientConfigOverridePolicy.class)
-                .withArgs(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy)
-                .addMockedMethod("generation")
-                .createMock();
+        AbstractHerder herder = mock(AbstractHerder.class, withSettings()
+                .useConstructor(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy)
+                .defaultAnswer(CALLS_REAL_METHODS));
 
-        EasyMock.expect(herder.generation()).andStubReturn(generation);
-        EasyMock.expect(herder.rawConfig(connector)).andReturn(null);
+        when(herder.rawConfig(connectorName)).thenReturn(null);
 
-        EasyMock.expect(statusStore.get(connector))
-                .andReturn(new ConnectorStatus(connector, AbstractStatus.State.RUNNING, workerId, generation));
+        when(statusStore.get(connectorName))
+                .thenReturn(new ConnectorStatus(connectorName, AbstractStatus.State.RUNNING, workerId, generation));
 
-        EasyMock.expect(statusStore.getAll(connector))
-                .andReturn(taskStatuses);
-        EasyMock.expect(worker.getPlugins()).andStubReturn(plugins);
-
-        replayAll();
+        when(statusStore.getAll(connectorName)).thenReturn(taskStatuses);
 
         Optional<RestartPlan> mayBeRestartPlan = herder.buildRestartPlan(restartRequest);
 
@@ -412,38 +334,31 @@ public class AbstractHerderTest {
         assertFalse(restartPlan.shouldRestartConnector());
         assertFalse(restartPlan.shouldRestartTasks());
         assertTrue(restartPlan.taskIdsToRestart().isEmpty());
-
-        PowerMock.verifyAll();
     }
 
     @Test
     public void testConfigValidationEmptyConfig() {
         AbstractHerder herder = createConfigValidationHerder(SampleSourceConnector.class, noneConnectorClientConfigOverridePolicy, 0);
-        replayAll();
 
         assertThrows(BadRequestException.class, () -> herder.validateConnectorConfig(Collections.emptyMap(), false));
-
-        verifyAll();
+        verify(transformer).transform(Collections.emptyMap());
     }
 
-    @Test()
+    @Test
     public void testConfigValidationMissingName() {
-        AbstractHerder herder = createConfigValidationHerder(SampleSourceConnector.class, noneConnectorClientConfigOverridePolicy);
-        replayAll();
+        final Class<? extends Connector> connectorClass = SampleSourceConnector.class;
+        AbstractHerder herder = createConfigValidationHerder(connectorClass, noneConnectorClientConfigOverridePolicy);
 
-        Map<String, String> config = Collections.singletonMap(ConnectorConfig.CONNECTOR_CLASS_CONFIG, SampleSourceConnector.class.getName());
+        Map<String, String> config = Collections.singletonMap(ConnectorConfig.CONNECTOR_CLASS_CONFIG, connectorClass.getName());
         ConfigInfos result = herder.validateConnectorConfig(config, false);
 
         // We expect there to be errors due to the missing name and .... Note that these assertions depend heavily on
         // the config fields for SourceConnectorConfig, but we expect these to change rarely.
-        assertEquals(SampleSourceConnector.class.getName(), result.name());
-        assertEquals(
-                Arrays.asList(
-                        ConnectorConfig.COMMON_GROUP, ConnectorConfig.TRANSFORMS_GROUP,
-                        ConnectorConfig.PREDICATES_GROUP, ConnectorConfig.ERROR_GROUP,
-                        SourceConnectorConfig.TOPIC_CREATION_GROUP, SourceConnectorConfig.EXACTLY_ONCE_SUPPORT_GROUP,
-                        SourceConnectorConfig.OFFSETS_TOPIC_GROUP),
-                result.groups());
+        assertEquals(connectorClass.getName(), result.name());
+        assertEquals(Arrays.asList(ConnectorConfig.COMMON_GROUP, ConnectorConfig.TRANSFORMS_GROUP,
+                ConnectorConfig.PREDICATES_GROUP, ConnectorConfig.ERROR_GROUP,
+                SourceConnectorConfig.TOPIC_CREATION_GROUP, SourceConnectorConfig.EXACTLY_ONCE_SUPPORT_GROUP,
+                SourceConnectorConfig.OFFSETS_TOPIC_GROUP), result.groups());
         assertEquals(2, result.errorCount());
         Map<String, ConfigInfo> infos = result.values().stream()
                 .collect(Collectors.toMap(info -> info.configKey().name(), Function.identity()));
@@ -457,69 +372,70 @@ public class AbstractHerderTest {
         assertEquals("required", infos.get("required").configValue().name());
         assertEquals(1, infos.get("required").configValue().errors().size());
 
-        verifyAll();
+        verify(plugins).newConnector(connectorClass.getName());
+        verify(plugins).compareAndSwapLoaders(connector);
     }
 
     @Test
     public void testConfigValidationInvalidTopics() {
-        AbstractHerder herder = createConfigValidationHerder(SampleSinkConnector.class, noneConnectorClientConfigOverridePolicy);
-        replayAll();
+        final Class<? extends Connector> connectorClass = SampleSinkConnector.class;
+        AbstractHerder herder = createConfigValidationHerder(connectorClass, noneConnectorClientConfigOverridePolicy);
 
         Map<String, String> config = new HashMap<>();
-        config.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, SampleSinkConnector.class.getName());
+        config.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, connectorClass.getName());
         config.put(SinkConnectorConfig.TOPICS_CONFIG, "topic1,topic2");
         config.put(SinkConnectorConfig.TOPICS_REGEX_CONFIG, "topic.*");
 
         assertThrows(ConfigException.class, () -> herder.validateConnectorConfig(config, false));
 
-        verifyAll();
+        verify(plugins).newConnector(connectorClass.getName());
+        verify(plugins).compareAndSwapLoaders(connector);
     }
 
     @Test
     public void testConfigValidationTopicsWithDlq() {
-        AbstractHerder herder = createConfigValidationHerder(SampleSinkConnector.class, noneConnectorClientConfigOverridePolicy);
-        replayAll();
+        final Class<? extends Connector> connectorClass = SampleSinkConnector.class;
+        AbstractHerder herder = createConfigValidationHerder(connectorClass, noneConnectorClientConfigOverridePolicy);
 
         Map<String, String> config = new HashMap<>();
-        config.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, SampleSinkConnector.class.getName());
+        config.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, connectorClass.getName());
         config.put(SinkConnectorConfig.TOPICS_CONFIG, "topic1");
         config.put(SinkConnectorConfig.DLQ_TOPIC_NAME_CONFIG, "topic1");
 
         assertThrows(ConfigException.class, () -> herder.validateConnectorConfig(config, false));
 
-        verifyAll();
+        verify(plugins).newConnector(connectorClass.getName());
+        verify(plugins).compareAndSwapLoaders(connector);
     }
 
     @Test
     public void testConfigValidationTopicsRegexWithDlq() {
-        AbstractHerder herder = createConfigValidationHerder(SampleSinkConnector.class, noneConnectorClientConfigOverridePolicy);
-        replayAll();
+        final Class<? extends Connector> connectorClass = SampleSinkConnector.class;
+        AbstractHerder herder = createConfigValidationHerder(connectorClass, noneConnectorClientConfigOverridePolicy);
 
         Map<String, String> config = new HashMap<>();
-        config.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, SampleSinkConnector.class.getName());
+        config.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, connectorClass.getName());
         config.put(SinkConnectorConfig.TOPICS_REGEX_CONFIG, "topic.*");
         config.put(SinkConnectorConfig.DLQ_TOPIC_NAME_CONFIG, "topic1");
 
         assertThrows(ConfigException.class, () -> herder.validateConnectorConfig(config, false));
 
-        verifyAll();
+        verify(plugins).newConnector(connectorClass.getName());
+        verify(plugins).compareAndSwapLoaders(connector);
     }
 
-    @Test()
+    @Test
     public void testConfigValidationTransformsExtendResults() {
-        AbstractHerder herder = createConfigValidationHerder(SampleSourceConnector.class, noneConnectorClientConfigOverridePolicy);
+        final Class<? extends Connector> connectorClass = SampleSourceConnector.class;
+        AbstractHerder herder = createConfigValidationHerder(connectorClass, noneConnectorClientConfigOverridePolicy);
 
         // 2 transform aliases defined -> 2 plugin lookups
-        Set<PluginDesc<Transformation<?>>> transformations = new HashSet<>();
-        transformations.add(transformationPluginDesc());
-        EasyMock.expect(plugins.transformations()).andReturn(transformations).times(2);
-
-        replayAll();
+        when(plugins.transformations()).thenReturn(Collections.singleton(transformationPluginDesc()));
 
         // Define 2 transformations. One has a class defined and so can get embedded configs, the other is missing
         // class info that should generate an error.
         Map<String, String> config = new HashMap<>();
-        config.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, SampleSourceConnector.class.getName());
+        config.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, connectorClass.getName());
         config.put(ConnectorConfig.NAME_CONFIG, "connector-name");
         config.put(ConnectorConfig.TRANSFORMS_CONFIG, "xformA,xformB");
         config.put(ConnectorConfig.TRANSFORMS_CONFIG + ".xformA.type", SampleTransformation.class.getName());
@@ -529,7 +445,7 @@ public class AbstractHerderTest {
 
         // We expect there to be errors due to the missing name and .... Note that these assertions depend heavily on
         // the config fields for SourceConnectorConfig, but we expect these to change rarely.
-        assertEquals(SampleSourceConnector.class.getName(), result.name());
+        assertEquals(connectorClass.getName(), result.name());
         // Each transform also gets its own group
         List<String> expectedGroups = Arrays.asList(
                 ConnectorConfig.COMMON_GROUP,
@@ -556,28 +472,23 @@ public class AbstractHerderTest {
         assertEquals("transforms.xformB.type", infos.get("transforms.xformB.type").configValue().name());
         assertFalse(infos.get("transforms.xformB.type").configValue().errors().isEmpty());
 
-        verifyAll();
+        verify(plugins, times(2)).transformations();
+        verify(plugins).newConnector(connectorClass.getName());
+        verify(plugins).compareAndSwapLoaders(connector);
     }
 
-    @Test()
+    @Test
     public void testConfigValidationPredicatesExtendResults() {
-        AbstractHerder herder = createConfigValidationHerder(SampleSourceConnector.class, noneConnectorClientConfigOverridePolicy);
+        final Class<? extends Connector> connectorClass = SampleSourceConnector.class;
+        AbstractHerder herder = createConfigValidationHerder(connectorClass, noneConnectorClientConfigOverridePolicy);
 
-        // 2 transform aliases defined -> 2 plugin lookups
-        Set<PluginDesc<Transformation<?>>> transformations = new HashSet<>();
-        transformations.add(transformationPluginDesc());
-        EasyMock.expect(plugins.transformations()).andReturn(transformations).times(1);
-
-        Set<PluginDesc<Predicate<?>>> predicates = new HashSet<>();
-        predicates.add(predicatePluginDesc());
-        EasyMock.expect(plugins.predicates()).andReturn(predicates).times(2);
+        when(plugins.transformations()).thenReturn(Collections.singleton(transformationPluginDesc()));
+        when(plugins.predicates()).thenReturn(Collections.singleton(predicatePluginDesc()));
 
-        replayAll();
-
-        // Define 2 transformations. One has a class defined and so can get embedded configs, the other is missing
+        // Define 2 predicates. One has a class defined and so can get embedded configs, the other is missing
         // class info that should generate an error.
         Map<String, String> config = new HashMap<>();
-        config.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, SampleSourceConnector.class.getName());
+        config.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, connectorClass.getName());
         config.put(ConnectorConfig.NAME_CONFIG, "connector-name");
         config.put(ConnectorConfig.TRANSFORMS_CONFIG, "xformA");
         config.put(ConnectorConfig.TRANSFORMS_CONFIG + ".xformA.type", SampleTransformation.class.getName());
@@ -585,12 +496,13 @@ public class AbstractHerderTest {
         config.put(ConnectorConfig.PREDICATES_CONFIG, "predX,predY");
         config.put(ConnectorConfig.PREDICATES_CONFIG + ".predX.type", SamplePredicate.class.getName());
         config.put("required", "value"); // connector required config
+
         ConfigInfos result = herder.validateConnectorConfig(config, false);
-        assertEquals(herder.connectorTypeForClass(config.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG)), ConnectorType.SOURCE);
+        assertEquals(ConnectorType.SOURCE, herder.connectorTypeForClass(config.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG)));
 
         // We expect an error because predicate predY has no type configured. Note that these assertions depend heavily on
         // the config fields for SourceConnectorConfig, but we expect these to change rarely.
-        assertEquals(SampleSourceConnector.class.getName(), result.name());
+        assertEquals(connectorClass.getName(), result.name());
         // Each transform also gets its own group
         List<String> expectedGroups = Arrays.asList(
                 ConnectorConfig.COMMON_GROUP,
@@ -610,27 +522,22 @@ public class AbstractHerderTest {
                 .collect(Collectors.toMap(info -> info.configKey().name(), Function.identity()));
         assertEquals(28, infos.size());
         // Should get a type field for the transform and for each predicate; xformA and predX add their own configs since they have valid classes
-        assertEquals("transforms.xformA.type",
-                infos.get("transforms.xformA.type").configValue().name());
+        assertEquals("transforms.xformA.type", infos.get("transforms.xformA.type").configValue().name());
         assertTrue(infos.get("transforms.xformA.type").configValue().errors().isEmpty());
-        assertEquals("transforms.xformA.subconfig",
-                infos.get("transforms.xformA.subconfig").configValue().name());
-        assertEquals("transforms.xformA.predicate",
-                infos.get("transforms.xformA.predicate").configValue().name());
+        assertEquals("transforms.xformA.subconfig", infos.get("transforms.xformA.subconfig").configValue().name());
+        assertEquals("transforms.xformA.predicate", infos.get("transforms.xformA.predicate").configValue().name());
         assertTrue(infos.get("transforms.xformA.predicate").configValue().errors().isEmpty());
-        assertEquals("transforms.xformA.negate",
-                infos.get("transforms.xformA.negate").configValue().name());
+        assertEquals("transforms.xformA.negate", infos.get("transforms.xformA.negate").configValue().name());
         assertTrue(infos.get("transforms.xformA.negate").configValue().errors().isEmpty());
-        assertEquals("predicates.predX.type",
-                infos.get("predicates.predX.type").configValue().name());
-        assertEquals("predicates.predX.predconfig",
-                infos.get("predicates.predX.predconfig").configValue().name());
-        assertEquals("predicates.predY.type",
-                infos.get("predicates.predY.type").configValue().name());
-        assertFalse(
-                infos.get("predicates.predY.type").configValue().errors().isEmpty());
-
-        verifyAll();
+        assertEquals("predicates.predX.type", infos.get("predicates.predX.type").configValue().name());
+        assertEquals("predicates.predX.predconfig", infos.get("predicates.predX.predconfig").configValue().name());
+        assertEquals("predicates.predY.type", infos.get("predicates.predY.type").configValue().name());
+        assertFalse(infos.get("predicates.predY.type").configValue().errors().isEmpty());
+
+        verify(plugins).transformations();
+        verify(plugins, times(2)).predicates();
+        verify(plugins).newConnector(connectorClass.getName());
+        verify(plugins).compareAndSwapLoaders(connector);
     }
 
     @SuppressWarnings({"rawtypes", "unchecked"})
@@ -643,13 +550,13 @@ public class AbstractHerderTest {
         return new PluginDesc(SampleTransformation.class, "1.0", classLoader);
     }
 
-    @Test()
+    @Test
     public void testConfigValidationPrincipalOnlyOverride() {
-        AbstractHerder herder = createConfigValidationHerder(SampleSourceConnector.class, new PrincipalConnectorClientConfigOverridePolicy());
-        replayAll();
+        final Class<? extends Connector> connectorClass = SampleSourceConnector.class;
+        AbstractHerder herder = createConfigValidationHerder(connectorClass, new PrincipalConnectorClientConfigOverridePolicy());
 
         Map<String, String> config = new HashMap<>();
-        config.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, SampleSourceConnector.class.getName());
+        config.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, connectorClass.getName());
         config.put(ConnectorConfig.NAME_CONFIG, "connector-name");
         config.put("required", "value"); // connector required config
         String ackConfigKey = producerOverrideKey(ProducerConfig.ACKS_CONFIG);
@@ -682,16 +589,17 @@ public class AbstractHerderTest {
         assertTrue(result.values().stream().anyMatch(
             configInfo -> saslConfigKey.equals(configInfo.configValue().name()) && configInfo.configValue().errors().isEmpty()));
 
-        verifyAll();
+        verify(plugins).newConnector(connectorClass.getName());
+        verify(plugins).compareAndSwapLoaders(connector);
     }
 
     @Test
     public void testConfigValidationAllOverride() {
-        AbstractHerder herder = createConfigValidationHerder(SampleSourceConnector.class, new AllConnectorClientConfigOverridePolicy());
-        replayAll();
+        final Class<? extends Connector> connectorClass = SampleSourceConnector.class;
+        AbstractHerder herder = createConfigValidationHerder(connectorClass, new AllConnectorClientConfigOverridePolicy());
 
         Map<String, String> config = new HashMap<>();
-        config.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, SampleSourceConnector.class.getName());
+        config.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, connectorClass.getName());
         config.put(ConnectorConfig.NAME_CONFIG, "connector-name");
         config.put("required", "value"); // connector required config
         // Try to test a variety of configuration types: string, int, long, boolean, list, class
@@ -731,7 +639,9 @@ public class AbstractHerderTest {
             .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
 
         assertEquals(rawOverriddenClientConfigs, validatedOverriddenClientConfigs);
-        verifyAll();
+
+        verify(plugins).newConnector(connectorClass.getName());
+        verify(plugins).compareAndSwapLoaders(connector);
     }
 
     @Test
@@ -918,21 +828,13 @@ public class AbstractHerderTest {
 
     @Test
     public void testConnectorPluginConfig() throws Exception {
-        AbstractHerder herder = partialMockBuilder(AbstractHerder.class)
-                .withConstructor(
-                        Worker.class,
-                        String.class,
-                        String.class,
-                        StatusBackingStore.class,
-                        ConfigBackingStore.class,
-                        ConnectorClientConfigOverridePolicy.class
-                )
-                .withArgs(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy)
-                .addMockedMethod("generation")
-                .createMock();
-
-        EasyMock.expect(plugins.newPlugin(EasyMock.anyString())).andAnswer(() -> {
-            String name = (String) EasyMock.getCurrentArguments()[0];
+
+        AbstractHerder herder = mock(AbstractHerder.class, withSettings()
+                .useConstructor(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy)
+                .defaultAnswer(CALLS_REAL_METHODS));
+
+        when(plugins.newPlugin(anyString())).then(invocation -> {
+            String name = invocation.getArgument(0);
             switch (name) {
                 case "sink": return new SampleSinkConnector();
                 case "source": return new SampleSourceConnector();
@@ -941,9 +843,8 @@ public class AbstractHerderTest {
                 case "predicate": return new SamplePredicate();
                 default: return new SampleTransformation<>();
             }
-        }).anyTimes();
-        EasyMock.expect(herder.plugins()).andStubReturn(plugins);
-        replayAll();
+        });
+        when(herder.plugins()).thenReturn(plugins);
 
         List<ConfigKeyInfo> sinkConnectorConfigs = herder.connectorPluginConfig("sink");
         assertNotNull(sinkConnectorConfigs);
@@ -973,30 +874,22 @@ public class AbstractHerderTest {
     @Test(expected = NotFoundException.class)
     public void testGetConnectorConfigDefWithBadName() throws Exception {
         String connName = "AnotherPlugin";
-        AbstractHerder herder = partialMockBuilder(AbstractHerder.class)
-                .withConstructor(Worker.class, String.class, String.class, StatusBackingStore.class, ConfigBackingStore.class,
-                        ConnectorClientConfigOverridePolicy.class)
-                .withArgs(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy)
-                .addMockedMethod("generation")
-                .createMock();
-        EasyMock.expect(worker.getPlugins()).andStubReturn(plugins);
-        EasyMock.expect(plugins.newPlugin(anyString())).andThrow(new ClassNotFoundException());
-        replayAll();
+        AbstractHerder herder = mock(AbstractHerder.class, withSettings() // partial mock of the abstract herder
+                .useConstructor(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy)
+                .defaultAnswer(CALLS_REAL_METHODS)); // methods that are not stubbed run their real implementation
+        when(worker.getPlugins()).thenReturn(plugins); // hand the herder the mocked plugin registry
+        when(plugins.newPlugin(anyString())).thenThrow(new ClassNotFoundException()); // every plugin lookup fails; per @Test the call below must throw NotFoundException
         herder.connectorPluginConfig(connName);
     }
 
     @Test(expected = BadRequestException.class)
     public void testGetConnectorConfigDefWithInvalidPluginType() throws Exception {
         String connName = "AnotherPlugin";
-        AbstractHerder herder = partialMockBuilder(AbstractHerder.class)
-                .withConstructor(Worker.class, String.class, String.class, StatusBackingStore.class, ConfigBackingStore.class,
-                        ConnectorClientConfigOverridePolicy.class)
-                .withArgs(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy)
-                .addMockedMethod("generation")
-                .createMock();
-        EasyMock.expect(worker.getPlugins()).andStubReturn(plugins);
-        EasyMock.expect(plugins.newPlugin(anyString())).andReturn(new DirectoryConfigProvider());
-        replayAll();
+        AbstractHerder herder = mock(AbstractHerder.class, withSettings() // partial mock of the abstract herder
+                .useConstructor(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy)
+                .defaultAnswer(CALLS_REAL_METHODS)); // methods that are not stubbed run their real implementation
+        when(worker.getPlugins()).thenReturn(plugins); // hand the herder the mocked plugin registry
+        when(plugins.newPlugin(anyString())).thenReturn(new DirectoryConfigProvider()); // lookup yields a config provider instance; per @Test the call below must throw BadRequestException
         herder.connectorPluginConfig(connName);
     }
 
@@ -1055,23 +948,16 @@ public class AbstractHerderTest {
                                                         ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy,
                                                         int countOfCallingNewConnector) {
 
-
-        ConfigBackingStore configStore = strictMock(ConfigBackingStore.class);
-        StatusBackingStore statusStore = strictMock(StatusBackingStore.class);
-
-        AbstractHerder herder = partialMockBuilder(AbstractHerder.class)
-                .withConstructor(Worker.class, String.class, String.class, StatusBackingStore.class, ConfigBackingStore.class,
-                                 ConnectorClientConfigOverridePolicy.class)
-                .withArgs(worker, workerId, kafkaClusterId, statusStore, configStore, connectorClientConfigOverridePolicy)
-                .addMockedMethod("generation")
-                .createMock();
-        EasyMock.expect(herder.generation()).andStubReturn(generation);
+        AbstractHerder herder = mock(AbstractHerder.class, withSettings()
+                .useConstructor(worker, workerId, kafkaClusterId, statusStore, configStore, connectorClientConfigOverridePolicy)
+                .defaultAnswer(CALLS_REAL_METHODS));
 
         // Call to validateConnectorConfig
-        EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2);
-        final Capture<Map<String, String>> configCapture = EasyMock.newCapture();
-        EasyMock.expect(transformer.transform(EasyMock.capture(configCapture))).andAnswer(configCapture::getValue);
-        EasyMock.expect(worker.getPlugins()).andStubReturn(plugins);
+        when(worker.configTransformer()).thenReturn(transformer);
+        @SuppressWarnings("unchecked")
+        final ArgumentCaptor<Map<String, String>> mapArgumentCaptor = ArgumentCaptor.forClass(Map.class);
+        when(transformer.transform(mapArgumentCaptor.capture())).thenAnswer(invocation -> mapArgumentCaptor.getValue());
+        when(worker.getPlugins()).thenReturn(plugins);
         final Connector connector;
         try {
             connector = connectorClass.getConstructor().newInstance();
@@ -1079,18 +965,18 @@ public class AbstractHerderTest {
             throw new RuntimeException("Couldn't create connector", e);
         }
         if (countOfCallingNewConnector > 0) {
-            EasyMock.expect(plugins.newConnector(connectorClass.getName())).andReturn(connector).times(countOfCallingNewConnector);
-            EasyMock.expect(plugins.compareAndSwapLoaders(connector)).andReturn(classLoader).times(countOfCallingNewConnector);
+            when(plugins.newConnector(connectorClass.getName())).thenReturn(connector);
+            when(plugins.compareAndSwapLoaders(connector)).thenReturn(classLoader);
         }
-
+        this.connector = connector;
         return herder;
     }
 
     // We need to use a real class here due to some issue with mocking java.lang.Class
-    private abstract class BogusSourceConnector extends SourceConnector {
+    private abstract static class BogusSourceConnector extends SourceConnector { // static nested: holds no reference to the enclosing test instance
     }
 
-    private abstract class BogusSourceTask extends SourceTask {
+    private abstract static class BogusSourceTask extends SourceTask { // static nested: holds no reference to the enclosing test instance
     }
 
     private static String producerOverrideKey(String config) {