You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rh...@apache.org on 2019/04/03 20:00:18 UTC
[kafka] branch trunk updated: KAFKA-8126: Flaky Test
org.apache.kafka.connect.runtime.WorkerTest.testAddRemoveTask (#6475)
This is an automated email from the ASF dual-hosted git repository.
rhauch pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new d3316bc KAFKA-8126: Flaky Test org.apache.kafka.connect.runtime.WorkerTest.testAddRemoveTask (#6475)
d3316bc is described below
commit d3316bc6a7834db9eef6dde21728891ba6b07b37
Author: Doroszlai, Attila <64...@users.noreply.github.com>
AuthorDate: Wed Apr 3 22:00:05 2019 +0200
KAFKA-8126: Flaky Test org.apache.kafka.connect.runtime.WorkerTest.testAddRemoveTask (#6475)
Changed the WorkerTest to use a mock Executor.
Author: Attila Doroszlai <ad...@apache.org>
Reviewer: Randall Hauch <rh...@gmail.com>
---
.../java/org/apache/kafka/connect/runtime/Worker.java | 15 +++++++++++++--
.../org/apache/kafka/connect/runtime/WorkerTest.java | 17 ++++++++---------
2 files changed, 21 insertions(+), 11 deletions(-)
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index 673bd4e..e867983 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -98,7 +98,6 @@ public class Worker {
private SourceTaskOffsetCommitter sourceTaskOffsetCommitter;
private WorkerConfigTransformer workerConfigTransformer;
- @SuppressWarnings("deprecation")
public Worker(
String workerId,
Time time,
@@ -106,8 +105,20 @@ public class Worker {
WorkerConfig config,
OffsetBackingStore offsetBackingStore
) {
+ this(workerId, time, plugins, config, offsetBackingStore, Executors.newCachedThreadPool());
+ }
+
+ @SuppressWarnings("deprecation")
+ Worker(
+ String workerId,
+ Time time,
+ Plugins plugins,
+ WorkerConfig config,
+ OffsetBackingStore offsetBackingStore,
+ ExecutorService executorService
+ ) {
this.metrics = new ConnectMetrics(workerId, config, time);
- this.executor = Executors.newCachedThreadPool();
+ this.executor = executorService;
this.workerId = workerId;
this.time = time;
this.plugins = plugins;
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
index 8f15c87..eef10f0 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
@@ -71,6 +71,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ExecutorService;
import static org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperatorTest.NOOP_OPERATOR;
import static org.easymock.EasyMock.anyObject;
@@ -118,6 +119,7 @@ public class WorkerTest extends ThreadedTest {
@Mock private Converter taskKeyConverter;
@Mock private Converter taskValueConverter;
@Mock private HeaderConverter taskHeaderConverter;
+ @Mock private ExecutorService executorService;
@Before
public void setup() {
@@ -543,8 +545,7 @@ public class WorkerTest extends ThreadedTest {
expectTaskValueConverters(ClassLoaderUsage.CURRENT_CLASSLOADER, taskValueConverter);
expectTaskHeaderConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, taskHeaderConverter);
- workerTask.run();
- EasyMock.expectLastCall();
+ EasyMock.expect(executorService.submit(workerTask)).andReturn(null);
EasyMock.expect(plugins.delegatingLoader()).andReturn(delegatingLoader);
EasyMock.expect(delegatingLoader.connectorLoader(WorkerTestConnector.class.getName()))
@@ -568,7 +569,7 @@ public class WorkerTest extends ThreadedTest {
PowerMock.replayAll();
- worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore);
+ worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, executorService);
worker.start();
assertStatistics(worker, 0, 0);
assertStartupStatistics(worker, 0, 0, 0, 0);
@@ -685,8 +686,7 @@ public class WorkerTest extends ThreadedTest {
expectTaskHeaderConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, null);
expectTaskHeaderConverter(ClassLoaderUsage.PLUGINS, taskHeaderConverter);
- workerTask.run();
- EasyMock.expectLastCall();
+ EasyMock.expect(executorService.submit(workerTask)).andReturn(null);
EasyMock.expect(plugins.delegatingLoader()).andReturn(delegatingLoader);
EasyMock.expect(delegatingLoader.connectorLoader(WorkerTestConnector.class.getName()))
@@ -712,7 +712,7 @@ public class WorkerTest extends ThreadedTest {
PowerMock.replayAll();
- worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore);
+ worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, executorService);
worker.start();
assertStatistics(worker, 0, 0);
worker.startTask(TASK_ID, ClusterConfigState.EMPTY, anyConnectorConfigMap(), origProps, taskStatusListener, TargetState.STARTED);
@@ -778,8 +778,7 @@ public class WorkerTest extends ThreadedTest {
expectTaskHeaderConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, null);
expectTaskHeaderConverter(ClassLoaderUsage.PLUGINS, taskHeaderConverter);
- workerTask.run();
- EasyMock.expectLastCall();
+ EasyMock.expect(executorService.submit(workerTask)).andReturn(null);
EasyMock.expect(plugins.delegatingLoader()).andReturn(delegatingLoader);
EasyMock.expect(delegatingLoader.connectorLoader(WorkerTestConnector.class.getName()))
@@ -803,7 +802,7 @@ public class WorkerTest extends ThreadedTest {
PowerMock.replayAll();
- worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore);
+ worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, executorService);
worker.start();
assertStatistics(worker, 0, 0);
assertEquals(Collections.emptySet(), worker.taskIds());