You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mi...@apache.org on 2022/08/22 18:17:50 UTC
[kafka] branch trunk updated: KAFKA-14133: Replace EasyMock with Mockito in WorkerCoordinatorTest and RootResourceTest (#12509)
This is an automated email from the ASF dual-hosted git repository.
mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new ddb7fdd88f KAFKA-14133: Replace EasyMock with Mockito in WorkerCoordinatorTest and RootResourceTest (#12509)
ddb7fdd88f is described below
commit ddb7fdd88fbb559d858866f8674a670273405f72
Author: Yash Mayya <ya...@gmail.com>
AuthorDate: Mon Aug 22 23:47:34 2022 +0530
KAFKA-14133: Replace EasyMock with Mockito in WorkerCoordinatorTest and RootResourceTest (#12509)
Reviewers: Mickael Maison <mi...@gmail.com>, Dalibor Plavcic <da...@proton.me>
---
.../runtime/distributed/WorkerCoordinator.java | 4 +-
.../runtime/distributed/WorkerCoordinatorTest.java | 171 ++++++++++-----------
.../runtime/rest/resources/RootResourceTest.java | 21 ++-
.../util/ByteArrayProducerRecordEquals.java | 53 -------
4 files changed, 89 insertions(+), 160 deletions(-)
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
index ced67427a3..851a1bb836 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
@@ -373,10 +373,10 @@ public class WorkerCoordinator extends AbstractCoordinator implements Closeable
metrics.addMetric(metrics.metricName("assigned-connectors",
this.metricGrpName,
- "The number of connector instances currently assigned to this consumer"), numConnectors);
+ "The number of connector instances currently assigned to this worker"), numConnectors);
metrics.addMetric(metrics.metricName("assigned-tasks",
this.metricGrpName,
- "The number of tasks currently assigned to this consumer"), numTasks);
+ "The number of tasks currently assigned to this worker"), numTasks);
}
}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
index c3715aa302..c1d34c87b7 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
@@ -39,14 +39,12 @@ import org.apache.kafka.connect.runtime.TargetState;
import org.apache.kafka.connect.storage.ClusterConfigState;
import org.apache.kafka.connect.storage.KafkaConfigBackingStore;
import org.apache.kafka.connect.util.ConnectorTaskId;
-import org.easymock.EasyMock;
-import org.easymock.Mock;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
-import org.powermock.api.easymock.PowerMock;
+import org.mockito.Mock;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -67,6 +65,10 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertThrows;
import static org.junit.runners.Parameterized.Parameter;
import static org.junit.runners.Parameterized.Parameters;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
@RunWith(value = Parameterized.class)
public class WorkerCoordinatorTest {
@@ -74,19 +76,19 @@ public class WorkerCoordinatorTest {
private static final String LEADER_URL = "leaderUrl:8083";
private static final String MEMBER_URL = "memberUrl:8083";
- private String connectorId1 = "connector1";
- private String connectorId2 = "connector2";
- private String connectorId3 = "connector3";
- private ConnectorTaskId taskId1x0 = new ConnectorTaskId(connectorId1, 0);
- private ConnectorTaskId taskId1x1 = new ConnectorTaskId(connectorId1, 1);
- private ConnectorTaskId taskId2x0 = new ConnectorTaskId(connectorId2, 0);
- private ConnectorTaskId taskId3x0 = new ConnectorTaskId(connectorId3, 0);
-
- private String groupId = "test-group";
- private int sessionTimeoutMs = 10;
- private int rebalanceTimeoutMs = 60;
- private int heartbeatIntervalMs = 2;
- private long retryBackoffMs = 100;
+ private final String connectorId1 = "connector1";
+ private final String connectorId2 = "connector2";
+ private final String connectorId3 = "connector3";
+ private final ConnectorTaskId taskId1x0 = new ConnectorTaskId(connectorId1, 0);
+ private final ConnectorTaskId taskId1x1 = new ConnectorTaskId(connectorId1, 1);
+ private final ConnectorTaskId taskId2x0 = new ConnectorTaskId(connectorId2, 0);
+ private final ConnectorTaskId taskId3x0 = new ConnectorTaskId(connectorId3, 0);
+
+ private final String groupId = "test-group";
+ private final int sessionTimeoutMs = 10;
+ private final int rebalanceTimeoutMs = 60;
+ private final int heartbeatIntervalMs = 2;
+ private final long retryBackoffMs = 100;
private MockTime time;
private MockClient client;
private Node node;
@@ -130,7 +132,7 @@ public class WorkerCoordinatorTest {
this.consumerClient = new ConsumerNetworkClient(logContext, client, metadata, time, 100, 1000, heartbeatIntervalMs);
this.metrics = new Metrics(time);
this.rebalanceListener = new MockRebalanceListener();
- this.configStorage = PowerMock.createMock(KafkaConfigBackingStore.class);
+ this.configStorage = mock(KafkaConfigBackingStore.class);
this.rebalanceConfig = new GroupRebalanceConfig(sessionTimeoutMs,
rebalanceTimeoutMs,
heartbeatIntervalMs,
@@ -151,7 +153,7 @@ public class WorkerCoordinatorTest {
0);
configState1 = new ClusterConfigState(
- 1L,
+ 4L,
null,
Collections.singletonMap(connectorId1, 1),
Collections.singletonMap(connectorId1, new HashMap<>()),
@@ -177,7 +179,7 @@ public class WorkerCoordinatorTest {
configState2TaskConfigs.put(taskId1x1, new HashMap<>());
configState2TaskConfigs.put(taskId2x0, new HashMap<>());
configState2 = new ClusterConfigState(
- 2L,
+ 9L,
null,
configState2ConnectorTaskCounts,
configState2ConnectorConfigs,
@@ -206,7 +208,7 @@ public class WorkerCoordinatorTest {
configStateSingleTaskConnectorsTaskConfigs.put(taskId2x0, new HashMap<>());
configStateSingleTaskConnectorsTaskConfigs.put(taskId3x0, new HashMap<>());
configStateSingleTaskConnectors = new ClusterConfigState(
- 2L,
+ 12L,
null,
configStateSingleTaskConnectorsConnectorTaskCounts,
configStateSingleTaskConnectorsConnectorConfigs,
@@ -229,9 +231,7 @@ public class WorkerCoordinatorTest {
@Test
public void testMetadata() {
- EasyMock.expect(configStorage.snapshot()).andReturn(configState1);
-
- PowerMock.replayAll();
+ when(configStorage.snapshot()).thenReturn(configState1);
JoinGroupRequestData.JoinGroupRequestProtocolCollection serialized = coordinator.metadata();
assertEquals(expectedMetadataSize, serialized.size());
@@ -242,33 +242,31 @@ public class WorkerCoordinatorTest {
assertEquals(compatibility.protocol(), defaultMetadata.name());
ConnectProtocol.WorkerState state = ConnectProtocol.deserializeMetadata(
ByteBuffer.wrap(defaultMetadata.metadata()));
- assertEquals(1, state.offset());
+ assertEquals(configState1.offset(), state.offset());
- PowerMock.verifyAll();
+ verify(configStorage).snapshot();
}
@Test
public void testNormalJoinGroupLeader() {
- EasyMock.expect(configStorage.snapshot()).andReturn(configState1);
-
- PowerMock.replayAll();
+ when(configStorage.snapshot()).thenReturn(configState1);
- final String consumerId = "leader";
+ final String memberId = "leader";
client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node));
coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
// normal join group
Map<String, Long> memberConfigOffsets = new HashMap<>();
- memberConfigOffsets.put("leader", 1L);
- memberConfigOffsets.put("member", 1L);
- client.prepareResponse(joinGroupLeaderResponse(1, consumerId, memberConfigOffsets, Errors.NONE));
+ memberConfigOffsets.put("leader", configState1.offset());
+ memberConfigOffsets.put("member", configState1.offset());
+ client.prepareResponse(joinGroupLeaderResponse(1, memberId, memberConfigOffsets, Errors.NONE));
client.prepareResponse(body -> {
SyncGroupRequest sync = (SyncGroupRequest) body;
- return sync.data().memberId().equals(consumerId) &&
+ return sync.data().memberId().equals(memberId) &&
sync.data().generationId() == 1 &&
- sync.groupAssignments().containsKey(consumerId);
- }, syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, "leader", 1L, Collections.singletonList(connectorId1),
+ sync.groupAssignments().containsKey(memberId);
+ }, syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, "leader", configState1.offset(), Collections.singletonList(connectorId1),
Collections.emptyList(), Errors.NONE));
coordinator.ensureActiveGroup();
@@ -276,19 +274,17 @@ public class WorkerCoordinatorTest {
assertEquals(0, rebalanceListener.revokedCount);
assertEquals(1, rebalanceListener.assignedCount);
assertFalse(rebalanceListener.assignment.failed());
- assertEquals(1L, rebalanceListener.assignment.offset());
+ assertEquals(configState1.offset(), rebalanceListener.assignment.offset());
assertEquals("leader", rebalanceListener.assignment.leader());
assertEquals(Collections.singletonList(connectorId1), rebalanceListener.assignment.connectors());
assertEquals(Collections.emptyList(), rebalanceListener.assignment.tasks());
- PowerMock.verifyAll();
+ verify(configStorage).snapshot();
}
@Test
public void testNormalJoinGroupFollower() {
- EasyMock.expect(configStorage.snapshot()).andReturn(configState1);
-
- PowerMock.replayAll();
+ when(configStorage.snapshot()).thenReturn(configState1);
final String memberId = "member";
@@ -302,7 +298,7 @@ public class WorkerCoordinatorTest {
return sync.data().memberId().equals(memberId) &&
sync.data().generationId() == 1 &&
sync.data().assignments().isEmpty();
- }, syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, "leader", 1L, Collections.emptyList(),
+ }, syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, "leader", configState1.offset(), Collections.emptyList(),
Collections.singletonList(taskId1x0), Errors.NONE));
coordinator.ensureActiveGroup();
@@ -310,23 +306,18 @@ public class WorkerCoordinatorTest {
assertEquals(0, rebalanceListener.revokedCount);
assertEquals(1, rebalanceListener.assignedCount);
assertFalse(rebalanceListener.assignment.failed());
- assertEquals(1L, rebalanceListener.assignment.offset());
+ assertEquals(configState1.offset(), rebalanceListener.assignment.offset());
assertEquals(Collections.emptyList(), rebalanceListener.assignment.connectors());
assertEquals(Collections.singletonList(taskId1x0), rebalanceListener.assignment.tasks());
- PowerMock.verifyAll();
+ verify(configStorage).snapshot();
}
@Test
public void testJoinLeaderCannotAssign() {
// If the selected leader can't get up to the maximum offset, it will fail to assign and we should immediately
// need to retry the join.
-
- // When the first round fails, we'll take an updated config snapshot
- EasyMock.expect(configStorage.snapshot()).andReturn(configState1);
- EasyMock.expect(configStorage.snapshot()).andReturn(configState2);
-
- PowerMock.replayAll();
+ when(configStorage.snapshot()).thenReturn(configState1);
final String memberId = "member";
@@ -341,43 +332,44 @@ public class WorkerCoordinatorTest {
sync.data().generationId() == 1 &&
sync.data().assignments().isEmpty();
};
- client.prepareResponse(matcher, syncGroupResponse(ConnectProtocol.Assignment.CONFIG_MISMATCH, "leader", 10L,
+ client.prepareResponse(matcher, syncGroupResponse(ConnectProtocol.Assignment.CONFIG_MISMATCH, "leader", configState2.offset(),
Collections.emptyList(), Collections.emptyList(), Errors.NONE));
+
+ // When the first round fails, we'll take an updated config snapshot
+ when(configStorage.snapshot()).thenReturn(configState2);
+
client.prepareResponse(joinGroupFollowerResponse(1, memberId, "leader", Errors.NONE));
- client.prepareResponse(matcher, syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, "leader", 1L,
+ client.prepareResponse(matcher, syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, "leader", configState2.offset(),
Collections.emptyList(), Collections.singletonList(taskId1x0), Errors.NONE));
coordinator.ensureActiveGroup();
- PowerMock.verifyAll();
+ verify(configStorage, times(2)).snapshot();
}
@Test
public void testRejoinGroup() {
- EasyMock.expect(configStorage.snapshot()).andReturn(configState1);
- EasyMock.expect(configStorage.snapshot()).andReturn(configState1);
-
- PowerMock.replayAll();
+ when(configStorage.snapshot()).thenReturn(configState1);
client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node));
coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
// join the group once
- client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE));
- client.prepareResponse(syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, "leader", 1L, Collections.emptyList(),
+ client.prepareResponse(joinGroupFollowerResponse(1, "member", "leader", Errors.NONE));
+ client.prepareResponse(syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, "leader", configState1.offset(), Collections.emptyList(),
Collections.singletonList(taskId1x0), Errors.NONE));
coordinator.ensureActiveGroup();
assertEquals(0, rebalanceListener.revokedCount);
assertEquals(1, rebalanceListener.assignedCount);
assertFalse(rebalanceListener.assignment.failed());
- assertEquals(1L, rebalanceListener.assignment.offset());
+ assertEquals(configState1.offset(), rebalanceListener.assignment.offset());
assertEquals(Collections.emptyList(), rebalanceListener.assignment.connectors());
assertEquals(Collections.singletonList(taskId1x0), rebalanceListener.assignment.tasks());
// and join the group again
coordinator.requestRejoin("test");
- client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE));
- client.prepareResponse(syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, "leader", 1L, Collections.singletonList(connectorId1),
+ client.prepareResponse(joinGroupFollowerResponse(1, "member", "leader", Errors.NONE));
+ client.prepareResponse(syncGroupResponse(ConnectProtocol.Assignment.NO_ERROR, "leader", configState1.offset(), Collections.singletonList(connectorId1),
Collections.emptyList(), Errors.NONE));
coordinator.ensureActiveGroup();
@@ -386,11 +378,11 @@ public class WorkerCoordinatorTest {
assertEquals(Collections.singletonList(taskId1x0), rebalanceListener.revokedTasks);
assertEquals(2, rebalanceListener.assignedCount);
assertFalse(rebalanceListener.assignment.failed());
- assertEquals(1L, rebalanceListener.assignment.offset());
+ assertEquals(configState1.offset(), rebalanceListener.assignment.offset());
assertEquals(Collections.singletonList(connectorId1), rebalanceListener.assignment.connectors());
assertEquals(Collections.emptyList(), rebalanceListener.assignment.tasks());
- PowerMock.verifyAll();
+ verify(configStorage, times(2)).snapshot();
}
@Test
@@ -398,9 +390,7 @@ public class WorkerCoordinatorTest {
// Since all the protocol responses are mocked, the other tests validate doSync runs, but don't validate its
// output. So we test it directly here.
- EasyMock.expect(configStorage.snapshot()).andReturn(configState1);
-
- PowerMock.replayAll();
+ when(configStorage.snapshot()).thenReturn(configState1);
// Prime the current configuration state
coordinator.metadata();
@@ -409,11 +399,11 @@ public class WorkerCoordinatorTest {
List<JoinGroupResponseData.JoinGroupResponseMember> responseMembers = new ArrayList<>();
responseMembers.add(new JoinGroupResponseData.JoinGroupResponseMember()
.setMemberId("leader")
- .setMetadata(ConnectProtocol.serializeMetadata(new ConnectProtocol.WorkerState(LEADER_URL, 1L)).array())
+ .setMetadata(ConnectProtocol.serializeMetadata(new ConnectProtocol.WorkerState(LEADER_URL, configState1.offset())).array())
);
responseMembers.add(new JoinGroupResponseData.JoinGroupResponseMember()
.setMemberId("member")
- .setMetadata(ConnectProtocol.serializeMetadata(new ConnectProtocol.WorkerState(MEMBER_URL, 1L)).array())
+ .setMetadata(ConnectProtocol.serializeMetadata(new ConnectProtocol.WorkerState(MEMBER_URL, configState1.offset())).array())
);
Map<String, ByteBuffer> result = coordinator.onLeaderElected("leader", EAGER.protocol(), responseMembers, false);
@@ -421,18 +411,18 @@ public class WorkerCoordinatorTest {
ConnectProtocol.Assignment leaderAssignment = ConnectProtocol.deserializeAssignment(result.get("leader"));
assertFalse(leaderAssignment.failed());
assertEquals("leader", leaderAssignment.leader());
- assertEquals(1, leaderAssignment.offset());
+ assertEquals(configState1.offset(), leaderAssignment.offset());
assertEquals(Collections.singletonList(connectorId1), leaderAssignment.connectors());
assertEquals(Collections.emptyList(), leaderAssignment.tasks());
ConnectProtocol.Assignment memberAssignment = ConnectProtocol.deserializeAssignment(result.get("member"));
assertFalse(memberAssignment.failed());
assertEquals("leader", memberAssignment.leader());
- assertEquals(1, memberAssignment.offset());
+ assertEquals(configState1.offset(), memberAssignment.offset());
assertEquals(Collections.emptyList(), memberAssignment.connectors());
assertEquals(Collections.singletonList(taskId1x0), memberAssignment.tasks());
- PowerMock.verifyAll();
+ verify(configStorage).snapshot();
}
@Test
@@ -440,22 +430,20 @@ public class WorkerCoordinatorTest {
// Since all the protocol responses are mocked, the other tests validate doSync runs, but don't validate its
// output. So we test it directly here.
- EasyMock.expect(configStorage.snapshot()).andReturn(configState2);
-
- PowerMock.replayAll();
+ when(configStorage.snapshot()).thenReturn(configState2);
// Prime the current configuration state
coordinator.metadata();
- // Mark everyone as in sync with configState1
+ // Mark everyone as in sync with configState2
List<JoinGroupResponseData.JoinGroupResponseMember> responseMembers = new ArrayList<>();
responseMembers.add(new JoinGroupResponseData.JoinGroupResponseMember()
.setMemberId("leader")
- .setMetadata(ConnectProtocol.serializeMetadata(new ConnectProtocol.WorkerState(LEADER_URL, 1L)).array())
+ .setMetadata(ConnectProtocol.serializeMetadata(new ConnectProtocol.WorkerState(LEADER_URL, configState2.offset())).array())
);
responseMembers.add(new JoinGroupResponseData.JoinGroupResponseMember()
.setMemberId("member")
- .setMetadata(ConnectProtocol.serializeMetadata(new ConnectProtocol.WorkerState(MEMBER_URL, 1L)).array())
+ .setMetadata(ConnectProtocol.serializeMetadata(new ConnectProtocol.WorkerState(MEMBER_URL, configState2.offset())).array())
);
Map<String, ByteBuffer> result = coordinator.onLeaderElected("leader", EAGER.protocol(), responseMembers, false);
@@ -464,18 +452,18 @@ public class WorkerCoordinatorTest {
ConnectProtocol.Assignment leaderAssignment = ConnectProtocol.deserializeAssignment(result.get("leader"));
assertFalse(leaderAssignment.failed());
assertEquals("leader", leaderAssignment.leader());
- assertEquals(1, leaderAssignment.offset());
+ assertEquals(configState2.offset(), leaderAssignment.offset());
assertEquals(Collections.singletonList(connectorId1), leaderAssignment.connectors());
assertEquals(Arrays.asList(taskId1x0, taskId2x0), leaderAssignment.tasks());
ConnectProtocol.Assignment memberAssignment = ConnectProtocol.deserializeAssignment(result.get("member"));
assertFalse(memberAssignment.failed());
assertEquals("leader", memberAssignment.leader());
- assertEquals(1, memberAssignment.offset());
+ assertEquals(configState2.offset(), memberAssignment.offset());
assertEquals(Collections.singletonList(connectorId2), memberAssignment.connectors());
assertEquals(Collections.singletonList(taskId1x1), memberAssignment.tasks());
- PowerMock.verifyAll();
+ verify(configStorage).snapshot();
}
@Test
@@ -483,22 +471,20 @@ public class WorkerCoordinatorTest {
// Since all the protocol responses are mocked, the other tests validate doSync runs, but don't validate its
// output. So we test it directly here.
- EasyMock.expect(configStorage.snapshot()).andReturn(configStateSingleTaskConnectors);
-
- PowerMock.replayAll();
+ when(configStorage.snapshot()).thenReturn(configStateSingleTaskConnectors);
// Prime the current configuration state
coordinator.metadata();
- // Mark everyone as in sync with configState1
+ // Mark everyone as in sync with configStateSingleTaskConnectors
List<JoinGroupResponseData.JoinGroupResponseMember> responseMembers = new ArrayList<>();
responseMembers.add(new JoinGroupResponseData.JoinGroupResponseMember()
.setMemberId("leader")
- .setMetadata(ConnectProtocol.serializeMetadata(new ConnectProtocol.WorkerState(LEADER_URL, 1L)).array())
+ .setMetadata(ConnectProtocol.serializeMetadata(new ConnectProtocol.WorkerState(LEADER_URL, configStateSingleTaskConnectors.offset())).array())
);
responseMembers.add(new JoinGroupResponseData.JoinGroupResponseMember()
.setMemberId("member")
- .setMetadata(ConnectProtocol.serializeMetadata(new ConnectProtocol.WorkerState(MEMBER_URL, 1L)).array())
+ .setMetadata(ConnectProtocol.serializeMetadata(new ConnectProtocol.WorkerState(MEMBER_URL, configStateSingleTaskConnectors.offset())).array())
);
Map<String, ByteBuffer> result = coordinator.onLeaderElected("leader", EAGER.protocol(), responseMembers, false);
@@ -508,18 +494,18 @@ public class WorkerCoordinatorTest {
ConnectProtocol.Assignment leaderAssignment = ConnectProtocol.deserializeAssignment(result.get("leader"));
assertFalse(leaderAssignment.failed());
assertEquals("leader", leaderAssignment.leader());
- assertEquals(1, leaderAssignment.offset());
+ assertEquals(configStateSingleTaskConnectors.offset(), leaderAssignment.offset());
assertEquals(Arrays.asList(connectorId1, connectorId3), leaderAssignment.connectors());
- assertEquals(Arrays.asList(taskId2x0), leaderAssignment.tasks());
+ assertEquals(Collections.singletonList(taskId2x0), leaderAssignment.tasks());
ConnectProtocol.Assignment memberAssignment = ConnectProtocol.deserializeAssignment(result.get("member"));
assertFalse(memberAssignment.failed());
assertEquals("leader", memberAssignment.leader());
- assertEquals(1, memberAssignment.offset());
+ assertEquals(configStateSingleTaskConnectors.offset(), memberAssignment.offset());
assertEquals(Collections.singletonList(connectorId2), memberAssignment.connectors());
assertEquals(Arrays.asList(taskId1x0, taskId3x0), memberAssignment.tasks());
- PowerMock.verifyAll();
+ verify(configStorage).snapshot();
}
@Test
@@ -527,15 +513,14 @@ public class WorkerCoordinatorTest {
// Connect does not support static membership so skipping assignment should
// never be set to true by the group coordinator. It is treated as an
// illegal state if it would.
- EasyMock.expect(configStorage.snapshot()).andReturn(configState1);
- PowerMock.replayAll();
+ when(configStorage.snapshot()).thenReturn(configState1);
coordinator.metadata();
assertThrows(IllegalStateException.class,
() -> coordinator.onLeaderElected("leader", EAGER.protocol(), Collections.emptyList(), true));
- PowerMock.verifyAll();
+ verify(configStorage).snapshot();
}
private JoinGroupResponse joinGroupLeaderResponse(int generationId, String memberId,
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/RootResourceTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/RootResourceTest.java
index 4e928a3703..9764843924 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/RootResourceTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/RootResourceTest.java
@@ -20,21 +20,20 @@ import org.apache.kafka.clients.admin.MockAdminClient;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.rest.entities.ServerInfo;
-import org.easymock.EasyMock;
-import org.easymock.EasyMockRunner;
-import org.easymock.EasyMockSupport;
-import org.easymock.Mock;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
-@RunWith(EasyMockRunner.class)
-public class RootResourceTest extends EasyMockSupport {
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
+public class RootResourceTest {
- @Mock
- private Herder herder;
+ @Mock private Herder herder;
private RootResource rootResource;
@Before
@@ -44,15 +43,13 @@ public class RootResourceTest extends EasyMockSupport {
@Test
public void testRootGet() {
- EasyMock.expect(herder.kafkaClusterId()).andReturn(MockAdminClient.DEFAULT_CLUSTER_ID);
-
- replayAll();
+ when(herder.kafkaClusterId()).thenReturn(MockAdminClient.DEFAULT_CLUSTER_ID);
ServerInfo info = rootResource.serverInfo();
assertEquals(AppInfoParser.getVersion(), info.version());
assertEquals(AppInfoParser.getCommitId(), info.commit());
assertEquals(MockAdminClient.DEFAULT_CLUSTER_ID, info.clusterId());
- verifyAll();
+ verify(herder).kafkaClusterId();
}
}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/ByteArrayProducerRecordEquals.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/ByteArrayProducerRecordEquals.java
deleted file mode 100644
index a6a155fc4b..0000000000
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/ByteArrayProducerRecordEquals.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.connect.util;
-
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.easymock.EasyMock;
-import org.easymock.IArgumentMatcher;
-
-import java.util.Arrays;
-
-public class ByteArrayProducerRecordEquals implements IArgumentMatcher {
- private ProducerRecord<byte[], byte[]> record;
-
- public static ProducerRecord<byte[], byte[]> eqProducerRecord(ProducerRecord<byte[], byte[]> in) {
- EasyMock.reportMatcher(new ByteArrayProducerRecordEquals(in));
- return null;
- }
-
- public ByteArrayProducerRecordEquals(ProducerRecord<byte[], byte[]> record) {
- this.record = record;
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public boolean matches(Object argument) {
- if (!(argument instanceof ProducerRecord))
- return false;
- ProducerRecord<byte[], byte[]> other = (ProducerRecord<byte[], byte[]>) argument;
- return record.topic().equals(other.topic()) &&
- record.partition() != null ? record.partition().equals(other.partition()) : other.partition() == null &&
- record.key() != null ? Arrays.equals(record.key(), other.key()) : other.key() == null &&
- record.value() != null ? Arrays.equals(record.value(), other.value()) : other.value() == null;
- }
-
- @Override
- public void appendTo(StringBuffer buffer) {
- buffer.append(record.toString());
- }
-}