You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mi...@apache.org on 2022/08/22 18:17:50 UTC

[kafka] branch trunk updated: KAFKA-14133: Replace EasyMock with Mockito in WorkerCoordinatorTest and RootResourceTest (#12509)

This is an automated email from the ASF dual-hosted git repository.

mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ddb7fdd88f KAFKA-14133: Replace EasyMock with Mockito in WorkerCoordinatorTest and RootResourceTest (#12509)
ddb7fdd88f is described below

commit ddb7fdd88fbb559d858866f8674a670273405f72
Author: Yash Mayya <ya...@gmail.com>
AuthorDate: Mon Aug 22 23:47:34 2022 +0530

    KAFKA-14133: Replace EasyMock with Mockito in WorkerCoordinatorTest and RootResourceTest (#12509)
    
    
    Reviewers: Mickael Maison <mi...@gmail.com>, Dalibor Plavcic <da...@proton.me>
---
 .../runtime/distributed/WorkerCoordinator.java     |   4 +-
 .../runtime/distributed/WorkerCoordinatorTest.java | 171 ++++++++++-----------
 .../runtime/rest/resources/RootResourceTest.java   |  21 ++-
 .../util/ByteArrayProducerRecordEquals.java        |  53 -------
 4 files changed, 89 insertions(+), 160 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
index ced67427a3..851a1bb836 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
@@ -373,10 +373,10 @@ public class WorkerCoordinator extends AbstractCoordinator implements Closeable
 
             metrics.addMetric(metrics.metricName("assigned-connectors",
                               this.metricGrpName,
-                              "The number of connector instances currently assigned to this consumer"), numConnectors);
+                              "The number of connector instances currently assigned to this worker"), numConnectors);
             metrics.addMetric(metrics.metricName("assigned-tasks",
                               this.metricGrpName,
-                              "The number of tasks currently assigned to this consumer"), numTasks);
+                              "The number of tasks currently assigned to this worker"), numTasks);
         }
     }
 
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
index c3715aa302..c1d34c87b7 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
@@ -39,14 +39,12 @@ import org.apache.kafka.connect.runtime.TargetState;
 import org.apache.kafka.connect.storage.ClusterConfigState;
 import org.apache.kafka.connect.storage.KafkaConfigBackingStore;
 import org.apache.kafka.connect.util.ConnectorTaskId;
-import org.easymock.EasyMock;
-import org.easymock.Mock;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
-import org.powermock.api.easymock.PowerMock;
+import org.mockito.Mock;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -67,6 +65,10 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertThrows;
 import static org.junit.runners.Parameterized.Parameter;
 import static org.junit.runners.Parameterized.Parameters;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 @RunWith(value = Parameterized.class)
 public class WorkerCoordinatorTest {
@@ -74,19 +76,19 @@ public class WorkerCoordinatorTest {
     private static final String LEADER_URL = "leaderUrl:8083";
     private static final String MEMBER_URL = "memberUrl:8083";
 
-    private String connectorId1 = "connector1";
-    private String connectorId2 = "connector2";
-    private String connectorId3 = "connector3";
-    private ConnectorTaskId taskId1x0 = new ConnectorTaskId(connectorId1, 0);
-    private ConnectorTaskId taskId1x1 = new ConnectorTaskId(connectorId1, 1);
-    private ConnectorTaskId taskId2x0 = new ConnectorTaskId(connectorId2, 0);
-    private ConnectorTaskId taskId3x0 = new ConnectorTaskId(connectorId3, 0);
-
-    private String groupId = "test-group";
-    private int sessionTimeoutMs = 10;
-    private int rebalanceTimeoutMs = 60;
-    private int heartbeatIntervalMs = 2;
-    private long retryBackoffMs = 100;
+    private final String connectorId1 = "connector1";
+    private final String connectorId2 = "connector2";
+    private final String connectorId3 = "connector3";
+    private final ConnectorTaskId taskId1x0 = new ConnectorTaskId(connectorId1, 0);
+    private final ConnectorTaskId taskId1x1 = new ConnectorTaskId(connectorId1, 1);
+    private final ConnectorTaskId taskId2x0 = new ConnectorTaskId(connectorId2, 0);
+    private final ConnectorTaskId taskId3x0 = new ConnectorTaskId(connectorId3, 0);
+
+    private final String groupId = "test-group";
+    private final int sessionTimeoutMs = 10;
+    private final int rebalanceTimeoutMs = 60;
+    private final int heartbeatIntervalMs = 2;
+    private final long retryBackoffMs = 100;
     private MockTime time;
     private MockClient client;
     private Node node;
@@ -130,7 +132,7 @@ public class WorkerCoordinatorTest {
         this.consumerClient = new ConsumerNetworkClient(logContext, client, metadata, time, 100, 1000, heartbeatIntervalMs);
         this.metrics = new Metrics(time);
         this.rebalanceListener = new MockRebalanceListener();
-        this.configStorage = PowerMock.createMock(KafkaConfigBackingStore.class);
+        this.configStorage = mock(KafkaConfigBackingStore.class);
         this.rebalanceConfig = new GroupRebalanceConfig(sessionTimeoutMs,
                                                         rebalanceTimeoutMs,
                                                         heartbeatIntervalMs,
@@ -151,7 +153,7 @@ public class WorkerCoordinatorTest {
                                                  0);
 
         configState1 = new ClusterConfigState(
-                1L,
+                4L,
                 null,
                 Collections.singletonMap(connectorId1, 1),
                 Collections.singletonMap(connectorId1, new HashMap<>()),
@@ -177,7 +179,7 @@ public class WorkerCoordinatorTest {
         configState2TaskConfigs.put(taskId1x1, new HashMap<>());
         configState2TaskConfigs.put(taskId2x0, new HashMap<>());
         configState2 = new ClusterConfigState(
-                2L,
+                9L,
                 null,
                 configState2ConnectorTaskCounts,
                 configState2ConnectorConfigs,
@@ -206,7 +208,7 @@ public class WorkerCoordinatorTest {
         configStateSingleTaskConnectorsTaskConfigs.put(taskId2x0, new HashMap<>());
         configStateSingleTaskConnectorsTaskConfigs.put(taskId3x0, new HashMap<>());
         configStateSingleTaskConnectors = new ClusterConfigState(
-                2L,
+                12L,
                 null,
                 configStateSingleTaskConnectorsConnectorTaskCounts,
                 configStateSingleTaskConnectorsConnectorConfigs,
@@ -229,9 +231,7 @@ public class WorkerCoordinatorTest {
 
     @Test
     public void testMetadata() {
-        EasyMock.expect(configStorage.snapshot()).andReturn(configState1);
-
-        PowerMock.replayAll();
+        when(configStorage.snapshot()).thenReturn(configState1);
 
         JoinGroupRequestData.JoinGroupRequestProtocolCollection serialized = coordinator.metadata();
         assertEquals(expectedMetadataSize, serialized.size());
@@ -242,33 +242,31 @@ public class WorkerCoordinatorTest {
         assertEquals(compatibility.protocol(), defaultMetadata.name());
         ConnectProtocol.WorkerState state = ConnectProtocol.deserializeMetadata(
                 ByteBuffer.wrap(defaultMetadata.metadata()));
-        assertEquals(1, state.offset());
+        assertEquals(configState1.offset(), state.offset());
 
-        PowerMock.verifyAll();
+        verify(configStorage).snapshot();
     }
 
     @Test
     public void testNormalJoinGroupLeader() {
-        EasyMock.expect(configStorage.snapshot()).andReturn(configState1);
-
-        PowerMock.replayAll();
+        when(configStorage.snapshot()).thenReturn(configState1);
 
-        final String consumerId = "leader";
+        final String memberId = "leader";
 
         client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node));
         coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
 
         // normal join group
         Map<String, Long> memberConfigOffsets = new HashMap<>();
-        memberConfigOffsets.put("leader", 1L);
-        memberConfigOffsets.put("member", 1L);
-        client.prepareResponse(joinGroupLeaderResponse(1, consumerId, memberConfigOffsets, Errors.NONE));
+        memberConfigOffsets.put("leader", configState1.offset());
+        memberConfigOffsets.put("member", configState1.offset());
+        client.prepareResponse(joinGroupLeaderResponse(1, memberId, memberConfigOffsets, Errors.NONE));
         client.prepareResponse(body -> {
             SyncGroupRequest sync = (SyncGroupRequest) body;
-            return sync.data().memberId().equals(consumerId) &&
+            return sync.data().memberId().equals(memberId) &&
                     sync.data().generationId() == 1 &&
-                    sync.groupAssignments().containsKey(consumerId);
-        }, syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, "leader", 1L, Collections.singletonList(connectorId1),
+                    sync.groupAssignments().containsKey(memberId);
+        }, syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, "leader", configState1.offset(), Collections.singletonList(connectorId1),
                 Collections.emptyList(), Errors.NONE));
         coordinator.ensureActiveGroup();
 
@@ -276,19 +274,17 @@ public class WorkerCoordinatorTest {
         assertEquals(0, rebalanceListener.revokedCount);
         assertEquals(1, rebalanceListener.assignedCount);
         assertFalse(rebalanceListener.assignment.failed());
-        assertEquals(1L, rebalanceListener.assignment.offset());
+        assertEquals(configState1.offset(), rebalanceListener.assignment.offset());
         assertEquals("leader", rebalanceListener.assignment.leader());
         assertEquals(Collections.singletonList(connectorId1), rebalanceListener.assignment.connectors());
         assertEquals(Collections.emptyList(), rebalanceListener.assignment.tasks());
 
-        PowerMock.verifyAll();
+        verify(configStorage).snapshot();
     }
 
     @Test
     public void testNormalJoinGroupFollower() {
-        EasyMock.expect(configStorage.snapshot()).andReturn(configState1);
-
-        PowerMock.replayAll();
+        when(configStorage.snapshot()).thenReturn(configState1);
 
         final String memberId = "member";
 
@@ -302,7 +298,7 @@ public class WorkerCoordinatorTest {
             return sync.data().memberId().equals(memberId) &&
                     sync.data().generationId() == 1 &&
                     sync.data().assignments().isEmpty();
-        }, syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, "leader", 1L, Collections.emptyList(),
+        }, syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, "leader", configState1.offset(), Collections.emptyList(),
                 Collections.singletonList(taskId1x0), Errors.NONE));
         coordinator.ensureActiveGroup();
 
@@ -310,23 +306,18 @@ public class WorkerCoordinatorTest {
         assertEquals(0, rebalanceListener.revokedCount);
         assertEquals(1, rebalanceListener.assignedCount);
         assertFalse(rebalanceListener.assignment.failed());
-        assertEquals(1L, rebalanceListener.assignment.offset());
+        assertEquals(configState1.offset(), rebalanceListener.assignment.offset());
         assertEquals(Collections.emptyList(), rebalanceListener.assignment.connectors());
         assertEquals(Collections.singletonList(taskId1x0), rebalanceListener.assignment.tasks());
 
-        PowerMock.verifyAll();
+        verify(configStorage).snapshot();
     }
 
     @Test
     public void testJoinLeaderCannotAssign() {
         // If the selected leader can't get up to the maximum offset, it will fail to assign and we should immediately
         // need to retry the join.
-
-        // When the first round fails, we'll take an updated config snapshot
-        EasyMock.expect(configStorage.snapshot()).andReturn(configState1);
-        EasyMock.expect(configStorage.snapshot()).andReturn(configState2);
-
-        PowerMock.replayAll();
+        when(configStorage.snapshot()).thenReturn(configState1);
 
         final String memberId = "member";
 
@@ -341,43 +332,44 @@ public class WorkerCoordinatorTest {
                     sync.data().generationId() == 1 &&
                     sync.data().assignments().isEmpty();
         };
-        client.prepareResponse(matcher, syncGroupResponse(ConnectProtocol.Assignment.CONFIG_MISMATCH, "leader", 10L,
+        client.prepareResponse(matcher, syncGroupResponse(ConnectProtocol.Assignment.CONFIG_MISMATCH, "leader", configState2.offset(),
                 Collections.emptyList(), Collections.emptyList(), Errors.NONE));
+
+        // When the first round fails, we'll take an updated config snapshot
+        when(configStorage.snapshot()).thenReturn(configState2);
+
         client.prepareResponse(joinGroupFollowerResponse(1, memberId, "leader", Errors.NONE));
-        client.prepareResponse(matcher, syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, "leader", 1L,
+        client.prepareResponse(matcher, syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, "leader", configState2.offset(),
                 Collections.emptyList(), Collections.singletonList(taskId1x0), Errors.NONE));
         coordinator.ensureActiveGroup();
 
-        PowerMock.verifyAll();
+        verify(configStorage, times(2)).snapshot();
     }
 
     @Test
     public void testRejoinGroup() {
-        EasyMock.expect(configStorage.snapshot()).andReturn(configState1);
-        EasyMock.expect(configStorage.snapshot()).andReturn(configState1);
-
-        PowerMock.replayAll();
+        when(configStorage.snapshot()).thenReturn(configState1);
 
         client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node));
         coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
 
         // join the group once
-        client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE));
-        client.prepareResponse(syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, "leader", 1L, Collections.emptyList(),
+        client.prepareResponse(joinGroupFollowerResponse(1, "member", "leader", Errors.NONE));
+        client.prepareResponse(syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, "leader", configState1.offset(), Collections.emptyList(),
                 Collections.singletonList(taskId1x0), Errors.NONE));
         coordinator.ensureActiveGroup();
 
         assertEquals(0, rebalanceListener.revokedCount);
         assertEquals(1, rebalanceListener.assignedCount);
         assertFalse(rebalanceListener.assignment.failed());
-        assertEquals(1L, rebalanceListener.assignment.offset());
+        assertEquals(configState1.offset(), rebalanceListener.assignment.offset());
         assertEquals(Collections.emptyList(), rebalanceListener.assignment.connectors());
         assertEquals(Collections.singletonList(taskId1x0), rebalanceListener.assignment.tasks());
 
         // and join the group again
         coordinator.requestRejoin("test");
-        client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE));
-        client.prepareResponse(syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, "leader", 1L, Collections.singletonList(connectorId1),
+        client.prepareResponse(joinGroupFollowerResponse(1, "member", "leader", Errors.NONE));
+        client.prepareResponse(syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, "leader", configState1.offset(), Collections.singletonList(connectorId1),
                 Collections.emptyList(), Errors.NONE));
         coordinator.ensureActiveGroup();
 
@@ -386,11 +378,11 @@ public class WorkerCoordinatorTest {
         assertEquals(Collections.singletonList(taskId1x0), rebalanceListener.revokedTasks);
         assertEquals(2, rebalanceListener.assignedCount);
         assertFalse(rebalanceListener.assignment.failed());
-        assertEquals(1L, rebalanceListener.assignment.offset());
+        assertEquals(configState1.offset(), rebalanceListener.assignment.offset());
         assertEquals(Collections.singletonList(connectorId1), rebalanceListener.assignment.connectors());
         assertEquals(Collections.emptyList(), rebalanceListener.assignment.tasks());
 
-        PowerMock.verifyAll();
+        verify(configStorage, times(2)).snapshot();
     }
 
     @Test
@@ -398,9 +390,7 @@ public class WorkerCoordinatorTest {
         // Since all the protocol responses are mocked, the other tests validate doSync runs, but don't validate its
         // output. So we test it directly here.
 
-        EasyMock.expect(configStorage.snapshot()).andReturn(configState1);
-
-        PowerMock.replayAll();
+        when(configStorage.snapshot()).thenReturn(configState1);
 
         // Prime the current configuration state
         coordinator.metadata();
@@ -409,11 +399,11 @@ public class WorkerCoordinatorTest {
         List<JoinGroupResponseData.JoinGroupResponseMember> responseMembers = new ArrayList<>();
         responseMembers.add(new JoinGroupResponseData.JoinGroupResponseMember()
                 .setMemberId("leader")
-                .setMetadata(ConnectProtocol.serializeMetadata(new ConnectProtocol.WorkerState(LEADER_URL, 1L)).array())
+                .setMetadata(ConnectProtocol.serializeMetadata(new ConnectProtocol.WorkerState(LEADER_URL, configState1.offset())).array())
         );
         responseMembers.add(new JoinGroupResponseData.JoinGroupResponseMember()
                 .setMemberId("member")
-                .setMetadata(ConnectProtocol.serializeMetadata(new ConnectProtocol.WorkerState(MEMBER_URL, 1L)).array())
+                .setMetadata(ConnectProtocol.serializeMetadata(new ConnectProtocol.WorkerState(MEMBER_URL, configState1.offset())).array())
         );
         Map<String, ByteBuffer> result = coordinator.onLeaderElected("leader", EAGER.protocol(), responseMembers, false);
 
@@ -421,18 +411,18 @@ public class WorkerCoordinatorTest {
         ConnectProtocol.Assignment leaderAssignment = ConnectProtocol.deserializeAssignment(result.get("leader"));
         assertFalse(leaderAssignment.failed());
         assertEquals("leader", leaderAssignment.leader());
-        assertEquals(1, leaderAssignment.offset());
+        assertEquals(configState1.offset(), leaderAssignment.offset());
         assertEquals(Collections.singletonList(connectorId1), leaderAssignment.connectors());
         assertEquals(Collections.emptyList(), leaderAssignment.tasks());
 
         ConnectProtocol.Assignment memberAssignment = ConnectProtocol.deserializeAssignment(result.get("member"));
         assertFalse(memberAssignment.failed());
         assertEquals("leader", memberAssignment.leader());
-        assertEquals(1, memberAssignment.offset());
+        assertEquals(configState1.offset(), memberAssignment.offset());
         assertEquals(Collections.emptyList(), memberAssignment.connectors());
         assertEquals(Collections.singletonList(taskId1x0), memberAssignment.tasks());
 
-        PowerMock.verifyAll();
+        verify(configStorage).snapshot();
     }
 
     @Test
@@ -440,22 +430,20 @@ public class WorkerCoordinatorTest {
         // Since all the protocol responses are mocked, the other tests validate doSync runs, but don't validate its
         // output. So we test it directly here.
 
-        EasyMock.expect(configStorage.snapshot()).andReturn(configState2);
-
-        PowerMock.replayAll();
+        when(configStorage.snapshot()).thenReturn(configState2);
 
         // Prime the current configuration state
         coordinator.metadata();
 
-        // Mark everyone as in sync with configState1
+        // Mark everyone as in sync with configState2
         List<JoinGroupResponseData.JoinGroupResponseMember> responseMembers = new ArrayList<>();
         responseMembers.add(new JoinGroupResponseData.JoinGroupResponseMember()
                 .setMemberId("leader")
-                .setMetadata(ConnectProtocol.serializeMetadata(new ConnectProtocol.WorkerState(LEADER_URL, 1L)).array())
+                .setMetadata(ConnectProtocol.serializeMetadata(new ConnectProtocol.WorkerState(LEADER_URL, configState2.offset())).array())
         );
         responseMembers.add(new JoinGroupResponseData.JoinGroupResponseMember()
                 .setMemberId("member")
-                .setMetadata(ConnectProtocol.serializeMetadata(new ConnectProtocol.WorkerState(MEMBER_URL, 1L)).array())
+                .setMetadata(ConnectProtocol.serializeMetadata(new ConnectProtocol.WorkerState(MEMBER_URL, configState2.offset())).array())
         );
 
         Map<String, ByteBuffer> result = coordinator.onLeaderElected("leader", EAGER.protocol(), responseMembers, false);
@@ -464,18 +452,18 @@ public class WorkerCoordinatorTest {
         ConnectProtocol.Assignment leaderAssignment = ConnectProtocol.deserializeAssignment(result.get("leader"));
         assertFalse(leaderAssignment.failed());
         assertEquals("leader", leaderAssignment.leader());
-        assertEquals(1, leaderAssignment.offset());
+        assertEquals(configState2.offset(), leaderAssignment.offset());
         assertEquals(Collections.singletonList(connectorId1), leaderAssignment.connectors());
         assertEquals(Arrays.asList(taskId1x0, taskId2x0), leaderAssignment.tasks());
 
         ConnectProtocol.Assignment memberAssignment = ConnectProtocol.deserializeAssignment(result.get("member"));
         assertFalse(memberAssignment.failed());
         assertEquals("leader", memberAssignment.leader());
-        assertEquals(1, memberAssignment.offset());
+        assertEquals(configState2.offset(), memberAssignment.offset());
         assertEquals(Collections.singletonList(connectorId2), memberAssignment.connectors());
         assertEquals(Collections.singletonList(taskId1x1), memberAssignment.tasks());
 
-        PowerMock.verifyAll();
+        verify(configStorage).snapshot();
     }
 
     @Test
@@ -483,22 +471,20 @@ public class WorkerCoordinatorTest {
         // Since all the protocol responses are mocked, the other tests validate doSync runs, but don't validate its
         // output. So we test it directly here.
 
-        EasyMock.expect(configStorage.snapshot()).andReturn(configStateSingleTaskConnectors);
-
-        PowerMock.replayAll();
+        when(configStorage.snapshot()).thenReturn(configStateSingleTaskConnectors);
 
         // Prime the current configuration state
         coordinator.metadata();
 
-        // Mark everyone as in sync with configState1
+        // Mark everyone as in sync with configStateSingleTaskConnectors
         List<JoinGroupResponseData.JoinGroupResponseMember> responseMembers = new ArrayList<>();
         responseMembers.add(new JoinGroupResponseData.JoinGroupResponseMember()
                 .setMemberId("leader")
-                .setMetadata(ConnectProtocol.serializeMetadata(new ConnectProtocol.WorkerState(LEADER_URL, 1L)).array())
+                .setMetadata(ConnectProtocol.serializeMetadata(new ConnectProtocol.WorkerState(LEADER_URL, configStateSingleTaskConnectors.offset())).array())
         );
         responseMembers.add(new JoinGroupResponseData.JoinGroupResponseMember()
                 .setMemberId("member")
-                .setMetadata(ConnectProtocol.serializeMetadata(new ConnectProtocol.WorkerState(MEMBER_URL, 1L)).array())
+                .setMetadata(ConnectProtocol.serializeMetadata(new ConnectProtocol.WorkerState(MEMBER_URL, configStateSingleTaskConnectors.offset())).array())
         );
 
         Map<String, ByteBuffer> result = coordinator.onLeaderElected("leader", EAGER.protocol(), responseMembers, false);
@@ -508,18 +494,18 @@ public class WorkerCoordinatorTest {
         ConnectProtocol.Assignment leaderAssignment = ConnectProtocol.deserializeAssignment(result.get("leader"));
         assertFalse(leaderAssignment.failed());
         assertEquals("leader", leaderAssignment.leader());
-        assertEquals(1, leaderAssignment.offset());
+        assertEquals(configStateSingleTaskConnectors.offset(), leaderAssignment.offset());
         assertEquals(Arrays.asList(connectorId1, connectorId3), leaderAssignment.connectors());
-        assertEquals(Arrays.asList(taskId2x0), leaderAssignment.tasks());
+        assertEquals(Collections.singletonList(taskId2x0), leaderAssignment.tasks());
 
         ConnectProtocol.Assignment memberAssignment = ConnectProtocol.deserializeAssignment(result.get("member"));
         assertFalse(memberAssignment.failed());
         assertEquals("leader", memberAssignment.leader());
-        assertEquals(1, memberAssignment.offset());
+        assertEquals(configStateSingleTaskConnectors.offset(), memberAssignment.offset());
         assertEquals(Collections.singletonList(connectorId2), memberAssignment.connectors());
         assertEquals(Arrays.asList(taskId1x0, taskId3x0), memberAssignment.tasks());
 
-        PowerMock.verifyAll();
+        verify(configStorage).snapshot();
     }
 
     @Test
@@ -527,15 +513,14 @@ public class WorkerCoordinatorTest {
         // Connect does not support static membership so skipping assignment should
         // never be set to true by the group coordinator. It is treated as an
         // illegal state if it would.
-        EasyMock.expect(configStorage.snapshot()).andReturn(configState1);
-        PowerMock.replayAll();
+        when(configStorage.snapshot()).thenReturn(configState1);
 
         coordinator.metadata();
 
         assertThrows(IllegalStateException.class,
             () -> coordinator.onLeaderElected("leader", EAGER.protocol(), Collections.emptyList(), true));
 
-        PowerMock.verifyAll();
+        verify(configStorage).snapshot();
     }
 
     private JoinGroupResponse joinGroupLeaderResponse(int generationId, String memberId,
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/RootResourceTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/RootResourceTest.java
index 4e928a3703..9764843924 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/RootResourceTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/RootResourceTest.java
@@ -20,21 +20,20 @@ import org.apache.kafka.clients.admin.MockAdminClient;
 import org.apache.kafka.common.utils.AppInfoParser;
 import org.apache.kafka.connect.runtime.Herder;
 import org.apache.kafka.connect.runtime.rest.entities.ServerInfo;
-import org.easymock.EasyMock;
-import org.easymock.EasyMockRunner;
-import org.easymock.EasyMockSupport;
-import org.easymock.Mock;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
 
 import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
-@RunWith(EasyMockRunner.class)
-public class RootResourceTest extends EasyMockSupport {
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
+public class RootResourceTest {
 
-    @Mock
-    private Herder herder;
+    @Mock private Herder herder;
     private RootResource rootResource;
 
     @Before
@@ -44,15 +43,13 @@ public class RootResourceTest extends EasyMockSupport {
 
     @Test
     public void testRootGet() {
-        EasyMock.expect(herder.kafkaClusterId()).andReturn(MockAdminClient.DEFAULT_CLUSTER_ID);
-
-        replayAll();
+        when(herder.kafkaClusterId()).thenReturn(MockAdminClient.DEFAULT_CLUSTER_ID);
 
         ServerInfo info = rootResource.serverInfo();
         assertEquals(AppInfoParser.getVersion(), info.version());
         assertEquals(AppInfoParser.getCommitId(), info.commit());
         assertEquals(MockAdminClient.DEFAULT_CLUSTER_ID, info.clusterId());
 
-        verifyAll();
+        verify(herder).kafkaClusterId();
     }
 }
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/ByteArrayProducerRecordEquals.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/ByteArrayProducerRecordEquals.java
deleted file mode 100644
index a6a155fc4b..0000000000
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/ByteArrayProducerRecordEquals.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.connect.util;
-
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.easymock.EasyMock;
-import org.easymock.IArgumentMatcher;
-
-import java.util.Arrays;
-
-public class ByteArrayProducerRecordEquals implements IArgumentMatcher {
-    private ProducerRecord<byte[], byte[]> record;
-
-    public static ProducerRecord<byte[], byte[]> eqProducerRecord(ProducerRecord<byte[], byte[]> in) {
-        EasyMock.reportMatcher(new ByteArrayProducerRecordEquals(in));
-        return null;
-    }
-
-    public ByteArrayProducerRecordEquals(ProducerRecord<byte[], byte[]> record) {
-        this.record = record;
-    }
-
-    @Override
-    @SuppressWarnings("unchecked")
-    public boolean matches(Object argument) {
-        if (!(argument instanceof ProducerRecord))
-            return false;
-        ProducerRecord<byte[], byte[]> other = (ProducerRecord<byte[], byte[]>) argument;
-        return record.topic().equals(other.topic()) &&
-                record.partition() != null ? record.partition().equals(other.partition()) : other.partition() == null &&
-                record.key() != null ? Arrays.equals(record.key(), other.key()) : other.key() == null &&
-                record.value() != null ? Arrays.equals(record.value(), other.value()) : other.value() == null;
-    }
-
-    @Override
-    public void appendTo(StringBuffer buffer) {
-        buffer.append(record.toString());
-    }
-}