You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mi...@apache.org on 2022/08/23 15:49:50 UTC
[kafka] branch trunk updated: KAFKA-13133: Replace EasyMock and PowerMock with Mockito for AbstractHerderTest (#12473)
This is an automated email from the ASF dual-hosted git repository.
mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 4b310d1fe17 KAFKA-13133: Replace EasyMock and PowerMock with Mockito for AbstractHerderTest (#12473)
4b310d1fe17 is described below
commit 4b310d1fe171d6d5edda13bb1baf2cf5a0d8eb68
Author: Divij Vaidya <di...@amazon.com>
AuthorDate: Tue Aug 23 17:49:38 2022 +0200
KAFKA-13133: Replace EasyMock and PowerMock with Mockito for AbstractHerderTest (#12473)
Reviewers: Mickael Maison <mi...@gmail.com>, Yash Mayya <ya...@gmail.com>
Co-authored-by: wycccccc <49...@qq.com>
Co-authored-by: wycccccc <43...@users.noreply.github.com>
---
build.gradle | 2 +-
.../kafka/connect/runtime/AbstractHerderTest.java | 490 ++++++++-------------
2 files changed, 189 insertions(+), 303 deletions(-)
diff --git a/build.gradle b/build.gradle
index 7c38d899a6e..16840d81b0f 100644
--- a/build.gradle
+++ b/build.gradle
@@ -404,7 +404,7 @@ subprojects {
if (JavaVersion.current().isCompatibleWith(JavaVersion.VERSION_16)) {
testsToExclude.addAll([
// connect tests
- "**/AbstractHerderTest.*", "**/ConnectorPluginsResourceTest.*",
+ "**/ConnectorPluginsResourceTest.*",
"**/ConnectorsResourceTest.*", "**/DistributedHerderTest.*", "**/FileOffsetBakingStoreTest.*",
"**/ErrorHandlingTaskTest.*", "**/KafkaConfigBackingStoreTest.*", "**/KafkaOffsetBackingStoreTest.*",
"**/KafkaBasedLogTest.*", "**/OffsetStorageWriterTest.*", "**/StandaloneHerderTest.*",
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
index ada507f1eb3..50f3ac00671 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
@@ -25,6 +25,7 @@ import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.provider.DirectoryConfigProvider;
import org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerUnsecuredLoginCallbackHandler;
+import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
@@ -48,15 +49,13 @@ import org.apache.kafka.connect.storage.StatusBackingStore;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.transforms.predicates.Predicate;
import org.apache.kafka.connect.util.ConnectorTaskId;
-import org.easymock.Capture;
-import org.easymock.EasyMock;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
-import org.powermock.api.easymock.PowerMock;
-import org.powermock.api.easymock.annotation.Mock;
-import org.powermock.api.easymock.annotation.MockStrict;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
import java.util.ArrayList;
import java.util.Arrays;
@@ -72,20 +71,22 @@ import java.util.function.Function;
import java.util.stream.Collectors;
import static org.apache.kafka.connect.runtime.AbstractHerder.keysWithVariableValues;
-import static org.easymock.EasyMock.anyString;
-import static org.easymock.EasyMock.partialMockBuilder;
-import static org.easymock.EasyMock.strictMock;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.withSettings;
+import static org.mockito.Mockito.CALLS_REAL_METHODS;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.times;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
-import static org.powermock.api.easymock.PowerMock.replayAll;
-import static org.powermock.api.easymock.PowerMock.verifyAll;
-@RunWith(PowerMockRunner.class)
-@PrepareForTest({AbstractHerder.class})
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
public class AbstractHerderTest {
private static final String CONN1 = "sourceA";
@@ -138,96 +139,59 @@ public class AbstractHerderTest {
private final String workerId = "workerId";
private final String kafkaClusterId = "I4ZmrWqfT2e-upky_4fdPA";
private final int generation = 5;
- private final String connector = "connector";
+ private final String connectorName = "connector";
private final ConnectorClientConfigOverridePolicy noneConnectorClientConfigOverridePolicy = new NoneConnectorClientConfigOverridePolicy();
- @MockStrict private Worker worker;
- @MockStrict private WorkerConfigTransformer transformer;
- @MockStrict private ConfigBackingStore configStore;
- @MockStrict private StatusBackingStore statusStore;
- @MockStrict private ClassLoader classLoader;
+ @Mock private Worker worker;
+ @Mock private WorkerConfigTransformer transformer;
+ @Mock private ConfigBackingStore configStore;
+ @Mock private StatusBackingStore statusStore;
+ @Mock private ClassLoader classLoader;
@Mock private Plugins plugins;
+ private ClassLoader loader;
+ private Connector connector;
+
+ @Before
+ public void before() {
+ loader = Utils.getContextOrKafkaClassLoader();
+ }
+
+ @After
+ public void tearDown() {
+ if (loader != null) Plugins.compareAndSwapLoaders(loader);
+ }
+
@Test
public void testConnectors() {
- AbstractHerder herder = partialMockBuilder(AbstractHerder.class)
- .withConstructor(
- Worker.class,
- String.class,
- String.class,
- StatusBackingStore.class,
- ConfigBackingStore.class,
- ConnectorClientConfigOverridePolicy.class
- )
- .withArgs(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy)
- .addMockedMethod("generation")
- .createMock();
-
- EasyMock.expect(herder.generation()).andStubReturn(generation);
- EasyMock.expect(herder.rawConfig(connector)).andReturn(null);
- EasyMock.expect(configStore.snapshot()).andReturn(SNAPSHOT);
- replayAll();
+ AbstractHerder herder = mock(AbstractHerder.class, withSettings()
+ .useConstructor(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy)
+ .defaultAnswer(CALLS_REAL_METHODS));
+
+ when(configStore.snapshot()).thenReturn(SNAPSHOT);
assertEquals(Collections.singleton(CONN1), new HashSet<>(herder.connectors()));
- PowerMock.verifyAll();
}
@Test
public void testConnectorStatus() {
- ConnectorTaskId taskId = new ConnectorTaskId(connector, 0);
- AbstractHerder herder = partialMockBuilder(AbstractHerder.class)
- .withConstructor(
- Worker.class,
- String.class,
- String.class,
- StatusBackingStore.class,
- ConfigBackingStore.class,
- ConnectorClientConfigOverridePolicy.class
- )
- .withArgs(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy)
- .addMockedMethod("generation")
- .createMock();
-
- EasyMock.expect(herder.generation()).andStubReturn(generation);
- EasyMock.expect(herder.rawConfig(connector)).andReturn(null);
- EasyMock.expect(statusStore.get(connector))
- .andReturn(new ConnectorStatus(connector, AbstractStatus.State.RUNNING, workerId, generation));
- EasyMock.expect(statusStore.getAll(connector))
- .andReturn(Collections.singletonList(
- new TaskStatus(taskId, AbstractStatus.State.UNASSIGNED, workerId, generation)));
-
- replayAll();
- ConnectorStateInfo csi = herder.connectorStatus(connector);
- PowerMock.verifyAll();
- }
-
- @Test
- public void connectorStatus() {
- ConnectorTaskId taskId = new ConnectorTaskId(connector, 0);
+ ConnectorTaskId taskId = new ConnectorTaskId(connectorName, 0);
- AbstractHerder herder = partialMockBuilder(AbstractHerder.class)
- .withConstructor(Worker.class, String.class, String.class, StatusBackingStore.class, ConfigBackingStore.class,
- ConnectorClientConfigOverridePolicy.class)
- .withArgs(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy)
- .addMockedMethod("generation")
- .createMock();
+ AbstractHerder herder = mock(AbstractHerder.class, withSettings()
+ .useConstructor(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy)
+ .defaultAnswer(CALLS_REAL_METHODS));
- EasyMock.expect(herder.generation()).andStubReturn(generation);
- EasyMock.expect(herder.rawConfig(connector)).andReturn(null);
+ when(herder.rawConfig(connectorName)).thenReturn(null);
- EasyMock.expect(statusStore.get(connector))
- .andReturn(new ConnectorStatus(connector, AbstractStatus.State.RUNNING, workerId, generation));
+ when(statusStore.get(connectorName))
+ .thenReturn(new ConnectorStatus(connectorName, AbstractStatus.State.RUNNING, workerId, generation));
- EasyMock.expect(statusStore.getAll(connector))
- .andReturn(Collections.singletonList(
+ when(statusStore.getAll(connectorName))
+ .thenReturn(Collections.singletonList(
new TaskStatus(taskId, AbstractStatus.State.UNASSIGNED, workerId, generation)));
- EasyMock.expect(worker.getPlugins()).andStubReturn(plugins);
-
- replayAll();
+ ConnectorStateInfo state = herder.connectorStatus(connectorName);
- ConnectorStateInfo state = herder.connectorStatus(connector);
-
- assertEquals(connector, state.name());
+ assertEquals(connectorName, state.name());
assertEquals("RUNNING", state.connector().state());
assertEquals(1, state.tasks().size());
assertEquals(workerId, state.connector().workerId());
@@ -236,31 +200,21 @@ public class AbstractHerderTest {
assertEquals(0, taskState.id());
assertEquals("UNASSIGNED", taskState.state());
assertEquals(workerId, taskState.workerId());
-
- PowerMock.verifyAll();
}
@Test
- public void taskStatus() {
- ConnectorTaskId taskId = new ConnectorTaskId("connector", 0);
+ public void testTaskStatus() {
+ ConnectorTaskId taskId = new ConnectorTaskId(connectorName, 0);
String workerId = "workerId";
- AbstractHerder herder = partialMockBuilder(AbstractHerder.class)
- .withConstructor(Worker.class, String.class, String.class, StatusBackingStore.class, ConfigBackingStore.class,
- ConnectorClientConfigOverridePolicy.class)
- .withArgs(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy)
- .addMockedMethod("generation")
- .createMock();
+ AbstractHerder herder = mock(AbstractHerder.class, withSettings()
+ .useConstructor(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy)
+ .defaultAnswer(CALLS_REAL_METHODS));
- EasyMock.expect(herder.generation()).andStubReturn(5);
+ final ArgumentCaptor<TaskStatus> taskStatusArgumentCaptor = ArgumentCaptor.forClass(TaskStatus.class);
+ doNothing().when(statusStore).putSafe(taskStatusArgumentCaptor.capture());
- final Capture<TaskStatus> statusCapture = EasyMock.newCapture();
- statusStore.putSafe(EasyMock.capture(statusCapture));
- EasyMock.expectLastCall();
-
- EasyMock.expect(statusStore.get(taskId)).andAnswer(statusCapture::getValue);
-
- replayAll();
+ when(statusStore.get(taskId)).thenAnswer(invocation -> taskStatusArgumentCaptor.getValue());
herder.onFailure(taskId, new RuntimeException());
@@ -269,35 +223,26 @@ public class AbstractHerderTest {
assertEquals("FAILED", taskState.state());
assertEquals(0, taskState.id());
assertNotNull(taskState.trace());
-
- verifyAll();
}
@Test
public void testBuildRestartPlanForUnknownConnector() {
String connectorName = "UnknownConnector";
RestartRequest restartRequest = new RestartRequest(connectorName, false, true);
- AbstractHerder herder = partialMockBuilder(AbstractHerder.class)
- .withConstructor(Worker.class, String.class, String.class, StatusBackingStore.class, ConfigBackingStore.class,
- ConnectorClientConfigOverridePolicy.class)
- .withArgs(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy)
- .addMockedMethod("generation")
- .createMock();
-
- EasyMock.expect(herder.generation()).andStubReturn(generation);
+ AbstractHerder herder = mock(AbstractHerder.class, withSettings()
+ .useConstructor(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy)
+ .defaultAnswer(CALLS_REAL_METHODS));
- EasyMock.expect(statusStore.get(connectorName)).andReturn(null);
- replayAll();
+ when(statusStore.get(connectorName)).thenReturn(null);
Optional<RestartPlan> mayBeRestartPlan = herder.buildRestartPlan(restartRequest);
assertFalse(mayBeRestartPlan.isPresent());
}
- @Test()
+ @Test
public void testConfigValidationNullConfig() {
AbstractHerder herder = createConfigValidationHerder(SampleSourceConnector.class, noneConnectorClientConfigOverridePolicy);
- replayAll();
Map<String, String> config = new HashMap<>();
config.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, SampleSourceConnector.class.getName());
@@ -309,14 +254,11 @@ public class AbstractHerderTest {
assertEquals(1, configInfos.errorCount());
assertErrorForKey(configInfos, "testKey");
-
- verifyAll();
}
@Test
public void testConfigValidationMultipleNullConfig() {
AbstractHerder herder = createConfigValidationHerder(SampleSourceConnector.class, noneConnectorClientConfigOverridePolicy);
- replayAll();
Map<String, String> config = new HashMap<>();
config.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, SampleSourceConnector.class.getName());
@@ -330,38 +272,28 @@ public class AbstractHerderTest {
assertEquals(2, configInfos.errorCount());
assertErrorForKey(configInfos, "testKey");
assertErrorForKey(configInfos, "secondTestKey");
-
- verifyAll();
}
@Test
public void testBuildRestartPlanForConnectorAndTasks() {
- RestartRequest restartRequest = new RestartRequest(connector, false, true);
+ RestartRequest restartRequest = new RestartRequest(connectorName, false, true);
- ConnectorTaskId taskId1 = new ConnectorTaskId(connector, 1);
- ConnectorTaskId taskId2 = new ConnectorTaskId(connector, 2);
+ ConnectorTaskId taskId1 = new ConnectorTaskId(connectorName, 1);
+ ConnectorTaskId taskId2 = new ConnectorTaskId(connectorName, 2);
List<TaskStatus> taskStatuses = new ArrayList<>();
taskStatuses.add(new TaskStatus(taskId1, AbstractStatus.State.RUNNING, workerId, generation));
taskStatuses.add(new TaskStatus(taskId2, AbstractStatus.State.FAILED, workerId, generation));
- AbstractHerder herder = partialMockBuilder(AbstractHerder.class)
- .withConstructor(Worker.class, String.class, String.class, StatusBackingStore.class, ConfigBackingStore.class,
- ConnectorClientConfigOverridePolicy.class)
- .withArgs(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy)
- .addMockedMethod("generation")
- .createMock();
+ AbstractHerder herder = mock(AbstractHerder.class, withSettings()
+ .useConstructor(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy)
+ .defaultAnswer(CALLS_REAL_METHODS));
- EasyMock.expect(herder.generation()).andStubReturn(generation);
- EasyMock.expect(herder.rawConfig(connector)).andReturn(null);
+ when(herder.rawConfig(connectorName)).thenReturn(null);
- EasyMock.expect(statusStore.get(connector))
- .andReturn(new ConnectorStatus(connector, AbstractStatus.State.RUNNING, workerId, generation));
+ when(statusStore.get(connectorName))
+ .thenReturn(new ConnectorStatus(connectorName, AbstractStatus.State.RUNNING, workerId, generation));
- EasyMock.expect(statusStore.getAll(connector))
- .andReturn(taskStatuses);
- EasyMock.expect(worker.getPlugins()).andStubReturn(plugins);
-
- replayAll();
+ when(statusStore.getAll(connectorName)).thenReturn(taskStatuses);
Optional<RestartPlan> mayBeRestartPlan = herder.buildRestartPlan(restartRequest);
@@ -372,38 +304,28 @@ public class AbstractHerderTest {
assertEquals(2, restartPlan.taskIdsToRestart().size());
assertTrue(restartPlan.taskIdsToRestart().contains(taskId1));
assertTrue(restartPlan.taskIdsToRestart().contains(taskId2));
-
- PowerMock.verifyAll();
}
@Test
public void testBuildRestartPlanForNoRestart() {
- RestartRequest restartRequest = new RestartRequest(connector, true, false);
+ RestartRequest restartRequest = new RestartRequest(connectorName, true, false);
- ConnectorTaskId taskId1 = new ConnectorTaskId(connector, 1);
- ConnectorTaskId taskId2 = new ConnectorTaskId(connector, 2);
+ ConnectorTaskId taskId1 = new ConnectorTaskId(connectorName, 1);
+ ConnectorTaskId taskId2 = new ConnectorTaskId(connectorName, 2);
List<TaskStatus> taskStatuses = new ArrayList<>();
taskStatuses.add(new TaskStatus(taskId1, AbstractStatus.State.RUNNING, workerId, generation));
taskStatuses.add(new TaskStatus(taskId2, AbstractStatus.State.FAILED, workerId, generation));
- AbstractHerder herder = partialMockBuilder(AbstractHerder.class)
- .withConstructor(Worker.class, String.class, String.class, StatusBackingStore.class, ConfigBackingStore.class,
- ConnectorClientConfigOverridePolicy.class)
- .withArgs(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy)
- .addMockedMethod("generation")
- .createMock();
+ AbstractHerder herder = mock(AbstractHerder.class, withSettings()
+ .useConstructor(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy)
+ .defaultAnswer(CALLS_REAL_METHODS));
- EasyMock.expect(herder.generation()).andStubReturn(generation);
- EasyMock.expect(herder.rawConfig(connector)).andReturn(null);
+ when(herder.rawConfig(connectorName)).thenReturn(null);
- EasyMock.expect(statusStore.get(connector))
- .andReturn(new ConnectorStatus(connector, AbstractStatus.State.RUNNING, workerId, generation));
+ when(statusStore.get(connectorName))
+ .thenReturn(new ConnectorStatus(connectorName, AbstractStatus.State.RUNNING, workerId, generation));
- EasyMock.expect(statusStore.getAll(connector))
- .andReturn(taskStatuses);
- EasyMock.expect(worker.getPlugins()).andStubReturn(plugins);
-
- replayAll();
+ when(statusStore.getAll(connectorName)).thenReturn(taskStatuses);
Optional<RestartPlan> mayBeRestartPlan = herder.buildRestartPlan(restartRequest);
@@ -412,38 +334,31 @@ public class AbstractHerderTest {
assertFalse(restartPlan.shouldRestartConnector());
assertFalse(restartPlan.shouldRestartTasks());
assertTrue(restartPlan.taskIdsToRestart().isEmpty());
-
- PowerMock.verifyAll();
}
@Test
public void testConfigValidationEmptyConfig() {
AbstractHerder herder = createConfigValidationHerder(SampleSourceConnector.class, noneConnectorClientConfigOverridePolicy, 0);
- replayAll();
assertThrows(BadRequestException.class, () -> herder.validateConnectorConfig(Collections.emptyMap(), false));
-
- verifyAll();
+ verify(transformer).transform(Collections.emptyMap());
}
- @Test()
+ @Test
public void testConfigValidationMissingName() {
- AbstractHerder herder = createConfigValidationHerder(SampleSourceConnector.class, noneConnectorClientConfigOverridePolicy);
- replayAll();
+ final Class<? extends Connector> connectorClass = SampleSourceConnector.class;
+ AbstractHerder herder = createConfigValidationHerder(connectorClass, noneConnectorClientConfigOverridePolicy);
- Map<String, String> config = Collections.singletonMap(ConnectorConfig.CONNECTOR_CLASS_CONFIG, SampleSourceConnector.class.getName());
+ Map<String, String> config = Collections.singletonMap(ConnectorConfig.CONNECTOR_CLASS_CONFIG, connectorClass.getName());
ConfigInfos result = herder.validateConnectorConfig(config, false);
// We expect there to be errors due to the missing name and the missing "required" config. Note that these assertions depend heavily on
// the config fields for SourceConnectorConfig, but we expect these to change rarely.
- assertEquals(SampleSourceConnector.class.getName(), result.name());
- assertEquals(
- Arrays.asList(
- ConnectorConfig.COMMON_GROUP, ConnectorConfig.TRANSFORMS_GROUP,
- ConnectorConfig.PREDICATES_GROUP, ConnectorConfig.ERROR_GROUP,
- SourceConnectorConfig.TOPIC_CREATION_GROUP, SourceConnectorConfig.EXACTLY_ONCE_SUPPORT_GROUP,
- SourceConnectorConfig.OFFSETS_TOPIC_GROUP),
- result.groups());
+ assertEquals(connectorClass.getName(), result.name());
+ assertEquals(Arrays.asList(ConnectorConfig.COMMON_GROUP, ConnectorConfig.TRANSFORMS_GROUP,
+ ConnectorConfig.PREDICATES_GROUP, ConnectorConfig.ERROR_GROUP,
+ SourceConnectorConfig.TOPIC_CREATION_GROUP, SourceConnectorConfig.EXACTLY_ONCE_SUPPORT_GROUP,
+ SourceConnectorConfig.OFFSETS_TOPIC_GROUP), result.groups());
assertEquals(2, result.errorCount());
Map<String, ConfigInfo> infos = result.values().stream()
.collect(Collectors.toMap(info -> info.configKey().name(), Function.identity()));
@@ -457,69 +372,70 @@ public class AbstractHerderTest {
assertEquals("required", infos.get("required").configValue().name());
assertEquals(1, infos.get("required").configValue().errors().size());
- verifyAll();
+ verify(plugins).newConnector(connectorClass.getName());
+ verify(plugins).compareAndSwapLoaders(connector);
}
@Test
public void testConfigValidationInvalidTopics() {
- AbstractHerder herder = createConfigValidationHerder(SampleSinkConnector.class, noneConnectorClientConfigOverridePolicy);
- replayAll();
+ final Class<? extends Connector> connectorClass = SampleSinkConnector.class;
+ AbstractHerder herder = createConfigValidationHerder(connectorClass, noneConnectorClientConfigOverridePolicy);
Map<String, String> config = new HashMap<>();
- config.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, SampleSinkConnector.class.getName());
+ config.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, connectorClass.getName());
config.put(SinkConnectorConfig.TOPICS_CONFIG, "topic1,topic2");
config.put(SinkConnectorConfig.TOPICS_REGEX_CONFIG, "topic.*");
assertThrows(ConfigException.class, () -> herder.validateConnectorConfig(config, false));
- verifyAll();
+ verify(plugins).newConnector(connectorClass.getName());
+ verify(plugins).compareAndSwapLoaders(connector);
}
@Test
public void testConfigValidationTopicsWithDlq() {
- AbstractHerder herder = createConfigValidationHerder(SampleSinkConnector.class, noneConnectorClientConfigOverridePolicy);
- replayAll();
+ final Class<? extends Connector> connectorClass = SampleSinkConnector.class;
+ AbstractHerder herder = createConfigValidationHerder(connectorClass, noneConnectorClientConfigOverridePolicy);
Map<String, String> config = new HashMap<>();
- config.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, SampleSinkConnector.class.getName());
+ config.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, connectorClass.getName());
config.put(SinkConnectorConfig.TOPICS_CONFIG, "topic1");
config.put(SinkConnectorConfig.DLQ_TOPIC_NAME_CONFIG, "topic1");
assertThrows(ConfigException.class, () -> herder.validateConnectorConfig(config, false));
- verifyAll();
+ verify(plugins).newConnector(connectorClass.getName());
+ verify(plugins).compareAndSwapLoaders(connector);
}
@Test
public void testConfigValidationTopicsRegexWithDlq() {
- AbstractHerder herder = createConfigValidationHerder(SampleSinkConnector.class, noneConnectorClientConfigOverridePolicy);
- replayAll();
+ final Class<? extends Connector> connectorClass = SampleSinkConnector.class;
+ AbstractHerder herder = createConfigValidationHerder(connectorClass, noneConnectorClientConfigOverridePolicy);
Map<String, String> config = new HashMap<>();
- config.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, SampleSinkConnector.class.getName());
+ config.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, connectorClass.getName());
config.put(SinkConnectorConfig.TOPICS_REGEX_CONFIG, "topic.*");
config.put(SinkConnectorConfig.DLQ_TOPIC_NAME_CONFIG, "topic1");
assertThrows(ConfigException.class, () -> herder.validateConnectorConfig(config, false));
- verifyAll();
+ verify(plugins).newConnector(connectorClass.getName());
+ verify(plugins).compareAndSwapLoaders(connector);
}
- @Test()
+ @Test
public void testConfigValidationTransformsExtendResults() {
- AbstractHerder herder = createConfigValidationHerder(SampleSourceConnector.class, noneConnectorClientConfigOverridePolicy);
+ final Class<? extends Connector> connectorClass = SampleSourceConnector.class;
+ AbstractHerder herder = createConfigValidationHerder(connectorClass, noneConnectorClientConfigOverridePolicy);
// 2 transform aliases defined -> 2 plugin lookups
- Set<PluginDesc<Transformation<?>>> transformations = new HashSet<>();
- transformations.add(transformationPluginDesc());
- EasyMock.expect(plugins.transformations()).andReturn(transformations).times(2);
-
- replayAll();
+ when(plugins.transformations()).thenReturn(Collections.singleton(transformationPluginDesc()));
// Define 2 transformations. One has a class defined and so can get embedded configs, the other is missing
// class info that should generate an error.
Map<String, String> config = new HashMap<>();
- config.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, SampleSourceConnector.class.getName());
+ config.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, connectorClass.getName());
config.put(ConnectorConfig.NAME_CONFIG, "connector-name");
config.put(ConnectorConfig.TRANSFORMS_CONFIG, "xformA,xformB");
config.put(ConnectorConfig.TRANSFORMS_CONFIG + ".xformA.type", SampleTransformation.class.getName());
@@ -529,7 +445,7 @@ public class AbstractHerderTest {
// We expect there to be errors because xformB has no type configured. Note that these assertions depend heavily on
// the config fields for SourceConnectorConfig, but we expect these to change rarely.
- assertEquals(SampleSourceConnector.class.getName(), result.name());
+ assertEquals(connectorClass.getName(), result.name());
// Each transform also gets its own group
List<String> expectedGroups = Arrays.asList(
ConnectorConfig.COMMON_GROUP,
@@ -556,28 +472,23 @@ public class AbstractHerderTest {
assertEquals("transforms.xformB.type", infos.get("transforms.xformB.type").configValue().name());
assertFalse(infos.get("transforms.xformB.type").configValue().errors().isEmpty());
- verifyAll();
+ verify(plugins, times(2)).transformations();
+ verify(plugins).newConnector(connectorClass.getName());
+ verify(plugins).compareAndSwapLoaders(connector);
}
- @Test()
+ @Test
public void testConfigValidationPredicatesExtendResults() {
- AbstractHerder herder = createConfigValidationHerder(SampleSourceConnector.class, noneConnectorClientConfigOverridePolicy);
+ final Class<? extends Connector> connectorClass = SampleSourceConnector.class;
+ AbstractHerder herder = createConfigValidationHerder(connectorClass, noneConnectorClientConfigOverridePolicy);
- // 2 transform aliases defined -> 2 plugin lookups
- Set<PluginDesc<Transformation<?>>> transformations = new HashSet<>();
- transformations.add(transformationPluginDesc());
- EasyMock.expect(plugins.transformations()).andReturn(transformations).times(1);
-
- Set<PluginDesc<Predicate<?>>> predicates = new HashSet<>();
- predicates.add(predicatePluginDesc());
- EasyMock.expect(plugins.predicates()).andReturn(predicates).times(2);
+ when(plugins.transformations()).thenReturn(Collections.singleton(transformationPluginDesc()));
+ when(plugins.predicates()).thenReturn(Collections.singleton(predicatePluginDesc()));
- replayAll();
-
- // Define 2 transformations. One has a class defined and so can get embedded configs, the other is missing
+ // Define 2 predicates. One has a class defined and so can get embedded configs, the other is missing
// class info that should generate an error.
Map<String, String> config = new HashMap<>();
- config.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, SampleSourceConnector.class.getName());
+ config.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, connectorClass.getName());
config.put(ConnectorConfig.NAME_CONFIG, "connector-name");
config.put(ConnectorConfig.TRANSFORMS_CONFIG, "xformA");
config.put(ConnectorConfig.TRANSFORMS_CONFIG + ".xformA.type", SampleTransformation.class.getName());
@@ -585,12 +496,13 @@ public class AbstractHerderTest {
config.put(ConnectorConfig.PREDICATES_CONFIG, "predX,predY");
config.put(ConnectorConfig.PREDICATES_CONFIG + ".predX.type", SamplePredicate.class.getName());
config.put("required", "value"); // connector required config
+
ConfigInfos result = herder.validateConnectorConfig(config, false);
- assertEquals(herder.connectorTypeForClass(config.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG)), ConnectorType.SOURCE);
+ assertEquals(ConnectorType.SOURCE, herder.connectorTypeForClass(config.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG)));
// We expect there to be errors because predY has no type configured. Note that these assertions depend heavily on
// the config fields for SourceConnectorConfig, but we expect these to change rarely.
- assertEquals(SampleSourceConnector.class.getName(), result.name());
+ assertEquals(connectorClass.getName(), result.name());
// Each transform also gets its own group
List<String> expectedGroups = Arrays.asList(
ConnectorConfig.COMMON_GROUP,
@@ -610,27 +522,22 @@ public class AbstractHerderTest {
.collect(Collectors.toMap(info -> info.configKey().name(), Function.identity()));
assertEquals(28, infos.size());
// Should get 2 type fields from the transforms, first adds its own config since it has a valid class
- assertEquals("transforms.xformA.type",
- infos.get("transforms.xformA.type").configValue().name());
+ assertEquals("transforms.xformA.type", infos.get("transforms.xformA.type").configValue().name());
assertTrue(infos.get("transforms.xformA.type").configValue().errors().isEmpty());
- assertEquals("transforms.xformA.subconfig",
- infos.get("transforms.xformA.subconfig").configValue().name());
- assertEquals("transforms.xformA.predicate",
- infos.get("transforms.xformA.predicate").configValue().name());
+ assertEquals("transforms.xformA.subconfig", infos.get("transforms.xformA.subconfig").configValue().name());
+ assertEquals("transforms.xformA.predicate", infos.get("transforms.xformA.predicate").configValue().name());
assertTrue(infos.get("transforms.xformA.predicate").configValue().errors().isEmpty());
- assertEquals("transforms.xformA.negate",
- infos.get("transforms.xformA.negate").configValue().name());
+ assertEquals("transforms.xformA.negate", infos.get("transforms.xformA.negate").configValue().name());
assertTrue(infos.get("transforms.xformA.negate").configValue().errors().isEmpty());
- assertEquals("predicates.predX.type",
- infos.get("predicates.predX.type").configValue().name());
- assertEquals("predicates.predX.predconfig",
- infos.get("predicates.predX.predconfig").configValue().name());
- assertEquals("predicates.predY.type",
- infos.get("predicates.predY.type").configValue().name());
- assertFalse(
- infos.get("predicates.predY.type").configValue().errors().isEmpty());
-
- verifyAll();
+ assertEquals("predicates.predX.type", infos.get("predicates.predX.type").configValue().name());
+ assertEquals("predicates.predX.predconfig", infos.get("predicates.predX.predconfig").configValue().name());
+ assertEquals("predicates.predY.type", infos.get("predicates.predY.type").configValue().name());
+ assertFalse(infos.get("predicates.predY.type").configValue().errors().isEmpty());
+
+ verify(plugins).transformations();
+ verify(plugins, times(2)).predicates();
+ verify(plugins).newConnector(connectorClass.getName());
+ verify(plugins).compareAndSwapLoaders(connector);
}
@SuppressWarnings({"rawtypes", "unchecked"})
@@ -643,13 +550,13 @@ public class AbstractHerderTest {
return new PluginDesc(SampleTransformation.class, "1.0", classLoader);
}
- @Test()
+ @Test
public void testConfigValidationPrincipalOnlyOverride() {
- AbstractHerder herder = createConfigValidationHerder(SampleSourceConnector.class, new PrincipalConnectorClientConfigOverridePolicy());
- replayAll();
+ final Class<? extends Connector> connectorClass = SampleSourceConnector.class;
+ AbstractHerder herder = createConfigValidationHerder(connectorClass, new PrincipalConnectorClientConfigOverridePolicy());
Map<String, String> config = new HashMap<>();
- config.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, SampleSourceConnector.class.getName());
+ config.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, connectorClass.getName());
config.put(ConnectorConfig.NAME_CONFIG, "connector-name");
config.put("required", "value"); // connector required config
String ackConfigKey = producerOverrideKey(ProducerConfig.ACKS_CONFIG);
@@ -682,16 +589,17 @@ public class AbstractHerderTest {
assertTrue(result.values().stream().anyMatch(
configInfo -> saslConfigKey.equals(configInfo.configValue().name()) && configInfo.configValue().errors().isEmpty()));
- verifyAll();
+ verify(plugins).newConnector(connectorClass.getName());
+ verify(plugins).compareAndSwapLoaders(connector);
}
@Test
public void testConfigValidationAllOverride() {
- AbstractHerder herder = createConfigValidationHerder(SampleSourceConnector.class, new AllConnectorClientConfigOverridePolicy());
- replayAll();
+ final Class<? extends Connector> connectorClass = SampleSourceConnector.class;
+ AbstractHerder herder = createConfigValidationHerder(connectorClass, new AllConnectorClientConfigOverridePolicy());
Map<String, String> config = new HashMap<>();
- config.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, SampleSourceConnector.class.getName());
+ config.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, connectorClass.getName());
config.put(ConnectorConfig.NAME_CONFIG, "connector-name");
config.put("required", "value"); // connector required config
// Try to test a variety of configuration types: string, int, long, boolean, list, class
@@ -731,7 +639,9 @@ public class AbstractHerderTest {
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
assertEquals(rawOverriddenClientConfigs, validatedOverriddenClientConfigs);
- verifyAll();
+
+ verify(plugins).newConnector(connectorClass.getName());
+ verify(plugins).compareAndSwapLoaders(connector);
}
@Test
@@ -918,21 +828,13 @@ public class AbstractHerderTest {
@Test
public void testConnectorPluginConfig() throws Exception {
- AbstractHerder herder = partialMockBuilder(AbstractHerder.class)
- .withConstructor(
- Worker.class,
- String.class,
- String.class,
- StatusBackingStore.class,
- ConfigBackingStore.class,
- ConnectorClientConfigOverridePolicy.class
- )
- .withArgs(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy)
- .addMockedMethod("generation")
- .createMock();
-
- EasyMock.expect(plugins.newPlugin(EasyMock.anyString())).andAnswer(() -> {
- String name = (String) EasyMock.getCurrentArguments()[0];
+
+ AbstractHerder herder = mock(AbstractHerder.class, withSettings()
+ .useConstructor(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy)
+ .defaultAnswer(CALLS_REAL_METHODS));
+
+ when(plugins.newPlugin(anyString())).then(invocation -> {
+ String name = invocation.getArgument(0);
switch (name) {
case "sink": return new SampleSinkConnector();
case "source": return new SampleSourceConnector();
@@ -941,9 +843,8 @@ public class AbstractHerderTest {
case "predicate": return new SamplePredicate();
default: return new SampleTransformation<>();
}
- }).anyTimes();
- EasyMock.expect(herder.plugins()).andStubReturn(plugins);
- replayAll();
+ });
+ when(herder.plugins()).thenReturn(plugins);
List<ConfigKeyInfo> sinkConnectorConfigs = herder.connectorPluginConfig("sink");
assertNotNull(sinkConnectorConfigs);
@@ -973,30 +874,22 @@ public class AbstractHerderTest {
@Test(expected = NotFoundException.class)
public void testGetConnectorConfigDefWithBadName() throws Exception {
String connName = "AnotherPlugin";
- AbstractHerder herder = partialMockBuilder(AbstractHerder.class)
- .withConstructor(Worker.class, String.class, String.class, StatusBackingStore.class, ConfigBackingStore.class,
- ConnectorClientConfigOverridePolicy.class)
- .withArgs(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy)
- .addMockedMethod("generation")
- .createMock();
- EasyMock.expect(worker.getPlugins()).andStubReturn(plugins);
- EasyMock.expect(plugins.newPlugin(anyString())).andThrow(new ClassNotFoundException());
- replayAll();
+ AbstractHerder herder = mock(AbstractHerder.class, withSettings()
+ .useConstructor(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy)
+ .defaultAnswer(CALLS_REAL_METHODS));
+ when(worker.getPlugins()).thenReturn(plugins);
+ when(plugins.newPlugin(anyString())).thenThrow(new ClassNotFoundException());
herder.connectorPluginConfig(connName);
}
@Test(expected = BadRequestException.class)
public void testGetConnectorConfigDefWithInvalidPluginType() throws Exception {
String connName = "AnotherPlugin";
- AbstractHerder herder = partialMockBuilder(AbstractHerder.class)
- .withConstructor(Worker.class, String.class, String.class, StatusBackingStore.class, ConfigBackingStore.class,
- ConnectorClientConfigOverridePolicy.class)
- .withArgs(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy)
- .addMockedMethod("generation")
- .createMock();
- EasyMock.expect(worker.getPlugins()).andStubReturn(plugins);
- EasyMock.expect(plugins.newPlugin(anyString())).andReturn(new DirectoryConfigProvider());
- replayAll();
+ AbstractHerder herder = mock(AbstractHerder.class, withSettings()
+ .useConstructor(worker, workerId, kafkaClusterId, statusStore, configStore, noneConnectorClientConfigOverridePolicy)
+ .defaultAnswer(CALLS_REAL_METHODS));
+ when(worker.getPlugins()).thenReturn(plugins);
+ when(plugins.newPlugin(anyString())).thenReturn(new DirectoryConfigProvider());
herder.connectorPluginConfig(connName);
}
@@ -1055,23 +948,16 @@ public class AbstractHerderTest {
ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy,
int countOfCallingNewConnector) {
-
- ConfigBackingStore configStore = strictMock(ConfigBackingStore.class);
- StatusBackingStore statusStore = strictMock(StatusBackingStore.class);
-
- AbstractHerder herder = partialMockBuilder(AbstractHerder.class)
- .withConstructor(Worker.class, String.class, String.class, StatusBackingStore.class, ConfigBackingStore.class,
- ConnectorClientConfigOverridePolicy.class)
- .withArgs(worker, workerId, kafkaClusterId, statusStore, configStore, connectorClientConfigOverridePolicy)
- .addMockedMethod("generation")
- .createMock();
- EasyMock.expect(herder.generation()).andStubReturn(generation);
+ AbstractHerder herder = mock(AbstractHerder.class, withSettings()
+ .useConstructor(worker, workerId, kafkaClusterId, statusStore, configStore, connectorClientConfigOverridePolicy)
+ .defaultAnswer(CALLS_REAL_METHODS));
// Call to validateConnectorConfig
- EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2);
- final Capture<Map<String, String>> configCapture = EasyMock.newCapture();
- EasyMock.expect(transformer.transform(EasyMock.capture(configCapture))).andAnswer(configCapture::getValue);
- EasyMock.expect(worker.getPlugins()).andStubReturn(plugins);
+ when(worker.configTransformer()).thenReturn(transformer);
+ @SuppressWarnings("unchecked")
+ final ArgumentCaptor<Map<String, String>> mapArgumentCaptor = ArgumentCaptor.forClass(Map.class);
+ when(transformer.transform(mapArgumentCaptor.capture())).thenAnswer(invocation -> mapArgumentCaptor.getValue());
+ when(worker.getPlugins()).thenReturn(plugins);
final Connector connector;
try {
connector = connectorClass.getConstructor().newInstance();
@@ -1079,18 +965,18 @@ public class AbstractHerderTest {
throw new RuntimeException("Couldn't create connector", e);
}
if (countOfCallingNewConnector > 0) {
- EasyMock.expect(plugins.newConnector(connectorClass.getName())).andReturn(connector).times(countOfCallingNewConnector);
- EasyMock.expect(plugins.compareAndSwapLoaders(connector)).andReturn(classLoader).times(countOfCallingNewConnector);
+ when(plugins.newConnector(connectorClass.getName())).thenReturn(connector);
+ when(plugins.compareAndSwapLoaders(connector)).thenReturn(classLoader);
}
-
+ this.connector = connector;
return herder;
}
// We need to use a real class here due to some issue with mocking java.lang.Class
- private abstract class BogusSourceConnector extends SourceConnector {
+ private abstract static class BogusSourceConnector extends SourceConnector {
}
- private abstract class BogusSourceTask extends SourceTask {
+ private abstract static class BogusSourceTask extends SourceTask {
}
private static String producerOverrideKey(String config) {