You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/10/29 14:08:16 UTC

[GitHub] [kafka] mimaison commented on a change in pull request #11137: KAFKA-13133 Replace EasyMock and PowerMock with Mockito for AbstractHerderTest

mimaison commented on a change in pull request #11137:
URL: https://github.com/apache/kafka/pull/11137#discussion_r739243762



##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
##########
@@ -135,90 +133,75 @@
     private final int generation = 5;
     private final String connector = "connector";
     private final ConnectorClientConfigOverridePolicy noneConnectorClientConfigOverridePolicy = new NoneConnectorClientConfigOverridePolicy();
+    private Connector insConnector;
 
-    @MockStrict private Worker worker;
-    @MockStrict private WorkerConfigTransformer transformer;
-    @MockStrict private Plugins plugins;
-    @MockStrict private ClassLoader classLoader;
-    @MockStrict private ConfigBackingStore configStore;
-    @MockStrict private StatusBackingStore statusStore;
+    private final Worker worker = mock(Worker.class);
+    private final WorkerConfigTransformer transformer = mock(WorkerConfigTransformer.class);
+    private final Plugins plugins = mock(Plugins.class);
+    private final ClassLoader classLoader = mock(ClassLoader.class);
+    private final ConfigBackingStore configStore = mock(ConfigBackingStore.class);
+    private final StatusBackingStore statusStore = mock(StatusBackingStore.class);
+    private ClassLoader loader;
+
+    @Before
+    public void before() {
+        loader = Utils.getContextOrKafkaClassLoader();
+    }
+
+    @After
+    public void tearDown() {
+        if (loader != null) Plugins.compareAndSwapLoaders(loader);
+    }
 
     @Test
     public void testConnectors() {
-        AbstractHerder herder = partialMockBuilder(AbstractHerder.class)
-            .withConstructor(
-                Worker.class,
-                String.class,
-                String.class,
-                StatusBackingStore.class,
-                ConfigBackingStore.class,
-                ConnectorClientConfigOverridePolicy.class
-            )
-            .withArgs(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy)
-            .addMockedMethod("generation")
-            .createMock();
-
-        EasyMock.expect(herder.generation()).andStubReturn(generation);
-        EasyMock.expect(herder.rawConfig(connector)).andReturn(null);
-        EasyMock.expect(configStore.snapshot()).andReturn(SNAPSHOT);
-        replayAll();
+        AbstractHerder herder = mock(AbstractHerder.class, withSettings()
+                .useConstructor(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy)
+                .defaultAnswer(CALLS_REAL_METHODS));
+
+        when(herder.generation()).thenReturn(generation);
+        when(herder.rawConfig(connector)).thenReturn(null);
+        when(configStore.snapshot()).thenReturn(SNAPSHOT);
         assertEquals(Collections.singleton(CONN1), new HashSet<>(herder.connectors()));
-        PowerMock.verifyAll();
     }
 
     @Test
     public void testConnectorStatus() {
         ConnectorTaskId taskId = new ConnectorTaskId(connector, 0);
-        AbstractHerder herder = partialMockBuilder(AbstractHerder.class)
-            .withConstructor(
-                Worker.class,
-                String.class,
-                String.class,
-                StatusBackingStore.class,
-                ConfigBackingStore.class,
-                ConnectorClientConfigOverridePolicy.class
-            )
-            .withArgs(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy)
-            .addMockedMethod("generation")
-            .createMock();
-
-        EasyMock.expect(herder.generation()).andStubReturn(generation);
-        EasyMock.expect(herder.rawConfig(connector)).andReturn(null);
-        EasyMock.expect(statusStore.get(connector))
-            .andReturn(new ConnectorStatus(connector, AbstractStatus.State.RUNNING, workerId, generation));
-        EasyMock.expect(statusStore.getAll(connector))
-            .andReturn(Collections.singletonList(
+
+        AbstractHerder herder = mock(AbstractHerder.class, withSettings()
+                .useConstructor(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy)
+                .defaultAnswer(CALLS_REAL_METHODS));
+
+        when(herder.generation()).thenReturn(generation);
+        when(herder.rawConfig(connector)).thenReturn(null);
+        when(statusStore.get(connector))
+            .thenReturn(new ConnectorStatus(connector, AbstractStatus.State.RUNNING, workerId, generation));
+        when(statusStore.getAll(connector))
+            .thenReturn(Collections.singletonList(
                 new TaskStatus(taskId, AbstractStatus.State.UNASSIGNED, workerId, generation)));
 
-        replayAll();
         ConnectorStateInfo csi = herder.connectorStatus(connector);

Review comment:
       Can we assert anything on `csi`? Otherwise, this test does not assert anything.
   
   Do we actually need this test? It seems the test below is basically the same but with asserts?

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
##########
@@ -299,24 +268,20 @@ public void testBuildRestartPlanForConnectorAndTasks() {
         taskStatuses.add(new TaskStatus(taskId1, AbstractStatus.State.RUNNING, workerId, generation));
         taskStatuses.add(new TaskStatus(taskId2, AbstractStatus.State.FAILED, workerId, generation));
 
-        AbstractHerder herder = partialMockBuilder(AbstractHerder.class)
-                .withConstructor(Worker.class, String.class, String.class, StatusBackingStore.class, ConfigBackingStore.class,
-                        ConnectorClientConfigOverridePolicy.class)
-                .withArgs(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy)
-                .addMockedMethod("generation")
-                .createMock();
+        AbstractHerder herder = mock(AbstractHerder.class, withSettings()
+                .useConstructor(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy)
+                .defaultAnswer(CALLS_REAL_METHODS));
 
-        EasyMock.expect(herder.generation()).andStubReturn(generation);
-        EasyMock.expect(herder.rawConfig(connector)).andReturn(null);
+        when(herder.generation()).thenReturn(generation);
+        when(herder.rawConfig(connector)).thenReturn(null);
 
-        EasyMock.expect(statusStore.get(connector))
-                .andReturn(new ConnectorStatus(connector, AbstractStatus.State.RUNNING, workerId, generation));
+        when(statusStore.get(connector))
+                .thenReturn(new ConnectorStatus(connector, AbstractStatus.State.RUNNING, workerId, generation));
 
-        EasyMock.expect(statusStore.getAll(connector))
-                .andReturn(taskStatuses);
-        EasyMock.expect(worker.getPlugins()).andStubReturn(plugins);
+        when(statusStore.getAll(connector))
+                .thenReturn(taskStatuses);
+        when(worker.getPlugins()).thenReturn(plugins);

Review comment:
       I don't think this mock is needed. Same below.

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
##########
@@ -407,70 +362,73 @@ public void testConfigValidationMissingName() throws Throwable {
         assertEquals("required", infos.get("required").configValue().name());
         assertEquals(1, infos.get("required").configValue().errors().size());
 
-        verifyAll();
+        verify(plugins).newConnector(connectorClass.getName());
+        verify(plugins).compareAndSwapLoaders(insConnector);
     }
 
     @Test
     public void testConfigValidationInvalidTopics() {
-        AbstractHerder herder = createConfigValidationHerder(TestSinkConnector.class, noneConnectorClientConfigOverridePolicy);
-        replayAll();
+        final Class<? extends Connector> connectorClass = TestSinkConnector.class;
+        AbstractHerder herder = createConfigValidationHerder(connectorClass, noneConnectorClientConfigOverridePolicy);
 
         Map<String, String> config = new HashMap<>();
-        config.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, TestSinkConnector.class.getName());
+        config.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, connectorClass.getName());
         config.put(SinkConnectorConfig.TOPICS_CONFIG, "topic1,topic2");
         config.put(SinkConnectorConfig.TOPICS_REGEX_CONFIG, "topic.*");
 
         assertThrows(ConfigException.class, () -> herder.validateConnectorConfig(config, false));
 
-        verifyAll();
+        verify(plugins).newConnector(connectorClass.getName());
+        verify(plugins).compareAndSwapLoaders(insConnector);
     }
 
     @Test
     public void testConfigValidationTopicsWithDlq() {
+        final Class<? extends Connector> connectorClass = TestSinkConnector.class;
         AbstractHerder herder = createConfigValidationHerder(TestSinkConnector.class, noneConnectorClientConfigOverridePolicy);
-        replayAll();
 
         Map<String, String> config = new HashMap<>();
-        config.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, TestSinkConnector.class.getName());
+        config.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, connectorClass.getName());
         config.put(SinkConnectorConfig.TOPICS_CONFIG, "topic1");
         config.put(SinkConnectorConfig.DLQ_TOPIC_NAME_CONFIG, "topic1");
 
         assertThrows(ConfigException.class, () -> herder.validateConnectorConfig(config, false));
 
-        verifyAll();
+        verify(plugins).newConnector(connectorClass.getName());
+        verify(plugins).compareAndSwapLoaders(insConnector);
     }
 
     @Test
     public void testConfigValidationTopicsRegexWithDlq() {
-        AbstractHerder herder = createConfigValidationHerder(TestSinkConnector.class, noneConnectorClientConfigOverridePolicy);
-        replayAll();
+        final Class<? extends Connector> connectorClass = TestSinkConnector.class;
+        AbstractHerder herder = createConfigValidationHerder(connectorClass, noneConnectorClientConfigOverridePolicy);
 
         Map<String, String> config = new HashMap<>();
-        config.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, TestSinkConnector.class.getName());
+        config.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, connectorClass.getName());
         config.put(SinkConnectorConfig.TOPICS_REGEX_CONFIG, "topic.*");
         config.put(SinkConnectorConfig.DLQ_TOPIC_NAME_CONFIG, "topic1");
 
         assertThrows(ConfigException.class, () -> herder.validateConnectorConfig(config, false));
 
-        verifyAll();
+        verify(plugins).newConnector(connectorClass.getName());
+        verify(plugins).compareAndSwapLoaders(insConnector);
     }
 
     @SuppressWarnings({"rawtypes", "unchecked"})
     @Test()
-    public void testConfigValidationTransformsExtendResults() throws Throwable {
-        AbstractHerder herder = createConfigValidationHerder(TestSourceConnector.class, noneConnectorClientConfigOverridePolicy);
+    public void testConfigValidationTransformsExtendResults() {
+        final Class<? extends Connector> connectorClass = TestSourceConnector.class;

Review comment:
       We have this variable in many tests; should we instead have a field in this class?

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
##########
@@ -135,90 +133,75 @@
     private final int generation = 5;
     private final String connector = "connector";
     private final ConnectorClientConfigOverridePolicy noneConnectorClientConfigOverridePolicy = new NoneConnectorClientConfigOverridePolicy();
+    private Connector insConnector;
 
-    @MockStrict private Worker worker;
-    @MockStrict private WorkerConfigTransformer transformer;
-    @MockStrict private Plugins plugins;
-    @MockStrict private ClassLoader classLoader;
-    @MockStrict private ConfigBackingStore configStore;
-    @MockStrict private StatusBackingStore statusStore;
+    private final Worker worker = mock(Worker.class);
+    private final WorkerConfigTransformer transformer = mock(WorkerConfigTransformer.class);
+    private final Plugins plugins = mock(Plugins.class);
+    private final ClassLoader classLoader = mock(ClassLoader.class);
+    private final ConfigBackingStore configStore = mock(ConfigBackingStore.class);
+    private final StatusBackingStore statusStore = mock(StatusBackingStore.class);
+    private ClassLoader loader;
+
+    @Before
+    public void before() {
+        loader = Utils.getContextOrKafkaClassLoader();
+    }
+
+    @After
+    public void tearDown() {
+        if (loader != null) Plugins.compareAndSwapLoaders(loader);
+    }
 
     @Test
     public void testConnectors() {
-        AbstractHerder herder = partialMockBuilder(AbstractHerder.class)
-            .withConstructor(
-                Worker.class,
-                String.class,
-                String.class,
-                StatusBackingStore.class,
-                ConfigBackingStore.class,
-                ConnectorClientConfigOverridePolicy.class
-            )
-            .withArgs(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy)
-            .addMockedMethod("generation")
-            .createMock();
-
-        EasyMock.expect(herder.generation()).andStubReturn(generation);
-        EasyMock.expect(herder.rawConfig(connector)).andReturn(null);
-        EasyMock.expect(configStore.snapshot()).andReturn(SNAPSHOT);
-        replayAll();
+        AbstractHerder herder = mock(AbstractHerder.class, withSettings()
+                .useConstructor(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy)
+                .defaultAnswer(CALLS_REAL_METHODS));
+
+        when(herder.generation()).thenReturn(generation);
+        when(herder.rawConfig(connector)).thenReturn(null);
+        when(configStore.snapshot()).thenReturn(SNAPSHOT);
         assertEquals(Collections.singleton(CONN1), new HashSet<>(herder.connectors()));
-        PowerMock.verifyAll();
     }
 
     @Test
     public void testConnectorStatus() {
         ConnectorTaskId taskId = new ConnectorTaskId(connector, 0);
-        AbstractHerder herder = partialMockBuilder(AbstractHerder.class)
-            .withConstructor(
-                Worker.class,
-                String.class,
-                String.class,
-                StatusBackingStore.class,
-                ConfigBackingStore.class,
-                ConnectorClientConfigOverridePolicy.class
-            )
-            .withArgs(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy)
-            .addMockedMethod("generation")
-            .createMock();
-
-        EasyMock.expect(herder.generation()).andStubReturn(generation);
-        EasyMock.expect(herder.rawConfig(connector)).andReturn(null);
-        EasyMock.expect(statusStore.get(connector))
-            .andReturn(new ConnectorStatus(connector, AbstractStatus.State.RUNNING, workerId, generation));
-        EasyMock.expect(statusStore.getAll(connector))
-            .andReturn(Collections.singletonList(
+
+        AbstractHerder herder = mock(AbstractHerder.class, withSettings()
+                .useConstructor(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy)
+                .defaultAnswer(CALLS_REAL_METHODS));
+
+        when(herder.generation()).thenReturn(generation);
+        when(herder.rawConfig(connector)).thenReturn(null);
+        when(statusStore.get(connector))
+            .thenReturn(new ConnectorStatus(connector, AbstractStatus.State.RUNNING, workerId, generation));
+        when(statusStore.getAll(connector))
+            .thenReturn(Collections.singletonList(
                 new TaskStatus(taskId, AbstractStatus.State.UNASSIGNED, workerId, generation)));
 
-        replayAll();
         ConnectorStateInfo csi = herder.connectorStatus(connector);
-        PowerMock.verifyAll();
     }
 
     @Test
     public void connectorStatus() {
         ConnectorTaskId taskId = new ConnectorTaskId(connector, 0);
 
-        AbstractHerder herder = partialMockBuilder(AbstractHerder.class)
-                .withConstructor(Worker.class, String.class, String.class, StatusBackingStore.class, ConfigBackingStore.class,
-                                 ConnectorClientConfigOverridePolicy.class)
-                .withArgs(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy)
-                .addMockedMethod("generation")
-                .createMock();
+        AbstractHerder herder = mock(AbstractHerder.class, withSettings()
+                .useConstructor(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy)
+                .defaultAnswer(CALLS_REAL_METHODS));
 
-        EasyMock.expect(herder.generation()).andStubReturn(generation);
-        EasyMock.expect(herder.rawConfig(connector)).andReturn(null);
+        when(herder.generation()).thenReturn(generation);

Review comment:
       Same, I don't think it's needed in this test. I'm not going to comment on the other tests but let's remove the mocks we don't need.

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
##########
@@ -367,31 +325,28 @@ public void testBuildRestartPlanForNoRestart() {
         assertFalse(restartPlan.shouldRestartConnector());
         assertFalse(restartPlan.shouldRestartTasks());
         assertTrue(restartPlan.taskIdsToRestart().isEmpty());
-
-        PowerMock.verifyAll();
     }
 
     @Test
     public void testConfigValidationEmptyConfig() {
         AbstractHerder herder = createConfigValidationHerder(TestSourceConnector.class, noneConnectorClientConfigOverridePolicy, 0);
-        replayAll();
 
         assertThrows(BadRequestException.class, () -> herder.validateConnectorConfig(Collections.emptyMap(), false));
+        verify(worker, times(2)).configTransformer();
 
-        verifyAll();
     }
 
     @Test()

Review comment:
       We can drop the parentheses here. Same below.

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
##########
@@ -135,90 +133,75 @@
     private final int generation = 5;
     private final String connector = "connector";
     private final ConnectorClientConfigOverridePolicy noneConnectorClientConfigOverridePolicy = new NoneConnectorClientConfigOverridePolicy();
+    private Connector insConnector;
 
-    @MockStrict private Worker worker;
-    @MockStrict private WorkerConfigTransformer transformer;
-    @MockStrict private Plugins plugins;
-    @MockStrict private ClassLoader classLoader;
-    @MockStrict private ConfigBackingStore configStore;
-    @MockStrict private StatusBackingStore statusStore;
+    private final Worker worker = mock(Worker.class);
+    private final WorkerConfigTransformer transformer = mock(WorkerConfigTransformer.class);
+    private final Plugins plugins = mock(Plugins.class);
+    private final ClassLoader classLoader = mock(ClassLoader.class);
+    private final ConfigBackingStore configStore = mock(ConfigBackingStore.class);
+    private final StatusBackingStore statusStore = mock(StatusBackingStore.class);
+    private ClassLoader loader;
+
+    @Before
+    public void before() {
+        loader = Utils.getContextOrKafkaClassLoader();
+    }
+
+    @After
+    public void tearDown() {
+        if (loader != null) Plugins.compareAndSwapLoaders(loader);
+    }
 
     @Test
     public void testConnectors() {
-        AbstractHerder herder = partialMockBuilder(AbstractHerder.class)
-            .withConstructor(
-                Worker.class,
-                String.class,
-                String.class,
-                StatusBackingStore.class,
-                ConfigBackingStore.class,
-                ConnectorClientConfigOverridePolicy.class
-            )
-            .withArgs(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy)
-            .addMockedMethod("generation")
-            .createMock();
-
-        EasyMock.expect(herder.generation()).andStubReturn(generation);
-        EasyMock.expect(herder.rawConfig(connector)).andReturn(null);
-        EasyMock.expect(configStore.snapshot()).andReturn(SNAPSHOT);
-        replayAll();
+        AbstractHerder herder = mock(AbstractHerder.class, withSettings()
+                .useConstructor(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy)
+                .defaultAnswer(CALLS_REAL_METHODS));
+
+        when(herder.generation()).thenReturn(generation);
+        when(herder.rawConfig(connector)).thenReturn(null);

Review comment:
       Do we actually have to mock `generation()` and `rawConfig()` for this test? Looking at `connectors()`, it looks like it only relies on the snapshot.

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
##########
@@ -135,90 +133,75 @@
     private final int generation = 5;
     private final String connector = "connector";
     private final ConnectorClientConfigOverridePolicy noneConnectorClientConfigOverridePolicy = new NoneConnectorClientConfigOverridePolicy();
+    private Connector insConnector;
 
-    @MockStrict private Worker worker;
-    @MockStrict private WorkerConfigTransformer transformer;
-    @MockStrict private Plugins plugins;
-    @MockStrict private ClassLoader classLoader;
-    @MockStrict private ConfigBackingStore configStore;
-    @MockStrict private StatusBackingStore statusStore;
+    private final Worker worker = mock(Worker.class);
+    private final WorkerConfigTransformer transformer = mock(WorkerConfigTransformer.class);
+    private final Plugins plugins = mock(Plugins.class);
+    private final ClassLoader classLoader = mock(ClassLoader.class);
+    private final ConfigBackingStore configStore = mock(ConfigBackingStore.class);
+    private final StatusBackingStore statusStore = mock(StatusBackingStore.class);
+    private ClassLoader loader;
+
+    @Before
+    public void before() {
+        loader = Utils.getContextOrKafkaClassLoader();
+    }
+
+    @After
+    public void tearDown() {
+        if (loader != null) Plugins.compareAndSwapLoaders(loader);
+    }
 
     @Test
     public void testConnectors() {
-        AbstractHerder herder = partialMockBuilder(AbstractHerder.class)
-            .withConstructor(
-                Worker.class,
-                String.class,
-                String.class,
-                StatusBackingStore.class,
-                ConfigBackingStore.class,
-                ConnectorClientConfigOverridePolicy.class
-            )
-            .withArgs(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy)
-            .addMockedMethod("generation")
-            .createMock();
-
-        EasyMock.expect(herder.generation()).andStubReturn(generation);
-        EasyMock.expect(herder.rawConfig(connector)).andReturn(null);
-        EasyMock.expect(configStore.snapshot()).andReturn(SNAPSHOT);
-        replayAll();
+        AbstractHerder herder = mock(AbstractHerder.class, withSettings()
+                .useConstructor(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy)
+                .defaultAnswer(CALLS_REAL_METHODS));
+
+        when(herder.generation()).thenReturn(generation);
+        when(herder.rawConfig(connector)).thenReturn(null);
+        when(configStore.snapshot()).thenReturn(SNAPSHOT);
         assertEquals(Collections.singleton(CONN1), new HashSet<>(herder.connectors()));
-        PowerMock.verifyAll();
     }
 
     @Test
     public void testConnectorStatus() {
         ConnectorTaskId taskId = new ConnectorTaskId(connector, 0);
-        AbstractHerder herder = partialMockBuilder(AbstractHerder.class)
-            .withConstructor(
-                Worker.class,
-                String.class,
-                String.class,
-                StatusBackingStore.class,
-                ConfigBackingStore.class,
-                ConnectorClientConfigOverridePolicy.class
-            )
-            .withArgs(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy)
-            .addMockedMethod("generation")
-            .createMock();
-
-        EasyMock.expect(herder.generation()).andStubReturn(generation);
-        EasyMock.expect(herder.rawConfig(connector)).andReturn(null);
-        EasyMock.expect(statusStore.get(connector))
-            .andReturn(new ConnectorStatus(connector, AbstractStatus.State.RUNNING, workerId, generation));
-        EasyMock.expect(statusStore.getAll(connector))
-            .andReturn(Collections.singletonList(
+
+        AbstractHerder herder = mock(AbstractHerder.class, withSettings()
+                .useConstructor(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy)
+                .defaultAnswer(CALLS_REAL_METHODS));
+
+        when(herder.generation()).thenReturn(generation);

Review comment:
       I don't think we need to mock `generation()` in this test.

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
##########
@@ -407,70 +362,73 @@ public void testConfigValidationMissingName() throws Throwable {
         assertEquals("required", infos.get("required").configValue().name());
         assertEquals(1, infos.get("required").configValue().errors().size());
 
-        verifyAll();
+        verify(plugins).newConnector(connectorClass.getName());
+        verify(plugins).compareAndSwapLoaders(insConnector);
     }
 
     @Test
     public void testConfigValidationInvalidTopics() {
-        AbstractHerder herder = createConfigValidationHerder(TestSinkConnector.class, noneConnectorClientConfigOverridePolicy);
-        replayAll();
+        final Class<? extends Connector> connectorClass = TestSinkConnector.class;
+        AbstractHerder herder = createConfigValidationHerder(connectorClass, noneConnectorClientConfigOverridePolicy);
 
         Map<String, String> config = new HashMap<>();
-        config.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, TestSinkConnector.class.getName());
+        config.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, connectorClass.getName());
         config.put(SinkConnectorConfig.TOPICS_CONFIG, "topic1,topic2");
         config.put(SinkConnectorConfig.TOPICS_REGEX_CONFIG, "topic.*");
 
         assertThrows(ConfigException.class, () -> herder.validateConnectorConfig(config, false));
 
-        verifyAll();
+        verify(plugins).newConnector(connectorClass.getName());
+        verify(plugins).compareAndSwapLoaders(insConnector);
     }
 
     @Test
     public void testConfigValidationTopicsWithDlq() {
+        final Class<? extends Connector> connectorClass = TestSinkConnector.class;
         AbstractHerder herder = createConfigValidationHerder(TestSinkConnector.class, noneConnectorClientConfigOverridePolicy);
-        replayAll();
 
         Map<String, String> config = new HashMap<>();
-        config.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, TestSinkConnector.class.getName());
+        config.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, connectorClass.getName());
         config.put(SinkConnectorConfig.TOPICS_CONFIG, "topic1");
         config.put(SinkConnectorConfig.DLQ_TOPIC_NAME_CONFIG, "topic1");
 
         assertThrows(ConfigException.class, () -> herder.validateConnectorConfig(config, false));
 
-        verifyAll();
+        verify(plugins).newConnector(connectorClass.getName());
+        verify(plugins).compareAndSwapLoaders(insConnector);
     }
 
     @Test
     public void testConfigValidationTopicsRegexWithDlq() {
-        AbstractHerder herder = createConfigValidationHerder(TestSinkConnector.class, noneConnectorClientConfigOverridePolicy);
-        replayAll();
+        final Class<? extends Connector> connectorClass = TestSinkConnector.class;
+        AbstractHerder herder = createConfigValidationHerder(connectorClass, noneConnectorClientConfigOverridePolicy);
 
         Map<String, String> config = new HashMap<>();
-        config.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, TestSinkConnector.class.getName());
+        config.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, connectorClass.getName());
         config.put(SinkConnectorConfig.TOPICS_REGEX_CONFIG, "topic.*");
         config.put(SinkConnectorConfig.DLQ_TOPIC_NAME_CONFIG, "topic1");
 
         assertThrows(ConfigException.class, () -> herder.validateConnectorConfig(config, false));
 
-        verifyAll();
+        verify(plugins).newConnector(connectorClass.getName());
+        verify(plugins).compareAndSwapLoaders(insConnector);
     }
 
     @SuppressWarnings({"rawtypes", "unchecked"})
     @Test()
-    public void testConfigValidationTransformsExtendResults() throws Throwable {
-        AbstractHerder herder = createConfigValidationHerder(TestSourceConnector.class, noneConnectorClientConfigOverridePolicy);
+    public void testConfigValidationTransformsExtendResults() {
+        final Class<? extends Connector> connectorClass = TestSourceConnector.class;
+        AbstractHerder herder = createConfigValidationHerder(connectorClass, noneConnectorClientConfigOverridePolicy);
 
         // 2 transform aliases defined -> 2 plugin lookups
         Set<PluginDesc<Transformation<?>>> transformations = new HashSet<>();
         transformations.add(transformationPluginDesc());
-        EasyMock.expect(plugins.transformations()).andReturn(transformations).times(2);
-
-        replayAll();
+        when(plugins.transformations()).thenReturn(transformations);

Review comment:
       What about inlining `transformations` and having something like:
   ```
   when(plugins.transformations()).thenReturn(Collections.singleton(transformationPluginDesc()));
   ```

##########
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
##########
@@ -505,28 +463,29 @@ public void testConfigValidationTransformsExtendResults() throws Throwable {
         assertEquals("transforms.xformB.type", infos.get("transforms.xformB.type").configValue().name());
         assertFalse(infos.get("transforms.xformB.type").configValue().errors().isEmpty());
 
-        verifyAll();
+        verify(plugins, times(2)).transformations();
+        verify(plugins).newConnector(connectorClass.getName());
+        verify(plugins).compareAndSwapLoaders(insConnector);
     }
 
     @Test()
     public void testConfigValidationPredicatesExtendResults() {
-        AbstractHerder herder = createConfigValidationHerder(TestSourceConnector.class, noneConnectorClientConfigOverridePolicy);
+        final Class<? extends Connector> connectorClass = TestSourceConnector.class;
+        AbstractHerder herder = createConfigValidationHerder(connectorClass, noneConnectorClientConfigOverridePolicy);
 
         // 2 transform aliases defined -> 2 plugin lookups
         Set<PluginDesc<Transformation<?>>> transformations = new HashSet<>();
         transformations.add(transformationPluginDesc());
-        EasyMock.expect(plugins.transformations()).andReturn(transformations).times(1);
+        when(plugins.transformations()).thenReturn(transformations);

Review comment:
       As mentioned above, because these `Set` instances only have 1 element, we can maybe inline these variables. WDYT?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org