You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ce...@apache.org on 2022/09/14 15:00:56 UTC
[kafka] branch trunk updated: KAFKA-14132: Migrate some Connect tests from EasyMock/PowerMock to Mockito (#12615)
This is an automated email from the ASF dual-hosted git repository.
cegerton pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new bdf2cdb27fa KAFKA-14132: Migrate some Connect tests from EasyMock/PowerMock to Mockito (#12615)
bdf2cdb27fa is described below
commit bdf2cdb27fa39c30171a0e63641cd8e8b851fe0c
Author: Yash Mayya <ya...@gmail.com>
AuthorDate: Wed Sep 14 20:30:32 2022 +0530
KAFKA-14132: Migrate some Connect tests from EasyMock/PowerMock to Mockito (#12615)
Reviewers: Chris Egerton <ch...@aiven.io>
---
.../org/apache/kafka/connect/runtime/Worker.java | 3 +-
.../runtime/errors/RetryWithToleranceOperator.java | 13 +-
.../connect/runtime/ErrorHandlingTaskTest.java | 8 +-
.../kafka/connect/runtime/WorkerTaskTest.java | 251 +++++++--------------
.../connect/runtime/errors/ErrorReporterTest.java | 56 ++---
.../errors/RetryWithToleranceOperatorTest.java | 200 +++++++---------
.../errors/WorkerErrantRecordReporterTest.java | 90 +++++---
7 files changed, 252 insertions(+), 369 deletions(-)
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index 55e445ca8e1..293eb0f79d2 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -1248,8 +1248,7 @@ public class Worker {
final Class<? extends Connector> connectorClass = plugins.connectorClass(
connectorConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG));
RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(connectorConfig.errorRetryTimeout(),
- connectorConfig.errorMaxDelayInMillis(), connectorConfig.errorToleranceType(), Time.SYSTEM);
- retryWithToleranceOperator.metrics(errorHandlingMetrics);
+ connectorConfig.errorMaxDelayInMillis(), connectorConfig.errorToleranceType(), Time.SYSTEM, errorHandlingMetrics);
return doBuild(task, id, configState, statusListener, initialState,
connectorConfig, keyConverter, valueConverter, headerConverter, classLoader,
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java
index 947fabf3d0c..6262f19e190 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java
@@ -77,24 +77,25 @@ public class RetryWithToleranceOperator implements AutoCloseable {
private long totalFailures = 0;
private final Time time;
- private ErrorHandlingMetrics errorHandlingMetrics;
+ private final ErrorHandlingMetrics errorHandlingMetrics;
private final CountDownLatch stopRequestedLatch;
private volatile boolean stopping; // indicates whether the operator has been asked to stop retrying
protected final ProcessingContext context;
public RetryWithToleranceOperator(long errorRetryTimeout, long errorMaxDelayInMillis,
- ToleranceType toleranceType, Time time) {
- this(errorRetryTimeout, errorMaxDelayInMillis, toleranceType, time, new ProcessingContext(), new CountDownLatch(1));
+ ToleranceType toleranceType, Time time, ErrorHandlingMetrics errorHandlingMetrics) {
+ this(errorRetryTimeout, errorMaxDelayInMillis, toleranceType, time, errorHandlingMetrics, new ProcessingContext(), new CountDownLatch(1));
}
RetryWithToleranceOperator(long errorRetryTimeout, long errorMaxDelayInMillis,
- ToleranceType toleranceType, Time time,
+ ToleranceType toleranceType, Time time, ErrorHandlingMetrics errorHandlingMetrics,
ProcessingContext context, CountDownLatch stopRequestedLatch) {
this.errorRetryTimeout = errorRetryTimeout;
this.errorMaxDelayInMillis = errorMaxDelayInMillis;
this.errorToleranceType = toleranceType;
this.time = time;
+ this.errorHandlingMetrics = errorHandlingMetrics;
this.context = context;
this.stopRequestedLatch = stopRequestedLatch;
this.stopping = false;
@@ -284,10 +285,6 @@ public class RetryWithToleranceOperator implements AutoCloseable {
}
}
- public synchronized void metrics(ErrorHandlingMetrics errorHandlingMetrics) {
- this.errorHandlingMetrics = errorHandlingMetrics;
- }
-
@Override
public String toString() {
return "RetryWithToleranceOperator{" +
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
index b2ba4178805..f1913d9848e 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
@@ -231,7 +231,6 @@ public class ErrorHandlingTaskTest {
ErrorReporter reporter = EasyMock.mock(ErrorReporter.class);
RetryWithToleranceOperator retryWithToleranceOperator = operator();
- retryWithToleranceOperator.metrics(errorHandlingMetrics);
retryWithToleranceOperator.reporters(singletonList(reporter));
createSinkTask(initialState, retryWithToleranceOperator);
@@ -262,7 +261,6 @@ public class ErrorHandlingTaskTest {
ErrorReporter reporter = EasyMock.mock(ErrorReporter.class);
RetryWithToleranceOperator retryWithToleranceOperator = operator();
- retryWithToleranceOperator.metrics(errorHandlingMetrics);
retryWithToleranceOperator.reporters(singletonList(reporter));
createSourceTask(initialState, retryWithToleranceOperator);
@@ -286,7 +284,6 @@ public class ErrorHandlingTaskTest {
ErrorReporter reporterB = EasyMock.mock(ErrorReporter.class);
RetryWithToleranceOperator retryWithToleranceOperator = operator();
- retryWithToleranceOperator.metrics(errorHandlingMetrics);
retryWithToleranceOperator.reporters(Arrays.asList(reporterA, reporterB));
createSourceTask(initialState, retryWithToleranceOperator);
@@ -316,7 +313,6 @@ public class ErrorHandlingTaskTest {
LogReporter reporter = new LogReporter(taskId, connConfig(reportProps), errorHandlingMetrics);
RetryWithToleranceOperator retryWithToleranceOperator = operator();
- retryWithToleranceOperator.metrics(errorHandlingMetrics);
retryWithToleranceOperator.reporters(singletonList(reporter));
createSinkTask(initialState, retryWithToleranceOperator);
@@ -357,7 +353,7 @@ public class ErrorHandlingTaskTest {
}
private RetryWithToleranceOperator operator() {
- return new RetryWithToleranceOperator(OPERATOR_RETRY_TIMEOUT_MILLIS, OPERATOR_RETRY_MAX_DELAY_MILLIS, OPERATOR_TOLERANCE_TYPE, SYSTEM);
+ return new RetryWithToleranceOperator(OPERATOR_RETRY_TIMEOUT_MILLIS, OPERATOR_RETRY_MAX_DELAY_MILLIS, OPERATOR_TOLERANCE_TYPE, SYSTEM, errorHandlingMetrics);
}
@Test
@@ -368,7 +364,6 @@ public class ErrorHandlingTaskTest {
LogReporter reporter = new LogReporter(taskId, connConfig(reportProps), errorHandlingMetrics);
RetryWithToleranceOperator retryWithToleranceOperator = operator();
- retryWithToleranceOperator.metrics(errorHandlingMetrics);
retryWithToleranceOperator.reporters(singletonList(reporter));
createSourceTask(initialState, retryWithToleranceOperator);
@@ -433,7 +428,6 @@ public class ErrorHandlingTaskTest {
LogReporter reporter = new LogReporter(taskId, connConfig(reportProps), errorHandlingMetrics);
RetryWithToleranceOperator retryWithToleranceOperator = operator();
- retryWithToleranceOperator.metrics(errorHandlingMetrics);
retryWithToleranceOperator.reporters(singletonList(reporter));
createSourceTask(initialState, retryWithToleranceOperator, badConverter());
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
index 890c0f7399c..ae9c75d5253 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.connect.runtime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.runtime.TaskStatus.Listener;
import org.apache.kafka.connect.runtime.WorkerTask.TaskMetricsGroup;
import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperatorTest;
@@ -25,29 +26,24 @@ import org.apache.kafka.connect.sink.SinkTask;
import org.apache.kafka.connect.storage.StatusBackingStore;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.common.utils.MockTime;
-import org.easymock.EasyMock;
-import org.easymock.Mock;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
-import static org.easymock.EasyMock.expectLastCall;
-import static org.easymock.EasyMock.partialMockBuilder;
-import static org.easymock.EasyMock.replay;
-import static org.easymock.EasyMock.verify;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
-@RunWith(PowerMockRunner.class)
-@PrepareForTest({WorkerTask.class})
-@PowerMockIgnore("javax.management.*")
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
public class WorkerTaskTest {
private static final Map<String, String> TASK_PROPS = new HashMap<>();
@@ -56,12 +52,11 @@ public class WorkerTaskTest {
}
private static final TaskConfig TASK_CONFIG = new TaskConfig(TASK_PROPS);
- private ConnectMetrics metrics;
@Mock private TaskStatus.Listener statusListener;
@Mock private ClassLoader loader;
- RetryWithToleranceOperator retryWithToleranceOperator;
- @Mock
- StatusBackingStore statusBackingStore;
+ @Mock private StatusBackingStore statusBackingStore;
+ private ConnectMetrics metrics;
+ private RetryWithToleranceOperator retryWithToleranceOperator;
@Before
public void setup() {
@@ -78,82 +73,35 @@ public class WorkerTaskTest {
public void standardStartup() {
ConnectorTaskId taskId = new ConnectorTaskId("foo", 0);
- WorkerTask workerTask = partialMockBuilder(WorkerTask.class)
- .withConstructor(
- ConnectorTaskId.class,
- TaskStatus.Listener.class,
- TargetState.class,
- ClassLoader.class,
- ConnectMetrics.class,
- RetryWithToleranceOperator.class,
- Time.class,
- StatusBackingStore.class
- )
- .withArgs(taskId, statusListener, TargetState.STARTED, loader, metrics,
- retryWithToleranceOperator, Time.SYSTEM, statusBackingStore)
- .addMockedMethod("initialize")
- .addMockedMethod("initializeAndStart")
- .addMockedMethod("execute")
- .addMockedMethod("close")
- .createStrictMock();
-
- workerTask.initialize(TASK_CONFIG);
- expectLastCall();
-
- workerTask.initializeAndStart();
- expectLastCall();
-
- workerTask.execute();
- expectLastCall();
-
- statusListener.onStartup(taskId);
- expectLastCall();
-
- workerTask.close();
- expectLastCall();
-
- statusListener.onShutdown(taskId);
- expectLastCall();
-
- replay(workerTask);
+ WorkerTask workerTask = new TestWorkerTask(taskId, statusListener, TargetState.STARTED, loader, metrics,
+ retryWithToleranceOperator, Time.SYSTEM, statusBackingStore);
workerTask.initialize(TASK_CONFIG);
workerTask.run();
workerTask.stop();
workerTask.awaitStop(1000L);
- verify(workerTask);
+ verify(statusListener).onStartup(taskId);
+ verify(statusListener).onShutdown(taskId);
}
@Test
public void stopBeforeStarting() {
ConnectorTaskId taskId = new ConnectorTaskId("foo", 0);
- WorkerTask workerTask = partialMockBuilder(WorkerTask.class)
- .withConstructor(
- ConnectorTaskId.class,
- TaskStatus.Listener.class,
- TargetState.class,
- ClassLoader.class,
- ConnectMetrics.class,
- RetryWithToleranceOperator.class,
- Time.class,
- StatusBackingStore.class
- )
- .withArgs(taskId, statusListener, TargetState.STARTED, loader, metrics,
- retryWithToleranceOperator, Time.SYSTEM, statusBackingStore)
- .addMockedMethod("initialize")
- .addMockedMethod("execute")
- .addMockedMethod("close")
- .createStrictMock();
-
- workerTask.initialize(TASK_CONFIG);
- EasyMock.expectLastCall();
+ WorkerTask workerTask = new TestWorkerTask(taskId, statusListener, TargetState.STARTED, loader, metrics,
+ retryWithToleranceOperator, Time.SYSTEM, statusBackingStore) {
- workerTask.close();
- EasyMock.expectLastCall();
+ @Override
+ public void initializeAndStart() {
+ fail("This method is expected to not be invoked");
+ }
- replay(workerTask);
+ @Override
+ public void execute() {
+ fail("This method is expected to not be invoked");
+ }
+ };
workerTask.initialize(TASK_CONFIG);
workerTask.stop();
@@ -161,72 +109,44 @@ public class WorkerTaskTest {
// now run should not do anything
workerTask.run();
-
- verify(workerTask);
}
@Test
public void cancelBeforeStopping() throws Exception {
ConnectorTaskId taskId = new ConnectorTaskId("foo", 0);
-
- WorkerTask workerTask = partialMockBuilder(WorkerTask.class)
- .withConstructor(
- ConnectorTaskId.class,
- TaskStatus.Listener.class,
- TargetState.class,
- ClassLoader.class,
- ConnectMetrics.class,
- RetryWithToleranceOperator.class,
- Time.class,
- StatusBackingStore.class
- )
- .withArgs(taskId, statusListener, TargetState.STARTED, loader, metrics,
- retryWithToleranceOperator, Time.SYSTEM, statusBackingStore)
- .addMockedMethod("initialize")
- .addMockedMethod("initializeAndStart")
- .addMockedMethod("execute")
- .addMockedMethod("close")
- .createStrictMock();
-
final CountDownLatch stopped = new CountDownLatch(1);
- final Thread thread = new Thread(() -> {
- try {
- stopped.await();
- } catch (Exception e) {
- }
- });
- workerTask.initialize(TASK_CONFIG);
- EasyMock.expectLastCall();
-
- workerTask.initializeAndStart();
- EasyMock.expectLastCall();
-
- workerTask.execute();
- expectLastCall().andAnswer(() -> {
- thread.start();
- return null;
- });
+ WorkerTask workerTask = new TestWorkerTask(taskId, statusListener, TargetState.STARTED, loader, metrics,
+ retryWithToleranceOperator, Time.SYSTEM, statusBackingStore) {
- statusListener.onStartup(taskId);
- expectLastCall();
-
- workerTask.close();
- expectLastCall();
-
- // there should be no call to onShutdown()
+ @Override
+ public void execute() {
+ try {
+ stopped.await();
+ } catch (InterruptedException e) {
+ fail("Unexpected interrupt");
+ }
+ }
- replay(workerTask);
+ // Trigger task shutdown immediately after start. The task will block in its execute() method
+ // until the stopped latch is counted down (i.e. it doesn't actually stop after stop is triggered).
+ @Override
+ public void initializeAndStart() {
+ stop();
+ }
+ };
workerTask.initialize(TASK_CONFIG);
- workerTask.run();
+ Thread t = new Thread(workerTask);
+ t.start();
- workerTask.stop();
workerTask.cancel();
stopped.countDown();
- thread.join();
+ t.join();
- verify(workerTask);
+ verify(statusListener).onStartup(taskId);
+ // there should be no other status updates, including shutdown
+ verifyNoMoreInteractions(statusListener);
}
@Test
@@ -235,20 +155,6 @@ public class WorkerTaskTest {
ConnectMetrics metrics = new MockConnectMetrics();
TaskMetricsGroup group = new TaskMetricsGroup(taskId, metrics, statusListener);
- statusListener.onStartup(taskId);
- expectLastCall();
-
- statusListener.onPause(taskId);
- expectLastCall();
-
- statusListener.onResume(taskId);
- expectLastCall();
-
- statusListener.onShutdown(taskId);
- expectLastCall();
-
- replay(statusListener);
-
group.onStartup(taskId);
assertRunningMetric(group);
group.onPause(taskId);
@@ -258,7 +164,10 @@ public class WorkerTaskTest {
group.onShutdown(taskId);
assertStoppedMetric(group);
- verify(statusListener);
+ verify(statusListener).onStartup(taskId);
+ verify(statusListener).onPause(taskId);
+ verify(statusListener).onResume(taskId);
+ verify(statusListener).onShutdown(taskId);
}
@Test
@@ -269,29 +178,6 @@ public class WorkerTaskTest {
ConnectException error = new ConnectException("error");
TaskMetricsGroup group = new TaskMetricsGroup(taskId, metrics, statusListener);
- statusListener.onStartup(taskId);
- expectLastCall();
-
- statusListener.onPause(taskId);
- expectLastCall();
-
- statusListener.onResume(taskId);
- expectLastCall();
-
- statusListener.onPause(taskId);
- expectLastCall();
-
- statusListener.onResume(taskId);
- expectLastCall();
-
- statusListener.onFailure(taskId, error);
- expectLastCall();
-
- statusListener.onShutdown(taskId);
- expectLastCall();
-
- replay(statusListener);
-
time.sleep(1000L);
group.onStartup(taskId);
assertRunningMetric(group);
@@ -320,7 +206,11 @@ public class WorkerTaskTest {
group.onShutdown(taskId);
assertStoppedMetric(group);
- verify(statusListener);
+ verify(statusListener).onStartup(taskId);
+ verify(statusListener, times(2)).onPause(taskId);
+ verify(statusListener, times(2)).onResume(taskId);
+ verify(statusListener).onFailure(taskId, error);
+ verify(statusListener).onShutdown(taskId);
long totalTime = 27000L;
double pauseTimeRatio = (double) (3000L + 5000L) / totalTime;
@@ -332,6 +222,31 @@ public class WorkerTaskTest {
private static abstract class TestSinkTask extends SinkTask {
}
+ private static class TestWorkerTask extends WorkerTask {
+
+ public TestWorkerTask(ConnectorTaskId id, Listener statusListener, TargetState initialState, ClassLoader loader,
+ ConnectMetrics connectMetrics, RetryWithToleranceOperator retryWithToleranceOperator, Time time,
+ StatusBackingStore statusBackingStore) {
+ super(id, statusListener, initialState, loader, connectMetrics, retryWithToleranceOperator, time, statusBackingStore);
+ }
+
+ @Override
+ public void initialize(TaskConfig taskConfig) {
+ }
+
+ @Override
+ protected void initializeAndStart() {
+ }
+
+ @Override
+ protected void execute() {
+ }
+
+ @Override
+ protected void close() {
+ }
+ }
+
protected void assertFailedMetric(TaskMetricsGroup metricsGroup) {
assertEquals(AbstractStatus.State.FAILED, metricsGroup.state());
}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ErrorReporterTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ErrorReporterTest.java
index 11e72c2ef4a..a46dc3d5d2e 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ErrorReporterTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ErrorReporterTest.java
@@ -31,15 +31,12 @@ import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.sink.SinkTask;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.util.ConnectorTaskId;
-import org.easymock.EasyMock;
-import org.easymock.Mock;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
-import org.powermock.api.easymock.PowerMock;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.modules.junit4.PowerMockRunner;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
import java.util.HashMap;
import java.util.Map;
@@ -58,14 +55,17 @@ import static org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.ER
import static org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.ERROR_HEADER_ORIG_TOPIC;
import static org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.ERROR_HEADER_STAGE;
import static org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.ERROR_HEADER_TASK_ID;
-import static org.easymock.EasyMock.replay;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
-@RunWith(PowerMockRunner.class)
-@PowerMockIgnore("javax.management.*")
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
public class ErrorReporterTest {
private static final String TOPIC = "test-topic";
@@ -109,12 +109,10 @@ public class ErrorReporterTest {
ProcessingContext context = processingContext();
- EasyMock.expect(producer.send(EasyMock.anyObject(), EasyMock.anyObject())).andThrow(new RuntimeException());
- replay(producer);
-
- // since topic name is empty, this method should be a NOOP.
- // if it attempts to log to the DLQ via the producer, the send mock will throw a RuntimeException.
+ // since topic name is empty, this method should be a NOOP and producer.send() should
+ // not be called.
deadLetterQueueReporter.report(context);
+ verifyNoMoreInteractions(producer);
}
@Test
@@ -124,12 +122,11 @@ public class ErrorReporterTest {
ProcessingContext context = processingContext();
- EasyMock.expect(producer.send(EasyMock.anyObject(), EasyMock.anyObject())).andReturn(metadata);
- replay(producer);
+ when(producer.send(any(), any())).thenReturn(metadata);
deadLetterQueueReporter.report(context);
- PowerMock.verifyAll();
+ verify(producer, times(1)).send(any(), any());
}
@Test
@@ -139,28 +136,12 @@ public class ErrorReporterTest {
ProcessingContext context = processingContext();
- EasyMock.expect(producer.send(EasyMock.anyObject(), EasyMock.anyObject())).andReturn(metadata).times(2);
- replay(producer);
+ when(producer.send(any(), any())).thenReturn(metadata);
deadLetterQueueReporter.report(context);
deadLetterQueueReporter.report(context);
- PowerMock.verifyAll();
- }
-
- @Test
- public void testDLQReportAndReturnFuture() {
- DeadLetterQueueReporter deadLetterQueueReporter = new DeadLetterQueueReporter(
- producer, config(singletonMap(SinkConnectorConfig.DLQ_TOPIC_NAME_CONFIG, DLQ_TOPIC)), TASK_ID, errorHandlingMetrics);
-
- ProcessingContext context = processingContext();
-
- EasyMock.expect(producer.send(EasyMock.anyObject(), EasyMock.anyObject())).andReturn(metadata);
- replay(producer);
-
- deadLetterQueueReporter.report(context);
-
- PowerMock.verifyAll();
+ verify(producer, times(2)).send(any(), any());
}
@Test
@@ -168,13 +149,8 @@ public class ErrorReporterTest {
DeadLetterQueueReporter deadLetterQueueReporter = new DeadLetterQueueReporter(
producer, config(singletonMap(SinkConnectorConfig.DLQ_TOPIC_NAME_CONFIG, DLQ_TOPIC)), TASK_ID, errorHandlingMetrics);
- producer.close();
- EasyMock.expectLastCall();
- replay(producer);
-
deadLetterQueueReporter.close();
-
- PowerMock.verifyAll();
+ verify(producer).close();
}
@Test
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java
index e8b0c7a9553..6abb6221b55 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperatorTest.java
@@ -29,16 +29,14 @@ import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.runtime.isolation.PluginsTest.TestConverter;
import org.apache.kafka.connect.runtime.isolation.PluginsTest.TestableWorkerConfig;
+import org.apache.kafka.connect.sink.SinkConnector;
import org.apache.kafka.connect.sink.SinkTask;
import org.apache.kafka.connect.util.ConnectorTaskId;
-import org.easymock.EasyMock;
-import org.easymock.Mock;
import org.junit.Test;
import org.junit.runner.RunWith;
-import org.powermock.api.easymock.PowerMock;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+import org.mockito.stubbing.OngoingStubbing;
import java.util.Arrays;
import java.util.Collections;
@@ -67,44 +65,48 @@ import static org.apache.kafka.connect.runtime.ConnectorConfig.ERRORS_TOLERANCE_
import static org.apache.kafka.connect.runtime.ConnectorConfig.ERRORS_TOLERANCE_DEFAULT;
import static org.apache.kafka.connect.runtime.errors.ToleranceType.ALL;
import static org.apache.kafka.connect.runtime.errors.ToleranceType.NONE;
-import static org.easymock.EasyMock.replay;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
-
-@RunWith(PowerMockRunner.class)
-@PrepareForTest({ProcessingContext.class})
-@PowerMockIgnore("javax.management.*")
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
public class RetryWithToleranceOperatorTest {
+ private static final Map<String, String> PROPERTIES = new HashMap<String, String>() {{
+ put(CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG, Objects.toString(2));
+ put(CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG, Objects.toString(3000));
+ put(CommonClientConfigs.METRICS_RECORDING_LEVEL_CONFIG, Sensor.RecordingLevel.INFO.toString());
+
+ // define required properties
+ put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, TestConverter.class.getName());
+ put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, TestConverter.class.getName());
+ }};
+
public static final RetryWithToleranceOperator NOOP_OPERATOR = new RetryWithToleranceOperator(
- ERRORS_RETRY_TIMEOUT_DEFAULT, ERRORS_RETRY_MAX_DELAY_DEFAULT, NONE, SYSTEM);
+ ERRORS_RETRY_TIMEOUT_DEFAULT, ERRORS_RETRY_MAX_DELAY_DEFAULT, NONE, SYSTEM,
+ new ErrorHandlingMetrics(
+ new ConnectorTaskId("noop-connector", -1),
+ new ConnectMetrics("noop-worker", new TestableWorkerConfig(PROPERTIES),
+ new SystemTime(), "test-cluster"))
+ );
public static final RetryWithToleranceOperator ALL_OPERATOR = new RetryWithToleranceOperator(
- ERRORS_RETRY_TIMEOUT_DEFAULT, ERRORS_RETRY_MAX_DELAY_DEFAULT, ALL, SYSTEM);
- static {
- Map<String, String> properties = new HashMap<>();
- properties.put(CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG, Objects.toString(2));
- properties.put(CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG, Objects.toString(3000));
- properties.put(CommonClientConfigs.METRICS_RECORDING_LEVEL_CONFIG, Sensor.RecordingLevel.INFO.toString());
-
- // define required properties
- properties.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, TestConverter.class.getName());
- properties.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, TestConverter.class.getName());
-
- NOOP_OPERATOR.metrics(new ErrorHandlingMetrics(
- new ConnectorTaskId("noop-connector", -1),
- new ConnectMetrics("noop-worker", new TestableWorkerConfig(properties), new SystemTime(), "test-cluster"))
- );
- ALL_OPERATOR.metrics(new ErrorHandlingMetrics(
- new ConnectorTaskId("errors-all-tolerate-connector", -1),
- new ConnectMetrics("errors-all-tolerate-worker", new TestableWorkerConfig(properties),
- new SystemTime(), "test-cluster"))
- );
- }
+ ERRORS_RETRY_TIMEOUT_DEFAULT, ERRORS_RETRY_MAX_DELAY_DEFAULT, ALL, SYSTEM,
+ new ErrorHandlingMetrics(
+ new ConnectorTaskId("errors-all-tolerate-connector", -1),
+ new ConnectMetrics("errors-all-tolerate-worker", new TestableWorkerConfig(PROPERTIES),
+ new SystemTime(), "test-cluster"))
+ );
- @SuppressWarnings("unused")
@Mock
private Operation<String> mockOperation;
@@ -120,8 +122,7 @@ public class RetryWithToleranceOperatorTest {
@Test
public void testExecuteFailed() {
RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(0,
- ERRORS_RETRY_MAX_DELAY_DEFAULT, ALL, SYSTEM);
- retryWithToleranceOperator.metrics(errorHandlingMetrics);
+ ERRORS_RETRY_MAX_DELAY_DEFAULT, ALL, SYSTEM, errorHandlingMetrics);
retryWithToleranceOperator.executeFailed(Stage.TASK_PUT,
SinkTask.class, consumerRecord, new Throwable());
@@ -130,8 +131,7 @@ public class RetryWithToleranceOperatorTest {
@Test
public void testExecuteFailedNoTolerance() {
RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(0,
- ERRORS_RETRY_MAX_DELAY_DEFAULT, NONE, SYSTEM);
- retryWithToleranceOperator.metrics(errorHandlingMetrics);
+ ERRORS_RETRY_MAX_DELAY_DEFAULT, NONE, SYSTEM, errorHandlingMetrics);
assertThrows(ConnectException.class, () -> retryWithToleranceOperator.executeFailed(Stage.TASK_PUT,
SinkTask.class, consumerRecord, new Throwable()));
@@ -189,14 +189,15 @@ public class RetryWithToleranceOperatorTest {
private void testHandleExceptionInStage(Stage type, Exception ex) {
RetryWithToleranceOperator retryWithToleranceOperator = setupExecutor();
- retryWithToleranceOperator.execute(new ExceptionThrower(ex), type, ExceptionThrower.class);
+ Operation<?> exceptionThrower = () -> {
+ throw ex;
+ };
+ retryWithToleranceOperator.execute(exceptionThrower, type, RetryWithToleranceOperator.class);
assertTrue(retryWithToleranceOperator.failed());
- PowerMock.verifyAll();
}
private RetryWithToleranceOperator setupExecutor() {
- RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(0, ERRORS_RETRY_MAX_DELAY_DEFAULT, ALL, SYSTEM);
- retryWithToleranceOperator.metrics(errorHandlingMetrics);
+ RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(0, ERRORS_RETRY_MAX_DELAY_DEFAULT, ALL, SYSTEM, errorHandlingMetrics);
return retryWithToleranceOperator;
}
@@ -222,25 +223,24 @@ public class RetryWithToleranceOperatorTest {
public void execAndHandleRetriableError(long errorRetryTimeout, int numRetriableExceptionsThrown, List<Long> expectedWaits, Exception e, boolean successExpected) throws Exception {
MockTime time = new MockTime(0, 0, 0);
- CountDownLatch exitLatch = PowerMock.createStrictMock(CountDownLatch.class);
- RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(errorRetryTimeout, ERRORS_RETRY_MAX_DELAY_DEFAULT, ALL, time, new ProcessingContext(), exitLatch);
- retryWithToleranceOperator.metrics(errorHandlingMetrics);
-
- EasyMock.expect(mockOperation.call()).andThrow(e).times(numRetriableExceptionsThrown);
+ CountDownLatch exitLatch = mock(CountDownLatch.class);
+ RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(errorRetryTimeout, ERRORS_RETRY_MAX_DELAY_DEFAULT, ALL, time, errorHandlingMetrics, new ProcessingContext(), exitLatch);
+ OngoingStubbing<String> mockOperationCall = when(mockOperation.call());
+ for (int i = 0; i < numRetriableExceptionsThrown; i++) {
+ mockOperationCall = mockOperationCall.thenThrow(e);
+ }
if (successExpected) {
- EasyMock.expect(mockOperation.call()).andReturn("Success");
+ mockOperationCall.thenReturn("Success");
}
for (Long expectedWait : expectedWaits) {
- EasyMock.expect(exitLatch.await(expectedWait, TimeUnit.MILLISECONDS)).andAnswer(() -> {
+ when(exitLatch.await(expectedWait, TimeUnit.MILLISECONDS)).thenAnswer(i -> {
time.sleep(expectedWait);
return false;
});
}
- replay(mockOperation, exitLatch);
-
String result = retryWithToleranceOperator.execAndHandleError(mockOperation, Exception.class);
if (successExpected) {
@@ -250,51 +250,47 @@ public class RetryWithToleranceOperatorTest {
assertTrue(retryWithToleranceOperator.failed());
}
- EasyMock.verify(mockOperation);
- PowerMock.verifyAll();
+ verifyNoMoreInteractions(exitLatch);
+ verify(mockOperation, times(successExpected ? numRetriableExceptionsThrown + 1 : numRetriableExceptionsThrown)).call();
}
@Test
public void testExecAndHandleNonRetriableError() throws Exception { // operation throws once; operator must end up failed, return null, and never await the exit latch
MockTime time = new MockTime(0, 0, 0);
- CountDownLatch exitLatch = PowerMock.createStrictMock(CountDownLatch.class);
- RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(6000, ERRORS_RETRY_MAX_DELAY_DEFAULT, ALL, time, new ProcessingContext(), exitLatch);
- retryWithToleranceOperator.metrics(errorHandlingMetrics);
-
- EasyMock.expect(mockOperation.call()).andThrow(new Exception("Test")).times(1);
-
- // expect no call to exitLatch.await() which is only called during the retry backoff
+ CountDownLatch exitLatch = mock(CountDownLatch.class);
+ RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(6000, ERRORS_RETRY_MAX_DELAY_DEFAULT, ALL, time, errorHandlingMetrics, new ProcessingContext(), exitLatch);
- replay(mockOperation, exitLatch);
+ when(mockOperation.call()).thenThrow(new Exception("Test")); // a plain Exception, not a RetriableException
String result = retryWithToleranceOperator.execAndHandleError(mockOperation, Exception.class);
assertTrue(retryWithToleranceOperator.failed());
assertNull(result);
- EasyMock.verify(mockOperation);
- PowerMock.verifyAll();
+ // expect no call to exitLatch.await() which is only called during the retry backoff
+ verify(mockOperation).call(); // Mockito's default verification mode is times(1): exactly one attempt
+ verify(exitLatch, never()).await(anyLong(), any());
}
@Test
public void testExitLatch() throws Exception {
MockTime time = new MockTime(0, 0, 0);
- CountDownLatch exitLatch = PowerMock.createStrictMock(CountDownLatch.class);
- RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(-1, ERRORS_RETRY_MAX_DELAY_DEFAULT, ALL, time, new ProcessingContext(), exitLatch);
- retryWithToleranceOperator.metrics(errorHandlingMetrics);
- EasyMock.expect(mockOperation.call()).andThrow(new RetriableException("test")).anyTimes();
- EasyMock.expect(exitLatch.await(300, TimeUnit.MILLISECONDS)).andAnswer(() -> {
+ CountDownLatch exitLatch = mock(CountDownLatch.class);
+ RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(-1, ERRORS_RETRY_MAX_DELAY_DEFAULT, ALL, time, errorHandlingMetrics, new ProcessingContext(), exitLatch);
+ when(mockOperation.call()).thenThrow(new RetriableException("test"));
+
+ when(exitLatch.await(300L, TimeUnit.MILLISECONDS)).thenAnswer(i -> {
time.sleep(300);
return false;
});
- EasyMock.expect(exitLatch.await(600, TimeUnit.MILLISECONDS)).andAnswer(() -> {
+ when(exitLatch.await(600L, TimeUnit.MILLISECONDS)).thenAnswer(i -> {
time.sleep(600);
return false;
});
- EasyMock.expect(exitLatch.await(1200, TimeUnit.MILLISECONDS)).andAnswer(() -> {
+ when(exitLatch.await(1200L, TimeUnit.MILLISECONDS)).thenAnswer(i -> {
time.sleep(1200);
return false;
});
- EasyMock.expect(exitLatch.await(2400, TimeUnit.MILLISECONDS)).andAnswer(() -> {
+ when(exitLatch.await(2400L, TimeUnit.MILLISECONDS)).thenAnswer(i -> {
time.sleep(2400);
retryWithToleranceOperator.triggerStop();
return false;
@@ -302,45 +298,40 @@ public class RetryWithToleranceOperatorTest {
// expect no more calls to exitLatch.await() after retryWithToleranceOperator.triggerStop() is called
- exitLatch.countDown();
- EasyMock.expectLastCall().once();
-
- replay(mockOperation, exitLatch);
retryWithToleranceOperator.execAndHandleError(mockOperation, Exception.class);
assertTrue(retryWithToleranceOperator.failed());
assertEquals(4500L, time.milliseconds());
- PowerMock.verifyAll();
+ verify(exitLatch).countDown();
+ verifyNoMoreInteractions(exitLatch);
}
@Test
public void testBackoffLimit() throws Exception {
MockTime time = new MockTime(0, 0, 0);
- CountDownLatch exitLatch = PowerMock.createStrictMock(CountDownLatch.class);
- RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(5, 5000, NONE, time, new ProcessingContext(), exitLatch);
+ CountDownLatch exitLatch = mock(CountDownLatch.class);
+ RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(5, 5000, NONE, time, errorHandlingMetrics, new ProcessingContext(), exitLatch);
- EasyMock.expect(exitLatch.await(300, TimeUnit.MILLISECONDS)).andAnswer(() -> {
+ when(exitLatch.await(300, TimeUnit.MILLISECONDS)).thenAnswer(i -> {
time.sleep(300);
return false;
});
- EasyMock.expect(exitLatch.await(600, TimeUnit.MILLISECONDS)).andAnswer(() -> {
+ when(exitLatch.await(600, TimeUnit.MILLISECONDS)).thenAnswer(i -> {
time.sleep(600);
return false;
});
- EasyMock.expect(exitLatch.await(1200, TimeUnit.MILLISECONDS)).andAnswer(() -> {
+ when(exitLatch.await(1200, TimeUnit.MILLISECONDS)).thenAnswer(i -> {
time.sleep(1200);
return false;
});
- EasyMock.expect(exitLatch.await(2400, TimeUnit.MILLISECONDS)).andAnswer(() -> {
+ when(exitLatch.await(2400, TimeUnit.MILLISECONDS)).thenAnswer(i -> {
time.sleep(2400);
return false;
});
- EasyMock.expect(exitLatch.await(500, TimeUnit.MILLISECONDS)).andAnswer(() -> {
+ when(exitLatch.await(500, TimeUnit.MILLISECONDS)).thenAnswer(i -> {
time.sleep(500);
return false;
});
- EasyMock.expect(exitLatch.await(0, TimeUnit.MILLISECONDS)).andReturn(false);
-
- replay(exitLatch);
+ when(exitLatch.await(0, TimeUnit.MILLISECONDS)).thenReturn(false);
retryWithToleranceOperator.backoff(1, 5000);
retryWithToleranceOperator.backoff(2, 5000);
@@ -353,23 +344,21 @@ public class RetryWithToleranceOperatorTest {
// that we don't wait with a negative timeout
retryWithToleranceOperator.backoff(6, 5000);
- PowerMock.verifyAll();
+ verifyNoMoreInteractions(exitLatch);
}
@Test
public void testToleranceLimit() { // three cases: NONE after one failure, ALL after two failures, NONE with no failures
- RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(ERRORS_RETRY_TIMEOUT_DEFAULT, ERRORS_RETRY_MAX_DELAY_DEFAULT, NONE, SYSTEM);
- retryWithToleranceOperator.metrics(errorHandlingMetrics);
+ RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(ERRORS_RETRY_TIMEOUT_DEFAULT, ERRORS_RETRY_MAX_DELAY_DEFAULT, NONE, SYSTEM, errorHandlingMetrics);
retryWithToleranceOperator.markAsFailed();
assertFalse("should not tolerate any errors", retryWithToleranceOperator.withinToleranceLimits());
- retryWithToleranceOperator = new RetryWithToleranceOperator(ERRORS_RETRY_TIMEOUT_DEFAULT, ERRORS_RETRY_MAX_DELAY_DEFAULT, ALL, SYSTEM);
- retryWithToleranceOperator.metrics(errorHandlingMetrics);
+ retryWithToleranceOperator = new RetryWithToleranceOperator(ERRORS_RETRY_TIMEOUT_DEFAULT, ERRORS_RETRY_MAX_DELAY_DEFAULT, ALL, SYSTEM, errorHandlingMetrics);
retryWithToleranceOperator.markAsFailed();
retryWithToleranceOperator.markAsFailed();
assertTrue("should tolerate all errors", retryWithToleranceOperator.withinToleranceLimits());
- retryWithToleranceOperator = new RetryWithToleranceOperator(ERRORS_RETRY_TIMEOUT_DEFAULT, ERRORS_RETRY_MAX_DELAY_DEFAULT, NONE, SYSTEM);
+ retryWithToleranceOperator = new RetryWithToleranceOperator(ERRORS_RETRY_TIMEOUT_DEFAULT, ERRORS_RETRY_MAX_DELAY_DEFAULT, NONE, SYSTEM, errorHandlingMetrics); // fresh operator: markAsFailed() is never called on it
assertTrue("no tolerance is within limits if no failures", retryWithToleranceOperator.withinToleranceLimits());
}
@@ -379,14 +368,12 @@ public class RetryWithToleranceOperatorTest {
assertEquals(configuration.errorRetryTimeout(), ERRORS_RETRY_TIMEOUT_DEFAULT);
assertEquals(configuration.errorMaxDelayInMillis(), ERRORS_RETRY_MAX_DELAY_DEFAULT);
assertEquals(configuration.errorToleranceType(), ERRORS_TOLERANCE_DEFAULT);
-
- PowerMock.verifyAll();
}
ConnectorConfig config(Map<String, String> connProps) { // builds a ConnectorConfig from a fixed name and connector class plus connProps
Map<String, String> props = new HashMap<>();
props.put(ConnectorConfig.NAME_CONFIG, "test");
- props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, SinkTask.class.getName());
+ props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, SinkConnector.class.getName());
props.putAll(connProps); // applied last, so an entry in connProps replaces a default with the same key
return new ConnectorConfig(plugins, props);
}
@@ -402,8 +389,6 @@ public class RetryWithToleranceOperatorTest {
configuration = config(singletonMap(ERRORS_TOLERANCE_CONFIG, "none"));
assertEquals(configuration.errorToleranceType(), ToleranceType.NONE);
-
- PowerMock.verifyAll();
}
@Test
@@ -414,9 +399,9 @@ public class RetryWithToleranceOperatorTest {
// can't corrupt the state of the ProcessingContext
AtomicReference<Throwable> failed = new AtomicReference<>(null);
RetryWithToleranceOperator retryWithToleranceOperator = new RetryWithToleranceOperator(0,
- ERRORS_RETRY_MAX_DELAY_DEFAULT, ALL, SYSTEM, new ProcessingContext() {
- private AtomicInteger count = new AtomicInteger();
- private AtomicInteger attempt = new AtomicInteger();
+ ERRORS_RETRY_MAX_DELAY_DEFAULT, ALL, SYSTEM, errorHandlingMetrics, new ProcessingContext() {
+ private final AtomicInteger count = new AtomicInteger();
+ private final AtomicInteger attempt = new AtomicInteger();
@Override
public void error(Throwable error) {
@@ -451,7 +436,6 @@ public class RetryWithToleranceOperatorTest {
super.attempt(attempt);
}
}, new CountDownLatch(1));
- retryWithToleranceOperator.metrics(errorHandlingMetrics);
ExecutorService pool = Executors.newFixedThreadPool(numThreads);
List<? extends Future<?>> futures = IntStream.range(0, numThreads).boxed()
@@ -494,18 +478,4 @@ public class RetryWithToleranceOperatorTest {
throw exception;
}
}
-
-
- private static class ExceptionThrower implements Operation<Object> {
- private Exception e;
-
- public ExceptionThrower(Exception e) {
- this.e = e;
- }
-
- @Override
- public Object call() throws Exception {
- throw e;
- }
- }
}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/WorkerErrantRecordReporterTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/WorkerErrantRecordReporterTest.java
index 2d78297db5d..084bf9c57f4 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/WorkerErrantRecordReporterTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/WorkerErrantRecordReporterTest.java
@@ -17,16 +17,18 @@
package org.apache.kafka.connect.runtime.errors;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.runtime.ConnectorConfig;
+import org.apache.kafka.connect.runtime.InternalSinkRecord;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.HeaderConverter;
-import org.easymock.Mock;
-import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.modules.junit4.PowerMockRunner;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
import java.util.ArrayList;
import java.util.Collection;
@@ -34,39 +36,27 @@ import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
-
-@RunWith(PowerMockRunner.class)
-@PowerMockIgnore("javax.management.*")
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
public class WorkerErrantRecordReporterTest {
private WorkerErrantRecordReporter reporter;
- @Mock
- private RetryWithToleranceOperator retryWithToleranceOperator;
-
- @Mock
- private Converter converter;
-
- @Mock
- private HeaderConverter headerConverter;
-
- @Mock
- private SinkRecord record;
-
- @Before
- public void setup() {
- reporter = new WorkerErrantRecordReporter(
- retryWithToleranceOperator,
- converter,
- converter,
- headerConverter
- );
- }
+ @Mock private Converter converter;
+ @Mock private HeaderConverter headerConverter;
+ @Mock private InternalSinkRecord record;
+ @Mock private ErrorHandlingMetrics errorHandlingMetrics;
+ @Mock private ErrorReporter errorReporter;
@Test
public void testGetFutures() {
+ initializeReporter(true);
Collection<TopicPartition> topicPartitions = new ArrayList<>();
assertTrue(reporter.futures.isEmpty());
for (int i = 0; i < 4; i++) {
@@ -78,4 +68,46 @@ public class WorkerErrantRecordReporterTest {
reporter.awaitFutures(topicPartitions);
assertTrue(reporter.futures.isEmpty());
}
+
+ @Test
+ public void testReportErrorsTolerated() { // errorsTolerated=true: testReport calls report() directly, without assertThrows
+ testReport(true);
+ }
+
+ @Test
+ public void testReportNoToleratedErrors() { // errorsTolerated=false: testReport expects report() to throw ConnectException
+ testReport(false);
+ }
+
+ // Shared driver: reports one record and checks the outcome for the requested tolerance mode.
+ private void testReport(boolean errorsTolerated) {
+ initializeReporter(errorsTolerated);
+ @SuppressWarnings("unchecked") ConsumerRecord<byte[], byte[]> originalRecord = mock(ConsumerRecord.class);
+ when(record.originalRecord()).thenReturn(originalRecord);
+ when(errorReporter.report(any())).thenReturn(CompletableFuture.completedFuture(null));
+ Throwable error = new Throwable();
+ if (!errorsTolerated) {
+ assertThrows(ConnectException.class, () -> reporter.report(record, error)); // report() is expected to throw
+ } else {
+ reporter.report(record, error);
+ }
+ verify(errorReporter).report(any()); // the mocked ErrorReporter is invoked once in both modes
+ }
+
+ // Builds the reporter under test on a non-mocked RetryWithToleranceOperator whose only reporter is the ErrorReporter mock.
+ private void initializeReporter(boolean errorsTolerated) {
+ ToleranceType tolerance = errorsTolerated ? ToleranceType.ALL : ToleranceType.NONE;
+ RetryWithToleranceOperator operator = new RetryWithToleranceOperator(
+ 5000,
+ ConnectorConfig.ERRORS_RETRY_MAX_DELAY_DEFAULT,
+ tolerance,
+ Time.SYSTEM,
+ errorHandlingMetrics
+ );
+ operator.reporters(Collections.singletonList(errorReporter));
+ // The same Converter mock is passed for both converter arguments.
+ reporter = new WorkerErrantRecordReporter(
+ operator, converter, converter, headerConverter
+ );
+ }
}