You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2016/12/01 22:42:30 UTC
kafka git commit: KAFKA-3008: Parallel start and stop of connectors
and tasks in Connect
Repository: kafka
Updated Branches:
refs/heads/trunk 88d8508b8 -> b65f9a777
KAFKA-3008: Parallel start and stop of connectors and tasks in Connect
Author: Konstantine Karantasis <ko...@confluent.io>
Author: Konstantine Karantasis <k....@gmail.com>
Reviewers: Jason Gustafson <ja...@confluent.io>, Shikhar Bhushan <sh...@confluent.io>, Ewen Cheslack-Postava <ew...@confluent.io>
Closes #1788 from kkonstantine/KAFKA-3008-Parallel-start-and-stop-of-connectors-and-tasks
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b65f9a77
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b65f9a77
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b65f9a77
Branch: refs/heads/trunk
Commit: b65f9a777d46fbe4edfed8a4c7216dd1e741be53
Parents: 88d8508
Author: Konstantine Karantasis <ko...@confluent.io>
Authored: Thu Dec 1 14:42:15 2016 -0800
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Thu Dec 1 14:42:15 2016 -0800
----------------------------------------------------------------------
.../runtime/SourceTaskOffsetCommitter.java | 104 +++++-----
.../apache/kafka/connect/runtime/Worker.java | 173 ++++++++++++-----
.../kafka/connect/runtime/WorkerTask.java | 2 +-
.../runtime/distributed/DistributedHerder.java | 178 +++++++++++++----
.../runtime/SourceTaskOffsetCommitterTest.java | 194 +++++++++++++++++++
.../kafka/connect/runtime/WorkerTest.java | 14 --
.../distributed/DistributedHerderTest.java | 59 +++---
.../standalone/StandaloneHerderTest.java | 12 +-
8 files changed, 539 insertions(+), 197 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/b65f9a77/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitter.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitter.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitter.java
index c7f869e..acc2d0d 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitter.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitter.java
@@ -22,8 +22,10 @@ import org.apache.kafka.connect.util.ConnectorTaskId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.HashMap;
-import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
@@ -44,13 +46,22 @@ import java.util.concurrent.TimeUnit;
class SourceTaskOffsetCommitter {
private static final Logger log = LoggerFactory.getLogger(SourceTaskOffsetCommitter.class);
- private WorkerConfig config;
- private ScheduledExecutorService commitExecutorService = null;
- private final HashMap<ConnectorTaskId, ScheduledCommitTask> committers = new HashMap<>();
+ private final WorkerConfig config;
+ private final ScheduledExecutorService commitExecutorService;
+ private final ConcurrentMap<ConnectorTaskId, ScheduledFuture<?>> committers;
- SourceTaskOffsetCommitter(WorkerConfig config) {
+ // visible for testing
+ SourceTaskOffsetCommitter(WorkerConfig config,
+ ScheduledExecutorService commitExecutorService,
+ ConcurrentMap<ConnectorTaskId, ScheduledFuture<?>> committers) {
this.config = config;
- commitExecutorService = Executors.newSingleThreadScheduledExecutor();
+ this.commitExecutorService = commitExecutorService;
+ this.committers = committers;
+ }
+
+ public SourceTaskOffsetCommitter(WorkerConfig config) {
+ this(config, Executors.newSingleThreadScheduledExecutor(),
+ new ConcurrentHashMap<ConnectorTaskId, ScheduledFuture<?>>());
}
public void close(long timeoutMs) {
@@ -65,72 +76,45 @@ class SourceTaskOffsetCommitter {
}
public void schedule(final ConnectorTaskId id, final WorkerSourceTask workerTask) {
- synchronized (committers) {
- long commitIntervalMs = config.getLong(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG);
- ScheduledFuture<?> commitFuture = commitExecutorService.schedule(new Runnable() {
- @Override
- public void run() {
- commit(id, workerTask);
- }
- }, commitIntervalMs, TimeUnit.MILLISECONDS);
- committers.put(id, new ScheduledCommitTask(commitFuture));
- }
+ long commitIntervalMs = config.getLong(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG);
+ ScheduledFuture<?> commitFuture = commitExecutorService.scheduleWithFixedDelay(new Runnable() {
+ @Override
+ public void run() {
+ commit(workerTask);
+ }
+ }, commitIntervalMs, commitIntervalMs, TimeUnit.MILLISECONDS);
+ committers.put(id, commitFuture);
}
public void remove(ConnectorTaskId id) {
- final ScheduledCommitTask task;
- synchronized (committers) {
- task = committers.remove(id);
- task.cancelled = true;
- task.commitFuture.cancel(false);
- }
- if (task.finishedLatch != null) {
- try {
- task.finishedLatch.await();
- } catch (InterruptedException e) {
- throw new ConnectException("Unexpected interruption in SourceTaskOffsetCommitter.", e);
- }
- }
- }
+ final ScheduledFuture<?> task = committers.remove(id);
+ if (task == null)
+ return;
- private void commit(ConnectorTaskId id, WorkerSourceTask workerTask) {
- final ScheduledCommitTask task;
- synchronized (committers) {
- task = committers.get(id);
- if (task == null || task.cancelled)
- return;
- task.finishedLatch = new CountDownLatch(1);
+ try {
+ task.cancel(false);
+ if (!task.isDone())
+ task.get();
+ } catch (CancellationException e) {
+ // ignore
+ log.trace("Offset commit thread was cancelled by another thread while removing connector task with id: {}", id);
+ } catch (ExecutionException | InterruptedException e) {
+ throw new ConnectException("Unexpected interruption in SourceTaskOffsetCommitter while removing task with id: " + id, e);
}
+ }
+ private void commit(WorkerSourceTask workerTask) {
+ log.debug("Committing offsets for {}", workerTask);
try {
- log.debug("Committing offsets for {}", workerTask);
- boolean success = workerTask.commitOffsets();
- if (!success) {
- log.error("Failed to commit offsets for {}", workerTask);
+ if (workerTask.commitOffsets()) {
+ return;
}
+ log.error("Failed to commit offsets for {}", workerTask);
} catch (Throwable t) {
// We're very careful about exceptions here since any uncaught exceptions in the commit
// thread would cause the fixed interval schedule on the ExecutorService to stop running
// for that task
log.error("Unhandled exception when committing {}: ", workerTask, t);
- } finally {
- synchronized (committers) {
- task.finishedLatch.countDown();
- if (!task.cancelled)
- schedule(id, workerTask);
- }
- }
- }
-
- private static class ScheduledCommitTask {
- ScheduledFuture<?> commitFuture;
- boolean cancelled;
- CountDownLatch finishedLatch;
-
- ScheduledCommitTask(ScheduledFuture<?> commitFuture) {
- this.commitFuture = commitFuture;
- this.cancelled = false;
- this.finishedLatch = null;
}
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/b65f9a77/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index 1265f9e..c575d92 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -40,10 +40,11 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -72,8 +73,8 @@ public class Worker {
private final OffsetBackingStore offsetBackingStore;
private final Map<String, Object> producerProps;
- private HashMap<String, WorkerConnector> connectors = new HashMap<>();
- private HashMap<ConnectorTaskId, WorkerTask> tasks = new HashMap<>();
+ private final ConcurrentMap<String, WorkerConnector> connectors = new ConcurrentHashMap<>();
+ private final ConcurrentMap<ConnectorTaskId, WorkerTask> tasks = new ConcurrentHashMap<>();
private SourceTaskOffsetCommitter sourceTaskOffsetCommitter;
public Worker(String workerId, Time time, ConnectorFactory connectorFactory, WorkerConfig config, OffsetBackingStore offsetBackingStore) {
@@ -109,6 +110,9 @@ public class Worker {
producerProps.putAll(config.originalsWithPrefix("producer."));
}
+ /**
+ * Start worker.
+ */
public void start() {
log.info("Worker starting");
@@ -118,6 +122,9 @@ public class Worker {
log.info("Worker started");
}
+ /**
+ * Stop worker.
+ */
public void stop() {
log.info("Worker stopping");
@@ -142,6 +149,16 @@ public class Worker {
log.info("Worker stopped");
}
+ /**
+ * Start a connector managed by this worker.
+ *
+ * @param connName the connector name.
+ * @param connProps the properties of the connector.
+ * @param ctx the connector runtime context.
+ * @param statusListener a listener for the runtime status transitions of the connector.
+ * @param initialState the initial state of the connector.
+ * @return true if the connector started successfully.
+ */
public boolean startConnector(
String connName,
Map<String, String> connProps,
@@ -168,18 +185,36 @@ public class Worker {
return false;
}
- connectors.put(connName, workerConnector);
+ WorkerConnector existing = connectors.putIfAbsent(connName, workerConnector);
+ if (existing != null)
+ throw new ConnectException("Connector with name " + connName + " already exists");
log.info("Finished creating connector {}", connName);
return true;
}
- /* Now that the configuration doesn't contain the actual class name, we need to be able to tell the herder whether a connector is a Sink */
+ /**
+ * Return true if the connector associated with this worker is a sink connector.
+ *
+ * @param connName the connector name.
+ * @return true if the connector belongs to the worker and is a sink connector.
+ * @throws ConnectException if the worker does not manage a connector with the given name.
+ */
public boolean isSinkConnector(String connName) {
WorkerConnector workerConnector = connectors.get(connName);
+ if (workerConnector == null)
+ throw new ConnectException("Connector " + connName + " not found in this worker.");
return workerConnector.isSinkConnector();
}
+ /**
+ * Get a list of updated task properties for the tasks of this connector.
+ *
+ * @param connName the connector name.
+ * @param maxTasks the maximum number of tasks.
+ * @param sinkTopics a list of sink topics.
+ * @return a list of updated tasks properties.
+ */
public List<Map<String, String>> connectorTaskConfigs(String connName, int maxTasks, List<String> sinkTopics) {
log.trace("Reconfiguring connector tasks for {}", connName);
@@ -200,31 +235,29 @@ public class Worker {
return result;
}
- public void stopConnectors() {
- stopConnectors(new HashSet<>(connectors.keySet()));
- }
-
- public Collection<String> stopConnectors(Collection<String> connectors) {
- final List<String> stopped = new ArrayList<>(connectors.size());
- for (String connector: connectors) {
- if (stopConnector(connector)) {
- stopped.add(connector);
- }
- }
- return stopped;
+ private void stopConnectors() {
+ // Herder is responsible for stopping connectors. This is an internal method to sequentially
+ // stop connectors that have not explicitly been stopped.
+ for (String connector: connectors.keySet())
+ stopConnector(connector);
}
+ /**
+ * Stop a connector managed by this worker.
+ *
+ * @param connName the connector name.
+ * @return true if the connector belonged to this worker and was successfully stopped.
+ */
public boolean stopConnector(String connName) {
log.info("Stopping connector {}", connName);
- WorkerConnector connector = connectors.get(connName);
+ WorkerConnector connector = connectors.remove(connName);
if (connector == null) {
log.warn("Ignoring stop request for unowned connector {}", connName);
return false;
}
connector.shutdown();
- connectors.remove(connName);
log.info("Stopped connector {}", connName);
return true;
@@ -232,16 +265,34 @@ public class Worker {
/**
* Get the IDs of the connectors currently running in this worker.
+ *
+ * @return the set of connector IDs.
*/
public Set<String> connectorNames() {
return connectors.keySet();
}
+ /**
+ * Return true if a connector with the given name is managed by this worker and is currently running.
+ *
+ * @param connName the connector name.
+ * @return true if the connector is running, false if the connector is not running or is not managed by this worker.
+ */
public boolean isRunning(String connName) {
WorkerConnector connector = connectors.get(connName);
return connector != null && connector.isRunning();
}
+ /**
+ * Start a task managed by this worker.
+ *
+ * @param id the task ID.
+ * @param connProps the connector properties.
+ * @param taskProps the tasks properties.
+ * @param statusListener a listener for the runtime status transitions of the task.
+ * @param initialState the initial state of the connector.
+ * @return true if the task started successfully.
+ */
public boolean startTask(
ConnectorTaskId id,
Map<String, String> connProps,
@@ -282,11 +333,14 @@ public class Worker {
return false;
}
+ WorkerTask existing = tasks.putIfAbsent(id, workerTask);
+ if (existing != null)
+ throw new ConnectException("Task already exists in this worker: " + id);
+
executor.submit(workerTask);
if (workerTask instanceof WorkerSourceTask) {
sourceTaskOffsetCommitter.schedule(id, (WorkerSourceTask) workerTask);
}
- tasks.put(id, workerTask);
return true;
}
@@ -314,34 +368,38 @@ public class Worker {
}
}
- public boolean stopAndAwaitTask(ConnectorTaskId id) {
- return !stopAndAwaitTasks(Collections.singleton(id)).isEmpty();
- }
+ private void stopTask(ConnectorTaskId taskId) {
+ WorkerTask task = tasks.get(taskId);
+ if (task == null) {
+ log.warn("Ignoring stop request for unowned task {}", taskId);
+ return;
+ }
- public void stopAndAwaitTasks() {
- stopAndAwaitTasks(new HashSet<>(tasks.keySet()));
+ log.info("Stopping task {}", task.id());
+ if (task instanceof WorkerSourceTask)
+ sourceTaskOffsetCommitter.remove(task.id());
+ task.stop();
}
- public Collection<ConnectorTaskId> stopAndAwaitTasks(Collection<ConnectorTaskId> ids) {
- final List<ConnectorTaskId> stoppable = new ArrayList<>(ids.size());
+ private void stopTasks(Collection<ConnectorTaskId> ids) {
+ // Herder is responsible for stopping tasks. This is an internal method to sequentially
+ // stop the tasks that have not explicitly been stopped.
for (ConnectorTaskId taskId : ids) {
- final WorkerTask task = tasks.get(taskId);
- if (task == null) {
- log.warn("Ignoring stop request for unowned task {}", taskId);
- continue;
- }
- stopTask(task);
- stoppable.add(taskId);
+ stopTask(taskId);
}
- awaitStopTasks(stoppable);
- return stoppable;
}
- private void stopTask(WorkerTask task) {
- log.info("Stopping task {}", task.id());
- if (task instanceof WorkerSourceTask)
- sourceTaskOffsetCommitter.remove(task.id());
- task.stop();
+ private void awaitStopTask(ConnectorTaskId taskId, long timeout) {
+ WorkerTask task = tasks.remove(taskId);
+ if (task == null) {
+ log.warn("Ignoring await stop request for non-present task {}", taskId);
+ return;
+ }
+
+ if (!task.awaitStop(timeout)) {
+ log.error("Graceful stop of task {} failed.", task.id());
+ task.cancel();
+ }
}
private void awaitStopTasks(Collection<ConnectorTaskId> ids) {
@@ -349,16 +407,35 @@ public class Worker {
long deadline = now + config.getLong(WorkerConfig.TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG);
for (ConnectorTaskId id : ids) {
long remaining = Math.max(0, deadline - time.milliseconds());
- awaitStopTask(tasks.get(id), remaining);
+ awaitStopTask(id, remaining);
}
}
- private void awaitStopTask(WorkerTask task, long timeout) {
- if (!task.awaitStop(timeout)) {
- log.error("Graceful stop of task {} failed.", task.id());
- task.cancel();
- }
- tasks.remove(task.id());
+ /**
+ * Stop asynchronously all the worker's tasks and await their termination.
+ */
+ public void stopAndAwaitTasks() {
+ stopAndAwaitTasks(new ArrayList<>(tasks.keySet()));
+ }
+
+ /**
+ * Stop asynchronously a collection of tasks that belong to this worker and await their termination.
+ *
+ * @param ids the collection of tasks to be stopped.
+ */
+ public void stopAndAwaitTasks(Collection<ConnectorTaskId> ids) {
+ stopTasks(ids);
+ awaitStopTasks(ids);
+ }
+
+ /**
+ * Stop a task that belongs to this worker and await its termination.
+ *
+ * @param taskId the ID of the task to be stopped.
+ */
+ public void stopAndAwaitTask(ConnectorTaskId taskId) {
+ stopTask(taskId);
+ awaitStopTasks(Collections.singletonList(taskId));
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/b65f9a77/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
index 846ca95..2f2ebb5 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
@@ -64,7 +64,7 @@ abstract class WorkerTask implements Runnable {
/**
* Initialize the task for execution.
- * @param props initial configuration
+ * @param taskConfig initial configuration
*/
public abstract void initialize(TaskConfig taskConfig);
http://git-wip-us.apache.org/repos/asf/kafka/blob/b65f9a77/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index 170c983..ce2e72a 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -54,10 +54,11 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.PriorityQueue;
-import java.util.Queue;
+import java.util.NavigableSet;
+import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -98,6 +99,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
private static final Logger log = LoggerFactory.getLogger(DistributedHerder.class);
private static final long RECONFIGURE_CONNECTOR_TASKS_BACKOFF_MS = 250;
+ private static final int START_STOP_THREAD_POOL_SIZE = 8;
private final Time time;
@@ -106,6 +108,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
private final int workerUnsyncBackoffMs;
private final ExecutorService forwardRequestExecutor;
+ private final ExecutorService startAndStopExecutor;
private final WorkerGroupMember member;
private final AtomicBoolean stopping;
private final CountDownLatch stopLatch = new CountDownLatch(1);
@@ -119,7 +122,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
// To handle most external requests, like creating or destroying a connector, we can use a generic request where
// the caller specifies all the code that should be executed.
- private final Queue<HerderRequest> requests = new PriorityQueue<>();
+ private final NavigableSet<HerderRequest> requests = new ConcurrentSkipListSet<>();
// Config updates can be collected and applied together when possible. Also, we need to take care to rebalance when
// needed (e.g. task reconfiguration, which requires everyone to coordinate offset commits).
private Set<String> connectorConfigUpdates = new HashSet<>();
@@ -144,11 +147,11 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
Worker worker,
String workerId,
StatusBackingStore statusBackingStore,
- ConfigBackingStore configStorage,
+ ConfigBackingStore configBackingStore,
WorkerGroupMember member,
String restUrl,
Time time) {
- super(worker, workerId, statusBackingStore, configStorage);
+ super(worker, workerId, statusBackingStore, configBackingStore);
this.time = time;
this.workerGroupId = config.getString(DistributedConfig.GROUP_ID_CONFIG);
@@ -156,6 +159,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
this.workerUnsyncBackoffMs = config.getInt(DistributedConfig.WORKER_UNSYNC_BACKOFF_MS_CONFIG);
this.member = member != null ? member : new WorkerGroupMember(config, restUrl, this.configBackingStore, new RebalanceListener(), time);
this.forwardRequestExecutor = Executors.newSingleThreadExecutor();
+ this.startAndStopExecutor = Executors.newFixedThreadPool(START_STOP_THREAD_POOL_SIZE);
stopping = new AtomicBoolean(false);
configState = ClusterConfigState.EMPTY;
@@ -222,17 +226,14 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
final long now = time.milliseconds();
long nextRequestTimeoutMs = Long.MAX_VALUE;
while (true) {
- final HerderRequest next;
- synchronized (this) {
- next = requests.peek();
- if (next == null) {
- break;
- } else if (now >= next.at) {
- requests.poll();
- } else {
- nextRequestTimeoutMs = next.at - now;
- break;
- }
+ final HerderRequest next = peekWithoutException();
+ if (next == null) {
+ break;
+ } else if (now >= next.at) {
+ requests.pollFirst();
+ } else {
+ nextRequestTimeoutMs = next.at - now;
+ break;
}
try {
@@ -292,7 +293,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
try {
member.poll(nextRequestTimeoutMs);
// Ensure we're in a good state in our group. If not restart and everything should be setup to rejoin
- if (!handleRebalanceCompleted()) return;
+ handleRebalanceCompleted();
} catch (WakeupException e) { // FIXME should not be WakeupException
// Ignore. Just indicates we need to check the exit flag, for requested actions, etc.
}
@@ -338,17 +339,24 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
public void halt() {
synchronized (this) {
// Clean up any connectors and tasks that are still running.
- log.info("Stopping connectors and tasks that are still assigned to the worker");
- worker.stopConnectors();
- worker.stopAndAwaitTasks();
+ log.info("Stopping connectors and tasks that are still assigned to this worker.");
+ List<Callable<Void>> callables = new ArrayList<>();
+ for (String connectorName : new ArrayList<>(worker.connectorNames())) {
+ callables.add(getConnectorStoppingCallable(connectorName));
+ }
+ for (ConnectorTaskId taskId : new ArrayList<>(worker.taskIds())) {
+ callables.add(getTaskStoppingCallable(taskId));
+ }
+ startAndStop(callables);
member.stop();
- // Explicitly fail any outstanding requests so they actually get a response and get an understandable reason
- // for their failure
- while (!requests.isEmpty()) {
- HerderRequest request = requests.poll();
+ // Explicitly fail any outstanding requests so they actually get a response and get an
+ // understandable reason for their failure.
+ HerderRequest request = requests.pollFirst();
+ while (request != null) {
request.callback().onCompletion(new ConnectException("Worker is shutting down"), null);
+ request = requests.pollFirst();
}
stopServices();
@@ -370,9 +378,12 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
}
forwardRequestExecutor.shutdown();
+ startAndStopExecutor.shutdown();
try {
if (!forwardRequestExecutor.awaitTermination(10000, TimeUnit.MILLISECONDS))
forwardRequestExecutor.shutdownNow();
+ if (!startAndStopExecutor.awaitTermination(1000, TimeUnit.MILLISECONDS))
+ startAndStopExecutor.shutdownNow();
} catch (InterruptedException e) {
// ignore
}
@@ -381,7 +392,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
}
@Override
- public synchronized void connectors(final Callback<Collection<String>> callback) {
+ public void connectors(final Callback<Collection<String>> callback) {
log.trace("Submitting connector listing request");
addRequest(
@@ -400,7 +411,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
}
@Override
- public synchronized void connectorInfo(final String connName, final Callback<ConnectorInfo> callback) {
+ public void connectorInfo(final String connName, final Callback<ConnectorInfo> callback) {
log.trace("Submitting connector info request {}", connName);
addRequest(
@@ -523,7 +534,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
}
@Override
- public synchronized void requestTaskReconfiguration(final String connName) {
+ public void requestTaskReconfiguration(final String connName) {
log.trace("Submitting connector task reconfiguration request {}", connName);
addRequest(
@@ -547,7 +558,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
}
@Override
- public synchronized void taskConfigs(final String connName, final Callback<List<TaskInfo>> callback) {
+ public void taskConfigs(final String connName, final Callback<List<TaskInfo>> callback) {
log.trace("Submitting get task configuration request {}", connName);
addRequest(
@@ -575,7 +586,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
}
@Override
- public synchronized void putTaskConfigs(final String connName, final List<Map<String, String>> configs, final Callback<Void> callback) {
+ public void putTaskConfigs(final String connName, final List<Map<String, String>> configs, final Callback<Void> callback) {
log.trace("Submitting put task configuration request {}", connName);
addRequest(
@@ -598,7 +609,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
}
@Override
- public synchronized void restartConnector(final String connName, final Callback<Void> callback) {
+ public void restartConnector(final String connName, final Callback<Void> callback) {
addRequest(new Callable<Void>() {
@Override
public Void call() throws Exception {
@@ -631,7 +642,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
}
@Override
- public synchronized void restartTask(final ConnectorTaskId id, final Callback<Void> callback) {
+ public void restartTask(final ConnectorTaskId id, final Callback<Void> callback) {
addRequest(new Callable<Void>() {
@Override
public Void call() throws Exception {
@@ -788,15 +799,26 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
Utils.sleep(ms);
}
+ private void startAndStop(Collection<Callable<Void>> callables) {
+ try {
+ startAndStopExecutor.invokeAll(callables);
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ }
+
private void startWork() {
// Start assigned connectors and tasks
log.info("Starting connectors and tasks using config offset {}", assignment.offset());
+ List<Callable<Void>> callables = new ArrayList<>();
for (String connectorName : assignment.connectors()) {
- startConnector(connectorName);
+ callables.add(getConnectorStartingCallable(connectorName));
}
+
for (ConnectorTaskId taskId : assignment.tasks()) {
- startTask(taskId);
+ callables.add(getTaskStartingCallable(taskId));
}
+ startAndStop(callables);
log.info("Finished starting connectors and tasks");
}
@@ -811,12 +833,38 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
);
}
+ private Callable<Void> getTaskStartingCallable(final ConnectorTaskId taskId) {
+ return new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ try {
+ startTask(taskId);
+ } catch (Throwable t) {
+ log.error("Couldn't instantiate task {} because it has an invalid task configuration. This task will not execute until reconfigured.",
+ taskId, t);
+ onFailure(taskId, t);
+ }
+ return null;
+ }
+ };
+ }
+
+ private Callable<Void> getTaskStoppingCallable(final ConnectorTaskId taskId) {
+ return new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ worker.stopAndAwaitTask(taskId);
+ return null;
+ }
+ };
+ }
+
// Helper for starting a connector with the given name, which will extract & parse the config, generate connector
// context and add to the worker. This needs to be called from within the main worker thread for this herder.
private boolean startConnector(String connectorName) {
log.info("Starting connector {}", connectorName);
final Map<String, String> configProps = configState.connectorConfig(connectorName);
- final ConnectorContext ctx = new HerderConnectorContext(DistributedHerder.this, connectorName);
+ final ConnectorContext ctx = new HerderConnectorContext(this, connectorName);
final TargetState initialState = configState.targetState(connectorName);
boolean started = worker.startConnector(connectorName, configProps, ctx, this, initialState);
@@ -829,6 +877,36 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
return started;
}
+ private Callable<Void> getConnectorStartingCallable(final String connectorName) {
+ return new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ try {
+ startConnector(connectorName);
+ } catch (Throwable t) {
+ log.error("Couldn't instantiate connector " + connectorName + " because it has an invalid connector " +
+ "configuration. This connector will not execute until reconfigured.", t);
+ onFailure(connectorName, t);
+ }
+ return null;
+ }
+ };
+ }
+
+ private Callable<Void> getConnectorStoppingCallable(final String connectorName) {
+ return new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ try {
+ worker.stopConnector(connectorName);
+ } catch (Throwable t) {
+ log.error("Failed to shut down connector " + connectorName, t);
+ }
+ return null;
+ }
+ };
+ }
+
private void reconfigureConnectorTasksWithRetry(final String connName) {
reconfigureConnector(connName, new Callback<Void>() {
@Override
@@ -941,10 +1019,20 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
private void addRequest(long delayMs, Callable<Void> action, Callback<Void> callback) {
HerderRequest req = new HerderRequest(time.milliseconds() + delayMs, action, callback);
requests.add(req);
- if (requests.peek() == req)
+ if (peekWithoutException() == req)
member.wakeup();
}
+ private HerderRequest peekWithoutException() {
+ try {
+ return requests.isEmpty() ? null : requests.first();
+ } catch (NoSuchElementException e) {
+ // Ignore exception. Should be rare. Means that the collection became empty between
+ // the isEmpty() check and retrieving the first element.
+ }
+ return null;
+ }
+
public class ConfigUpdateListener implements ConfigBackingStore.UpdateListener {
@Override
public void onConnectorConfigRemove(String connector) {
@@ -999,7 +1087,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
}
}
- private class HerderRequest implements Comparable<HerderRequest> {
+ private static class HerderRequest implements Comparable<HerderRequest> {
private final long at;
private final Callable<Void> action;
private final Callback<Void> callback;
@@ -1020,7 +1108,9 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
+ // Orders requests by their scheduled time `at`, earliest first.
@Override
public int compareTo(HerderRequest o) {
- return Long.compare(at, o.at);
+ final int soonest = Long.compare(at, o.at);
+ // If tied, returning a positive value should respect insertion order.
+ // NOTE(review): with this tie-break x.compareTo(x) is 1, and for two requests with the same `at`
+ // both x.compareTo(y) and y.compareTo(x) are positive. That is outside the Comparable contract
+ // (sgn(x.compareTo(y)) == -sgn(y.compareTo(x))). A comparison-based sorted collection may then be
+ // unable to locate an element it already holds (contains/remove) -- confirm how `requests` removes
+ // entries, and consider a per-request sequence number as a deterministic tie-breaker instead.
+ return soonest != 0 ? soonest : 1;
}
}
@@ -1081,19 +1171,25 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
// it is still important to have a leader that can write configs, offsets, etc.
if (rebalanceResolved) {
- // TODO: Parallelize this. We should be able to request all connectors and tasks to stop, then wait on all of
- // them to finish
// TODO: Technically we don't have to stop connectors at all until we know they've really been removed from
// this worker. Instead, we can let them continue to run but buffer any update requests (which should be
// rare anyway). This would avoid a steady stream of start/stop, which probably also includes lots of
// unnecessary repeated connections to the source/sink system.
- worker.stopConnectors(connectors);
+ List<Callable<Void>> callables = new ArrayList<>();
+ for (final String connectorName : connectors) {
+ callables.add(getConnectorStoppingCallable(connectorName));
+ }
// TODO: We need to at least commit task offsets, but if we could commit offsets & pause them instead of
// stopping them then state could continue to be reused when the task remains on this worker. For example,
// this would avoid having to close a connection and then reopen it when the task is assigned back to this
// worker again.
- worker.stopAndAwaitTasks(tasks);
+ for (final ConnectorTaskId taskId : tasks) {
+ callables.add(getTaskStoppingCallable(taskId));
+ }
+
+ // The actual timeout for graceful task stop is applied in worker's stopAndAwaitTask method.
+ startAndStop(callables);
// Ensure that all status updates have been pushed to the storage system before rebalancing.
// Otherwise, we may inadvertently overwrite the state with a stale value after the rebalance
http://git-wip-us.apache.org/repos/asf/kafka/blob/b65f9a77/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitterTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitterTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitterTest.java
new file mode 100644
index 0000000..45125cc
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitterTest.java
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
+import org.apache.kafka.connect.util.ConnectorTaskId;
+import org.apache.kafka.connect.util.ThreadedTest;
+import org.easymock.Capture;
+import org.easymock.EasyMock;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.api.easymock.annotation.Mock;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.powermock.reflect.Whitebox;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import static org.easymock.EasyMock.eq;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Unit tests for {@link SourceTaskOffsetCommitter}. The committer under test is built around a mocked
+ * {@link ScheduledExecutorService} and a mocked committers map, and the {@code log} field of
+ * {@code SourceTaskOffsetCommitter} is replaced with a mock so that logging calls can be verified.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({SourceTaskOffsetCommitter.class, LoggerFactory.class})
+public class SourceTaskOffsetCommitterTest extends ThreadedTest {
+ // Mocked executor handed to the committer; scheduling and shutdown calls are verified against it.
+ @Mock
+ private ScheduledExecutorService executor;
+ // Mocked map handed to the committer; put/remove calls are verified against it.
+ @Mock
+ private ConcurrentHashMap committers;
+ // Mocked logger injected into SourceTaskOffsetCommitter's "log" field in setup().
+ @Mock
+ private Logger mockLog;
+
+ // Instance under test, rebuilt in setup().
+ private SourceTaskOffsetCommitter committer;
+
+ // Value used for "offset.flush.interval.ms" in the worker config built by setup().
+ private static final long DEFAULT_OFFSET_COMMIT_INTERVAL_MS = 1000;
+
+ /**
+ * Builds a {@link StandaloneConfig} whose {@code offset.flush.interval.ms} equals
+ * {@link #DEFAULT_OFFSET_COMMIT_INTERVAL_MS}, creates the committer with the mocked executor and map,
+ * and swaps the committer class's {@code log} field for {@link #mockLog}.
+ */
+ @Override
+ public void setup() {
+ super.setup();
+ Map<String, String> workerProps = new HashMap<>();
+ workerProps.put("key.converter", "org.apache.kafka.connect.json.JsonConverter");
+ workerProps.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");
+ workerProps.put("internal.key.converter", "org.apache.kafka.connect.json.JsonConverter");
+ workerProps.put("internal.value.converter", "org.apache.kafka.connect.json.JsonConverter");
+ workerProps.put("internal.key.converter.schemas.enable", "false");
+ workerProps.put("internal.value.converter.schemas.enable", "false");
+ workerProps.put("offset.storage.file.filename", "/tmp/connect.offsets");
+ workerProps.put("offset.flush.interval.ms",
+ Long.toString(DEFAULT_OFFSET_COMMIT_INTERVAL_MS));
+ WorkerConfig config = new StandaloneConfig(workerProps);
+ committer = new SourceTaskOffsetCommitter(config, executor, committers);
+ Whitebox.setInternalState(SourceTaskOffsetCommitter.class, "log", mockLog);
+ }
+
+ /**
+ * Expects {@code schedule} to register a runnable with the executor via {@code scheduleWithFixedDelay},
+ * using the configured interval (in milliseconds) for both delay arguments, and to store the returned
+ * future in the committers map under the task id.
+ */
+ @Test
+ public void testSchedule() throws Exception {
+ Capture<Runnable> taskWrapper = EasyMock.newCapture();
+
+ ScheduledFuture commitFuture = PowerMock.createMock(ScheduledFuture.class);
+ EasyMock.expect(executor.scheduleWithFixedDelay(
+ EasyMock.capture(taskWrapper), eq(DEFAULT_OFFSET_COMMIT_INTERVAL_MS),
+ eq(DEFAULT_OFFSET_COMMIT_INTERVAL_MS), eq(TimeUnit.MILLISECONDS))
+ ).andReturn(commitFuture);
+
+ ConnectorTaskId taskId = PowerMock.createMock(ConnectorTaskId.class);
+ WorkerSourceTask task = PowerMock.createMock(WorkerSourceTask.class);
+
+ EasyMock.expect(committers.put(taskId, commitFuture)).andReturn(null);
+
+ PowerMock.replayAll();
+
+ committer.schedule(taskId, task);
+ // The runnable passed to the executor must have been captured and be non-null.
+ assertTrue(taskWrapper.hasCaptured());
+ assertNotNull(taskWrapper.getValue());
+
+ PowerMock.verifyAll();
+ }
+
+ /**
+ * Exercises {@code close} in two scenarios: {@code awaitTermination} returning {@code false}, in which
+ * case an error is expected to be logged, and {@code awaitTermination} throwing
+ * {@link InterruptedException}, in which case {@code close} is expected to return without throwing.
+ */
+ @Test
+ public void testClose() throws Exception {
+ long timeoutMs = 1000;
+
+ // Normal termination, where termination times out.
+ executor.shutdown();
+ PowerMock.expectLastCall();
+
+ EasyMock.expect(executor.awaitTermination(eq(timeoutMs), eq(TimeUnit.MILLISECONDS)))
+ .andReturn(false);
+ mockLog.error(EasyMock.anyString());
+ PowerMock.expectLastCall();
+ PowerMock.replayAll();
+
+ committer.close(timeoutMs);
+
+ PowerMock.verifyAll();
+ // Clear the recorded expectations before setting up the second scenario.
+ PowerMock.resetAll();
+
+ // Termination interrupted
+ executor.shutdown();
+ PowerMock.expectLastCall();
+
+ EasyMock.expect(executor.awaitTermination(eq(timeoutMs), eq(TimeUnit.MILLISECONDS)))
+ .andThrow(new InterruptedException());
+ PowerMock.replayAll();
+
+ committer.close(timeoutMs);
+
+ PowerMock.verifyAll();
+ }
+
+ /**
+ * Exercises {@code remove} in four scenarios: an unknown task id, a known task whose future completes
+ * normally, a future whose {@code get()} throws {@link CancellationException} (a trace message is
+ * expected), and a future whose {@code get()} throws {@link InterruptedException} (a
+ * {@link ConnectException} is expected from {@code remove}).
+ */
+ @Test
+ public void testRemove() throws Exception {
+ ConnectorTaskId taskId = PowerMock.createMock(ConnectorTaskId.class);
+ ScheduledFuture task = PowerMock.createMock(ScheduledFuture.class);
+
+ // Try to remove a non-existing task
+ // Only the map lookup is recorded; no expectations are set on the future in this scenario.
+ EasyMock.expect(committers.remove(taskId)).andReturn(null);
+ PowerMock.replayAll();
+
+ committer.remove(taskId);
+
+ PowerMock.verifyAll();
+ PowerMock.resetAll();
+
+ // Try to remove an existing task
+ // Expected calls on the future: cancel(false), isDone() and get().
+ EasyMock.expect(committers.remove(taskId)).andReturn(task);
+ EasyMock.expect(task.cancel(eq(false))).andReturn(false);
+ EasyMock.expect(task.isDone()).andReturn(false);
+ EasyMock.expect(task.get()).andReturn(null);
+ PowerMock.replayAll();
+
+ committer.remove(taskId);
+
+ PowerMock.verifyAll();
+ PowerMock.resetAll();
+
+ // Try to remove a cancelled task
+ EasyMock.expect(committers.remove(taskId)).andReturn(task);
+ EasyMock.expect(task.cancel(eq(false))).andReturn(false);
+ EasyMock.expect(task.isDone()).andReturn(false);
+ EasyMock.expect(task.get()).andThrow(new CancellationException());
+ // The cancellation is expected to surface only as a trace-level log call.
+ mockLog.trace(EasyMock.anyString(), EasyMock.anyObject());
+ PowerMock.expectLastCall();
+ PowerMock.replayAll();
+
+ committer.remove(taskId);
+
+ PowerMock.verifyAll();
+ PowerMock.resetAll();
+
+ // Try to remove an interrupted task
+ EasyMock.expect(committers.remove(taskId)).andReturn(task);
+ EasyMock.expect(task.cancel(eq(false))).andReturn(false);
+ EasyMock.expect(task.isDone()).andReturn(false);
+ EasyMock.expect(task.get()).andThrow(new InterruptedException());
+ PowerMock.replayAll();
+
+ try {
+ committer.remove(taskId);
+ fail("Expected ConnectException to be raised");
+ } catch (ConnectException e) {
+ //ignore
+ }
+
+ PowerMock.verifyAll();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/b65f9a77/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
index 97e29be..eac0520 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
@@ -425,20 +425,6 @@ public class WorkerTest extends ThreadedTest {
EasyMock.expectLastCall();
assertEquals(Collections.emptySet(), worker.taskIds());
-
- assertFalse(worker.stopAndAwaitTask(TASK_ID));
- }
-
- @Test
- public void testStopInvalidTask() {
- expectStartStorage();
-
- PowerMock.replayAll();
-
- worker = new Worker(WORKER_ID, new MockTime(), connectorFactory, config, offsetBackingStore);
- worker.start();
-
- assertFalse(worker.stopAndAwaitTask(TASK_ID));
}
@Test
http://git-wip-us.apache.org/repos/asf/kafka/blob/b65f9a77/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
index 1da4595..5be2044 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
@@ -41,7 +41,6 @@ import org.apache.kafka.connect.sink.SinkConnector;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.storage.ConfigBackingStore;
-import org.apache.kafka.connect.storage.KafkaConfigBackingStore;
import org.apache.kafka.connect.storage.StatusBackingStore;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.ConnectorTaskId;
@@ -145,7 +144,7 @@ public class DistributedHerderTest {
private static final String WORKER_ID = "localhost:8083";
- @Mock private KafkaConfigBackingStore configStorage;
+ @Mock private ConfigBackingStore configBackingStore;
@Mock private StatusBackingStore statusBackingStore;
@Mock private WorkerGroupMember member;
private MockTime time;
@@ -163,7 +162,7 @@ public class DistributedHerderTest {
time = new MockTime();
herder = PowerMock.createPartialMock(DistributedHerder.class, new String[]{"backoff", "updateDeletedConnectorStatus"},
- new DistributedConfig(HERDER_CONFIG), worker, WORKER_ID, statusBackingStore, configStorage, member, MEMBER_URL, time);
+ new DistributedConfig(HERDER_CONFIG), worker, WORKER_ID, statusBackingStore, configBackingStore, member, MEMBER_URL, time);
configUpdateListener = herder.new ConfigUpdateListener();
rebalanceListener = herder.new RebalanceListener();
@@ -274,13 +273,15 @@ public class DistributedHerderTest {
@Test
public void testHaltCleansUpWorker() {
- worker.stopConnectors();
- PowerMock.expectLastCall();
- worker.stopAndAwaitTasks();
+ EasyMock.expect(worker.connectorNames()).andReturn(Collections.singleton(CONN1));
+ worker.stopConnector(CONN1);
+ PowerMock.expectLastCall().andReturn(true);
+ EasyMock.expect(worker.taskIds()).andReturn(Collections.singleton(TASK1));
+ worker.stopAndAwaitTask(TASK1);
PowerMock.expectLastCall();
member.stop();
PowerMock.expectLastCall();
- configStorage.stop();
+ configBackingStore.stop();
PowerMock.expectLastCall();
statusBackingStore.stop();
PowerMock.expectLastCall();
@@ -312,7 +313,7 @@ public class DistributedHerderTest {
EasyMock.expect(connectorMock.validate(CONN2_CONFIG)).andReturn(new Config(Collections.<ConfigValue>emptyList()));
// CONN2 is new, should succeed
- configStorage.putConnectorConfig(CONN2, CONN2_CONFIG);
+ configBackingStore.putConnectorConfig(CONN2, CONN2_CONFIG);
PowerMock.expectLastCall();
ConnectorInfo info = new ConnectorInfo(CONN2, CONN2_CONFIG, Collections.<ConnectorTaskId>emptyList());
putConnectorCallback.onCompletion(null, new Herder.Created<>(true, info));
@@ -494,7 +495,7 @@ public class DistributedHerderTest {
// And delete the connector
member.wakeup();
PowerMock.expectLastCall();
- configStorage.removeConnectorConfig(CONN1);
+ configBackingStore.removeConnectorConfig(CONN1);
PowerMock.expectLastCall();
putConnectorCallback.onCompletion(null, new Herder.Created<ConnectorInfo>(false, null));
PowerMock.expectLastCall();
@@ -681,7 +682,7 @@ public class DistributedHerderTest {
PowerMock.expectLastCall();
worker.stopAndAwaitTask(TASK0);
- PowerMock.expectLastCall().andReturn(true);
+ PowerMock.expectLastCall();
worker.startTask(EasyMock.eq(TASK0), EasyMock.<Map<String, String>>anyObject(), EasyMock.<Map<String, String>>anyObject(),
EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
PowerMock.expectLastCall().andReturn(true);
@@ -817,7 +818,7 @@ public class DistributedHerderTest {
member.ensureActive();
PowerMock.expectLastCall();
// Checks for config updates and starts rebalance
- EasyMock.expect(configStorage.snapshot()).andReturn(SNAPSHOT);
+ EasyMock.expect(configBackingStore.snapshot()).andReturn(SNAPSHOT);
member.requestRejoin();
PowerMock.expectLastCall();
// Performs rebalance and gets new assignment
@@ -863,7 +864,7 @@ public class DistributedHerderTest {
member.wakeup();
member.ensureActive();
PowerMock.expectLastCall();
- EasyMock.expect(configStorage.snapshot()).andReturn(SNAPSHOT); // for this test, it doesn't matter if we use the same config snapshot
+ EasyMock.expect(configBackingStore.snapshot()).andReturn(SNAPSHOT); // for this test, it doesn't matter if we use the same config snapshot
worker.stopConnector(CONN1);
PowerMock.expectLastCall().andReturn(true);
worker.startConnector(EasyMock.eq(CONN1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<ConnectorContext>anyObject(),
@@ -906,7 +907,7 @@ public class DistributedHerderTest {
member.ensureActive();
PowerMock.expectLastCall();
- EasyMock.expect(configStorage.snapshot()).andReturn(SNAPSHOT_PAUSED_CONN1);
+ EasyMock.expect(configBackingStore.snapshot()).andReturn(SNAPSHOT_PAUSED_CONN1);
PowerMock.expectLastCall();
worker.setTargetState(CONN1, TargetState.PAUSED);
@@ -944,7 +945,7 @@ public class DistributedHerderTest {
member.ensureActive();
PowerMock.expectLastCall();
- EasyMock.expect(configStorage.snapshot()).andReturn(SNAPSHOT);
+ EasyMock.expect(configBackingStore.snapshot()).andReturn(SNAPSHOT);
PowerMock.expectLastCall();
// we expect reconfiguration after resuming
@@ -985,7 +986,7 @@ public class DistributedHerderTest {
member.ensureActive();
PowerMock.expectLastCall();
- EasyMock.expect(configStorage.snapshot()).andReturn(SNAPSHOT);
+ EasyMock.expect(configBackingStore.snapshot()).andReturn(SNAPSHOT);
PowerMock.expectLastCall();
member.poll(EasyMock.anyInt());
@@ -1022,7 +1023,7 @@ public class DistributedHerderTest {
member.ensureActive();
PowerMock.expectLastCall();
- EasyMock.expect(configStorage.snapshot()).andReturn(SNAPSHOT_PAUSED_CONN1);
+ EasyMock.expect(configBackingStore.snapshot()).andReturn(SNAPSHOT_PAUSED_CONN1);
PowerMock.expectLastCall();
worker.setTargetState(CONN1, TargetState.PAUSED);
@@ -1062,7 +1063,7 @@ public class DistributedHerderTest {
member.ensureActive();
PowerMock.expectLastCall();
- EasyMock.expect(configStorage.snapshot()).andReturn(SNAPSHOT);
+ EasyMock.expect(configBackingStore.snapshot()).andReturn(SNAPSHOT);
PowerMock.expectLastCall();
worker.setTargetState(CONN1, TargetState.STARTED);
@@ -1097,7 +1098,7 @@ public class DistributedHerderTest {
member.ensureActive();
PowerMock.expectLastCall();
// Checks for config updates and starts rebalance
- EasyMock.expect(configStorage.snapshot()).andReturn(SNAPSHOT);
+ EasyMock.expect(configBackingStore.snapshot()).andReturn(SNAPSHOT);
member.requestRejoin();
PowerMock.expectLastCall();
// Performs rebalance and gets new assignment
@@ -1128,7 +1129,7 @@ public class DistributedHerderTest {
ConnectProtocol.Assignment.CONFIG_MISMATCH, 1, Collections.<String>emptyList(),
Collections.<ConnectorTaskId>emptyList());
// Reading to end of log times out
- configStorage.refresh(EasyMock.anyLong(), EasyMock.anyObject(TimeUnit.class));
+ configBackingStore.refresh(EasyMock.anyLong(), EasyMock.anyObject(TimeUnit.class));
EasyMock.expectLastCall().andThrow(new TimeoutException());
member.maybeLeaveGroup();
EasyMock.expectLastCall();
@@ -1230,7 +1231,7 @@ public class DistributedHerderTest {
EasyMock.expect(connectorMock.config()).andReturn(new ConfigDef());
EasyMock.expect(connectorMock.validate(CONN1_CONFIG_UPDATED)).andReturn(new Config(Collections.<ConfigValue>emptyList()));
- configStorage.putConnectorConfig(CONN1, CONN1_CONFIG_UPDATED);
+ configBackingStore.putConnectorConfig(CONN1, CONN1_CONFIG_UPDATED);
PowerMock.expectLastCall().andAnswer(new IAnswer<Object>() {
@Override
public Object answer() throws Throwable {
@@ -1241,7 +1242,7 @@ public class DistributedHerderTest {
});
// As a result of reconfig, should need to update snapshot. With only connector updates, we'll just restart
// connector without rebalance
- EasyMock.expect(configStorage.snapshot()).andReturn(SNAPSHOT_UPDATED_CONN1_CONFIG);
+ EasyMock.expect(configBackingStore.snapshot()).andReturn(SNAPSHOT_UPDATED_CONN1_CONFIG);
worker.stopConnector(CONN1);
PowerMock.expectLastCall().andReturn(true);
worker.startConnector(EasyMock.eq(CONN1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<ConnectorContext>anyObject(),
@@ -1318,13 +1319,15 @@ public class DistributedHerderTest {
});
if (revokedConnectors != null) {
- worker.stopConnectors(revokedConnectors);
- PowerMock.expectLastCall().andReturn(revokedConnectors);
+ for (String connector : revokedConnectors) {
+ worker.stopConnector(connector);
+ PowerMock.expectLastCall().andReturn(true);
+ }
}
- if (revokedTasks != null) {
- worker.stopAndAwaitTasks(revokedTasks);
- PowerMock.expectLastCall().andReturn(revokedTasks);
+ if (revokedTasks != null && !revokedTasks.isEmpty()) {
+ worker.stopAndAwaitTask(EasyMock.anyObject(ConnectorTaskId.class));
+ PowerMock.expectLastCall();
}
if (revokedConnectors != null) {
@@ -1337,9 +1340,9 @@ public class DistributedHerderTest {
}
private void expectPostRebalanceCatchup(final ClusterConfigState readToEndSnapshot) throws TimeoutException {
- configStorage.refresh(EasyMock.anyLong(), EasyMock.anyObject(TimeUnit.class));
+ configBackingStore.refresh(EasyMock.anyLong(), EasyMock.anyObject(TimeUnit.class));
EasyMock.expectLastCall();
- EasyMock.expect(configStorage.snapshot()).andReturn(readToEndSnapshot);
+ EasyMock.expect(configBackingStore.snapshot()).andReturn(readToEndSnapshot);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/b65f9a77/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
index 0bc3d5c..010c0b2 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
@@ -66,7 +66,9 @@ import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import static java.util.Collections.singleton;
import static java.util.Collections.singletonList;
+import static java.util.Collections.singletonMap;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -296,7 +298,7 @@ public class StandaloneHerderTest {
expectConfigValidation(connectorConfig);
worker.stopAndAwaitTask(taskId);
- EasyMock.expectLastCall().andReturn(true);
+ EasyMock.expectLastCall();
worker.startTask(taskId, connectorConfig, taskConfig(SourceSink.SOURCE), herder, TargetState.STARTED);
EasyMock.expectLastCall().andReturn(true);
@@ -321,7 +323,7 @@ public class StandaloneHerderTest {
expectConfigValidation(connectorConfig);
worker.stopAndAwaitTask(taskId);
- EasyMock.expectLastCall().andReturn(true);
+ EasyMock.expectLastCall();
worker.startTask(taskId, connectorConfig, taskConfig(SourceSink.SOURCE), herder, TargetState.STARTED);
EasyMock.expectLastCall().andReturn(false);
@@ -391,7 +393,7 @@ public class StandaloneHerderTest {
expectConfigValidation(connConfig);
// Validate accessors with 1 connector
- listConnectorsCb.onCompletion(null, Collections.singleton(CONNECTOR_NAME));
+ listConnectorsCb.onCompletion(null, singleton(CONNECTOR_NAME));
EasyMock.expectLastCall();
ConnectorInfo connInfo = new ConnectorInfo(CONNECTOR_NAME, connConfig, Arrays.asList(new ConnectorTaskId(CONNECTOR_NAME, 0)));
connectorInfoCb.onCompletion(null, connInfo);
@@ -477,7 +479,7 @@ public class StandaloneHerderTest {
PowerMock.replayAll();
herder.putTaskConfigs(CONNECTOR_NAME,
- Arrays.asList(Collections.singletonMap("config", "value")),
+ Arrays.asList(singletonMap("config", "value")),
cb);
PowerMock.verifyAll();
@@ -513,7 +515,7 @@ public class StandaloneHerderTest {
private void expectStop() {
ConnectorTaskId task = new ConnectorTaskId(CONNECTOR_NAME, 0);
worker.stopAndAwaitTasks(singletonList(task));
- EasyMock.expectLastCall().andReturn(Collections.singleton(task));
+ EasyMock.expectLastCall();
worker.stopConnector(CONNECTOR_NAME);
EasyMock.expectLastCall().andReturn(true);
}