You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rh...@apache.org on 2019/04/03 20:15:18 UTC
[kafka] branch 2.1 updated: KAFKA-8126: Flaky Test
org.apache.kafka.connect.runtime.WorkerTest.testAddRemoveTask (#6475)
This is an automated email from the ASF dual-hosted git repository.
rhauch pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.1 by this push:
new f52d913 KAFKA-8126: Flaky Test org.apache.kafka.connect.runtime.WorkerTest.testAddRemoveTask (#6475)
f52d913 is described below
commit f52d913ddc88102808d81a07c6e81765d2e0c04c
Author: Doroszlai, Attila <64...@users.noreply.github.com>
AuthorDate: Wed Apr 3 22:00:05 2019 +0200
KAFKA-8126: Flaky Test org.apache.kafka.connect.runtime.WorkerTest.testAddRemoveTask (#6475)
Changed the WorkerTest to use a mock Executor.
Author: Attila Doroszlai <ad...@apache.org>
Reviewer: Randall Hauch <rh...@gmail.com>
---
.../java/org/apache/kafka/connect/runtime/Worker.java | 14 +++++++++++++-
.../org/apache/kafka/connect/runtime/WorkerTest.java | 17 ++++++++---------
2 files changed, 21 insertions(+), 10 deletions(-)
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index 1fd91d3..01d1a0b 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -103,8 +103,20 @@ public class Worker {
WorkerConfig config,
OffsetBackingStore offsetBackingStore
) {
+ this(workerId, time, plugins, config, offsetBackingStore, Executors.newCachedThreadPool());
+ }
+
+ @SuppressWarnings("deprecation")
+ Worker(
+ String workerId,
+ Time time,
+ Plugins plugins,
+ WorkerConfig config,
+ OffsetBackingStore offsetBackingStore,
+ ExecutorService executorService
+ ) {
this.metrics = new ConnectMetrics(workerId, config, time);
- this.executor = Executors.newCachedThreadPool();
+ this.executor = executorService;
this.workerId = workerId;
this.time = time;
this.plugins = plugins;
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
index 77238e9..06cdae8 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
@@ -69,6 +69,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ExecutorService;
import static org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperatorTest.NOOP_OPERATOR;
import static org.easymock.EasyMock.anyObject;
@@ -112,6 +113,7 @@ public class WorkerTest extends ThreadedTest {
@Mock private Converter taskKeyConverter;
@Mock private Converter taskValueConverter;
@Mock private HeaderConverter taskHeaderConverter;
+ @Mock private ExecutorService executorService;
@Before
public void setup() {
@@ -520,8 +522,7 @@ public class WorkerTest extends ThreadedTest {
expectTaskValueConverters(ClassLoaderUsage.CURRENT_CLASSLOADER, taskValueConverter);
expectTaskHeaderConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, taskHeaderConverter);
- workerTask.run();
- EasyMock.expectLastCall();
+ EasyMock.expect(executorService.submit(workerTask)).andReturn(null);
EasyMock.expect(plugins.delegatingLoader()).andReturn(delegatingLoader);
EasyMock.expect(delegatingLoader.connectorLoader(WorkerTestConnector.class.getName()))
@@ -545,7 +546,7 @@ public class WorkerTest extends ThreadedTest {
PowerMock.replayAll();
- worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore);
+ worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, executorService);
worker.start();
assertStatistics(worker, 0, 0);
assertStartupStatistics(worker, 0, 0, 0, 0);
@@ -662,8 +663,7 @@ public class WorkerTest extends ThreadedTest {
expectTaskHeaderConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, null);
expectTaskHeaderConverter(ClassLoaderUsage.PLUGINS, taskHeaderConverter);
- workerTask.run();
- EasyMock.expectLastCall();
+ EasyMock.expect(executorService.submit(workerTask)).andReturn(null);
EasyMock.expect(plugins.delegatingLoader()).andReturn(delegatingLoader);
EasyMock.expect(delegatingLoader.connectorLoader(WorkerTestConnector.class.getName()))
@@ -689,7 +689,7 @@ public class WorkerTest extends ThreadedTest {
PowerMock.replayAll();
- worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore);
+ worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, executorService);
worker.start();
assertStatistics(worker, 0, 0);
worker.startTask(TASK_ID, ClusterConfigState.EMPTY, anyConnectorConfigMap(), origProps, taskStatusListener, TargetState.STARTED);
@@ -755,8 +755,7 @@ public class WorkerTest extends ThreadedTest {
expectTaskHeaderConverter(ClassLoaderUsage.CURRENT_CLASSLOADER, null);
expectTaskHeaderConverter(ClassLoaderUsage.PLUGINS, taskHeaderConverter);
- workerTask.run();
- EasyMock.expectLastCall();
+ EasyMock.expect(executorService.submit(workerTask)).andReturn(null);
EasyMock.expect(plugins.delegatingLoader()).andReturn(delegatingLoader);
EasyMock.expect(delegatingLoader.connectorLoader(WorkerTestConnector.class.getName()))
@@ -780,7 +779,7 @@ public class WorkerTest extends ThreadedTest {
PowerMock.replayAll();
- worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore);
+ worker = new Worker(WORKER_ID, new MockTime(), plugins, config, offsetBackingStore, executorService);
worker.start();
assertStatistics(worker, 0, 0);
assertEquals(Collections.emptySet(), worker.taskIds());