You are viewing a plain-text version of this content. The canonical link to it ("here") was not preserved in this copy.
Posted to commits@kafka.apache.org by rs...@apache.org on 2018/04/08 08:35:43 UTC
[kafka] branch trunk updated: KAFKA-6688. The Trogdor coordinator should track task statuses (#4737)
This is an automated email from the ASF dual-hosted git repository.
rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 40183e3 KAFKA-6688. The Trogdor coordinator should track task statuses (#4737)
40183e3 is described below
commit 40183e31567795d4d0f2b836294bc5d5fac2a56b
Author: Colin Patrick McCabe <co...@cmccabe.xyz>
AuthorDate: Sun Apr 8 01:35:33 2018 -0700
KAFKA-6688. The Trogdor coordinator should track task statuses (#4737)
Reviewers: Anna Povzner <an...@confluent.io>, Rajini Sivaram <ra...@googlemail.com>
---
.../apache/kafka/trogdor/agent/WorkerManager.java | 10 +-
.../kafka/trogdor/coordinator/NodeManager.java | 47 +++----
.../kafka/trogdor/coordinator/TaskManager.java | 147 +++++++++++++--------
.../kafka/trogdor/fault/KiboshFaultWorker.java | 13 +-
.../trogdor/fault/NetworkPartitionFaultWorker.java | 12 +-
.../trogdor/fault/ProcessStopFaultWorker.java | 12 +-
.../org/apache/kafka/trogdor/rest/TaskDone.java | 7 +-
.../org/apache/kafka/trogdor/rest/TaskPending.java | 3 +-
.../org/apache/kafka/trogdor/rest/TaskRunning.java | 6 +-
.../org/apache/kafka/trogdor/rest/TaskState.java | 12 +-
.../apache/kafka/trogdor/rest/TaskStopping.java | 6 +-
.../org/apache/kafka/trogdor/rest/WorkerDone.java | 10 +-
.../apache/kafka/trogdor/rest/WorkerReceiving.java | 7 +
.../apache/kafka/trogdor/rest/WorkerRunning.java | 10 +-
.../apache/kafka/trogdor/rest/WorkerStarting.java | 7 +
.../org/apache/kafka/trogdor/rest/WorkerState.java | 5 +-
.../apache/kafka/trogdor/rest/WorkerStopping.java | 10 +-
.../AgentWorkerStatusTracker.java} | 29 ++--
.../apache/kafka/trogdor/task/NoOpTaskWorker.java | 10 +-
.../org/apache/kafka/trogdor/task/TaskWorker.java | 6 +-
.../WorkerStatusTracker.java} | 20 +--
.../kafka/trogdor/workload/ProduceBenchWorker.java | 52 +++++---
.../kafka/trogdor/workload/RoundTripWorker.java | 4 +-
.../org/apache/kafka/trogdor/agent/AgentTest.java | 57 ++++----
.../trogdor/common/JsonSerializationTest.java | 2 +-
.../kafka/trogdor/coordinator/CoordinatorTest.java | 118 +++++++++++++++--
.../apache/kafka/trogdor/task/SampleTaskSpec.java | 15 ++-
.../kafka/trogdor/task/SampleTaskWorker.java | 15 ++-
.../apache/kafka/trogdor/task/TaskSpecTest.java | 4 +-
29 files changed, 441 insertions(+), 215 deletions(-)
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/agent/WorkerManager.java b/tools/src/main/java/org/apache/kafka/trogdor/agent/WorkerManager.java
index cda7773..7c8de6d 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/agent/WorkerManager.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/agent/WorkerManager.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.utils.Scheduler;
import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.trogdor.common.Platform;
import org.apache.kafka.trogdor.common.ThreadUtils;
import org.apache.kafka.trogdor.rest.WorkerDone;
@@ -29,6 +30,7 @@ import org.apache.kafka.trogdor.rest.WorkerRunning;
import org.apache.kafka.trogdor.rest.WorkerStarting;
import org.apache.kafka.trogdor.rest.WorkerStopping;
import org.apache.kafka.trogdor.rest.WorkerState;
+import org.apache.kafka.trogdor.task.AgentWorkerStatusTracker;
import org.apache.kafka.trogdor.task.TaskSpec;
import org.apache.kafka.trogdor.task.TaskWorker;
import org.slf4j.Logger;
@@ -43,7 +45,6 @@ import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
public final class WorkerManager {
private static final Logger log = LoggerFactory.getLogger(WorkerManager.class);
@@ -190,7 +191,7 @@ public final class WorkerManager {
/**
* The worker status.
*/
- private final AtomicReference<String> status = new AtomicReference<>("");
+ private final AgentWorkerStatusTracker status = new AgentWorkerStatusTracker();
/**
* The time when this task was started.
@@ -293,6 +294,8 @@ public final class WorkerManager {
haltFuture.thenApply(new KafkaFuture.BaseFunction<String, Void>() {
@Override
public Void apply(String errorString) {
+ if (errorString == null)
+ errorString = "";
if (errorString.isEmpty()) {
log.info("{}: Worker {} is halting.", nodeName, id);
} else {
@@ -306,8 +309,9 @@ public final class WorkerManager {
try {
worker.taskWorker.start(platform, worker.status, haltFuture);
} catch (Exception e) {
+ log.info("{}: Worker {} start() exception", nodeName, id, e);
stateChangeExecutor.submit(new HandleWorkerHalting(worker,
- "worker.start() exception: " + e.getMessage(), true));
+ "worker.start() exception: " + Utils.stackTrace(e), true));
}
stateChangeExecutor.submit(new FinishCreatingWorker(worker));
}
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/NodeManager.java b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/NodeManager.java
index 0129007..91ef9c2 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/NodeManager.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/NodeManager.java
@@ -49,7 +49,6 @@ import org.apache.kafka.trogdor.common.ThreadUtils;
import org.apache.kafka.trogdor.rest.AgentStatusResponse;
import org.apache.kafka.trogdor.rest.CreateWorkerRequest;
import org.apache.kafka.trogdor.rest.StopWorkerRequest;
-import org.apache.kafka.trogdor.rest.WorkerDone;
import org.apache.kafka.trogdor.rest.WorkerReceiving;
import org.apache.kafka.trogdor.rest.WorkerRunning;
import org.apache.kafka.trogdor.rest.WorkerStarting;
@@ -192,6 +191,9 @@ public final class NodeManager {
// agents going down?
return;
}
+ if (log.isTraceEnabled()) {
+ log.trace("{}: got heartbeat status {}", node.name(), agentStatus);
+ }
// Identify workers which we think should be running, but which do not appear
// in the agent's response. We need to send startWorker requests for these.
for (Map.Entry<String, ManagedWorker> entry : workers.entrySet()) {
@@ -203,40 +205,31 @@ public final class NodeManager {
}
}
}
- // Identify tasks which are running, but which we don't know about.
- // Add these to the NodeManager as tasks that should not be running.
for (Map.Entry<String, WorkerState> entry : agentStatus.workers().entrySet()) {
String id = entry.getKey();
WorkerState state = entry.getValue();
- if (!workers.containsKey(id)) {
+ ManagedWorker worker = workers.get(id);
+ if (worker == null) {
+ // Identify tasks which are running, but which we don't know about.
+ // Add these to the NodeManager as tasks that should not be running.
log.warn("{}: scheduling unknown worker {} for stopping.", node.name(), id);
workers.put(id, new ManagedWorker(id, state.spec(), false, state));
- }
- }
- // Handle workers which need to be stopped. Handle workers which have newly completed.
- for (Map.Entry<String, WorkerState> entry : agentStatus.workers().entrySet()) {
- String id = entry.getKey();
- WorkerState state = entry.getValue();
- ManagedWorker worker = workers.get(id);
- if (state instanceof WorkerStarting || state instanceof WorkerRunning) {
- if (!worker.shouldRun) {
- worker.tryStop();
- }
- } else if (state instanceof WorkerDone) {
- if (!(worker.state instanceof WorkerDone)) {
- WorkerDone workerDoneState = (WorkerDone) state;
- String error = workerDoneState.error();
- if (error.isEmpty()) {
- log.info("{}: Worker {} finished with status '{}'",
- node.name(), id, workerDoneState.status());
- } else {
- log.warn("{}: Worker {} finished with error '{}' and status '{}'",
- node.name(), id, error, workerDoneState.status());
+ } else {
+ // Handle workers which need to be stopped.
+ if (state instanceof WorkerStarting || state instanceof WorkerRunning) {
+ if (!worker.shouldRun) {
+ worker.tryStop();
}
- taskManager.handleWorkerCompletion(node.name(), worker.id, error);
+ }
+ // Notify the TaskManager if the worker state has changed.
+ if (worker.state.equals(state)) {
+ log.debug("{}: worker state is still {}", node.name(), worker.state);
+ } else {
+ log.info("{}: worker state changed from {} to {}", node.name(), worker.state, state);
+ worker.state = state;
+ taskManager.updateWorkerState(node.name(), worker.id, state);
}
}
- worker.state = state;
}
} catch (Throwable e) {
log.error("{}: Unhandled exception in NodeHeartbeatRunnable", node.name(), e);
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java
index d88e1d5..7e19c8b 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java
@@ -17,10 +17,14 @@
package org.apache.kafka.trogdor.coordinator;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.utils.Scheduler;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.trogdor.common.JsonUtil;
import org.apache.kafka.trogdor.common.Node;
import org.apache.kafka.trogdor.common.Platform;
import org.apache.kafka.trogdor.common.ThreadUtils;
@@ -31,13 +35,15 @@ import org.apache.kafka.trogdor.rest.TaskState;
import org.apache.kafka.trogdor.rest.TaskStopping;
import org.apache.kafka.trogdor.rest.TasksRequest;
import org.apache.kafka.trogdor.rest.TasksResponse;
+import org.apache.kafka.trogdor.rest.WorkerDone;
+import org.apache.kafka.trogdor.rest.WorkerReceiving;
+import org.apache.kafka.trogdor.rest.WorkerState;
import org.apache.kafka.trogdor.task.TaskController;
import org.apache.kafka.trogdor.task.TaskSpec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
@@ -172,16 +178,9 @@ public final class TaskManager {
private Future<?> startFuture = null;
/**
- * The name of the worker nodes involved with this task.
- * Null if the task is not running.
+ * The states of the workers involved with this task.
*/
- private Set<String> workers = null;
-
- /**
- * The names of the worker nodes which are still running this task.
- * Null if the task is not running.
- */
- private Set<String> activeWorkers = null;
+ public Map<String, WorkerState> workerStates = new TreeMap<>();
/**
* If this is non-empty, a message describing how this task failed.
@@ -241,14 +240,39 @@ public final class TaskManager {
case PENDING:
return new TaskPending(spec);
case RUNNING:
- return new TaskRunning(spec, startedMs);
+ return new TaskRunning(spec, startedMs, getCombinedStatus(workerStates));
case STOPPING:
- return new TaskStopping(spec, startedMs);
+ return new TaskStopping(spec, startedMs, getCombinedStatus(workerStates));
case DONE:
- return new TaskDone(spec, startedMs, doneMs, error, cancelled);
+ return new TaskDone(spec, startedMs, doneMs, error, cancelled, getCombinedStatus(workerStates));
}
throw new RuntimeException("unreachable");
}
+
+ TreeSet<String> activeWorkers() {
+ TreeSet<String> workerNames = new TreeSet<>();
+ for (Map.Entry<String, WorkerState> entry : workerStates.entrySet()) {
+ if (!entry.getValue().done()) {
+ workerNames.add(entry.getKey());
+ }
+ }
+ return workerNames;
+ }
+ }
+
+ private static final JsonNode getCombinedStatus(Map<String, WorkerState> states) {
+ if (states.size() == 1) {
+ return states.values().iterator().next().status();
+ } else {
+ ObjectNode objectNode = new ObjectNode(JsonNodeFactory.instance);
+ for (Map.Entry<String, WorkerState> entry : states.entrySet()) {
+ JsonNode node = entry.getValue().status();
+ if (node != null) {
+ objectNode.set(entry.getKey(), node);
+ }
+ }
+ return objectNode;
+ }
}
/**
@@ -349,10 +373,8 @@ public final class TaskManager {
log.info("Running task {} on node(s): {}", task.id, Utils.join(nodeNames, ", "));
task.state = ManagedTaskState.RUNNING;
task.startedMs = time.milliseconds();
- task.workers = nodeNames;
- task.activeWorkers = new HashSet<>();
- for (String workerName : task.workers) {
- task.activeWorkers.add(workerName);
+ for (String workerName : nodeNames) {
+ task.workerStates.put(workerName, new WorkerReceiving(task.spec));
nodeManagers.get(workerName).createWorker(task.id, task.spec);
}
return null;
@@ -398,15 +420,16 @@ public final class TaskManager {
break;
case RUNNING:
task.cancelled = true;
- if (task.activeWorkers.size() == 0) {
+ TreeSet<String> activeWorkers = task.activeWorkers();
+ if (activeWorkers.isEmpty()) {
log.info("Task {} is now complete with error: {}", id, task.error);
task.doneMs = time.milliseconds();
task.state = ManagedTaskState.DONE;
} else {
- for (String workerName : task.activeWorkers) {
+ for (String workerName : activeWorkers) {
nodeManagers.get(workerName).stopWorker(id);
}
- log.info("Cancelling task {} on worker(s): {}", id, Utils.join(task.activeWorkers, ", "));
+ log.info("Cancelling task {} on worker(s): {}", id, Utils.join(activeWorkers, ", "));
task.state = ManagedTaskState.STOPPING;
}
break;
@@ -422,66 +445,80 @@ public final class TaskManager {
}
/**
- * A callback NodeManager makes to indicate that a worker has completed.
- * The task will transition to DONE once all workers are done.
+ * Update the state of a particular agent's worker.
*
- * @param nodeName The node name.
+ * @param nodeName The node where the agent is running.
* @param id The worker name.
- * @param error An empty string if there is no error, or an error string.
+ * @param state The worker state.
*/
- public void handleWorkerCompletion(String nodeName, String id, String error) {
- executor.submit(new HandleWorkerCompletion(nodeName, id, error));
+ public void updateWorkerState(String nodeName, String id, WorkerState state) {
+ executor.submit(new UpdateWorkerState(nodeName, id, state));
}
- class HandleWorkerCompletion implements Callable<Void> {
+ class UpdateWorkerState implements Callable<Void> {
private final String nodeName;
private final String id;
- private final String error;
+ private final WorkerState state;
- HandleWorkerCompletion(String nodeName, String id, String error) {
+ UpdateWorkerState(String nodeName, String id, WorkerState state) {
this.nodeName = nodeName;
this.id = id;
- this.error = error;
+ this.state = state;
}
@Override
public Void call() throws Exception {
ManagedTask task = tasks.get(id);
if (task == null) {
- log.error("Can't handle completion of unknown worker {} on node {}",
+ log.error("Can't update worker state unknown worker {} on node {}",
id, nodeName);
return null;
}
- if ((task.state == ManagedTaskState.PENDING) || (task.state == ManagedTaskState.DONE)) {
- log.error("Task {} got unexpected worker completion from {} while " +
- "in {} state.", id, nodeName, task.state);
- return null;
- }
- boolean broadcastStop = false;
- if (task.state == ManagedTaskState.RUNNING) {
- task.state = ManagedTaskState.STOPPING;
- broadcastStop = true;
- }
- task.maybeSetError(error);
- task.activeWorkers.remove(nodeName);
- if (task.activeWorkers.size() == 0) {
- task.doneMs = time.milliseconds();
- task.state = ManagedTaskState.DONE;
- log.info("Task {} is now complete on {} with error: {}", id,
- Utils.join(task.workers, ", "),
- task.error.isEmpty() ? "(none)" : task.error);
- } else if (broadcastStop) {
- log.info("Node {} stopped. Stopping task {} on worker(s): {}",
- id, Utils.join(task.activeWorkers, ", "));
- for (String workerName : task.activeWorkers) {
- nodeManagers.get(workerName).stopWorker(id);
- }
+ WorkerState prevState = task.workerStates.get(nodeName);
+ log.debug("Task {}: Updating worker state for {} from {} to {}.",
+ id, nodeName, prevState, state);
+ task.workerStates.put(nodeName, state);
+ if (state.done() && (!prevState.done())) {
+ handleWorkerCompletion(task, nodeName, (WorkerDone) state);
}
return null;
}
}
/**
+ * Handle a worker being completed.
+ *
+ * @param task The task that owns the worker.
+ * @param nodeName The name of the node on which the worker is running.
+ * @param state The worker state.
+ */
+ private void handleWorkerCompletion(ManagedTask task, String nodeName, WorkerDone state) {
+ if (state.error().isEmpty()) {
+ log.info("{}: Worker {} finished with status '{}'",
+ nodeName, task.id, JsonUtil.toJsonString(state.status()));
+ } else {
+ log.warn("{}: Worker {} finished with error '{}' and status '{}'",
+ nodeName, task.id, state.error(), JsonUtil.toJsonString(state.status()));
+ task.maybeSetError(state.error());
+ }
+ if (task.activeWorkers().isEmpty()) {
+ task.doneMs = time.milliseconds();
+ task.state = ManagedTaskState.DONE;
+ log.info("{}: Task {} is now complete on {} with error: {}",
+ nodeName, task.id, Utils.join(task.workerStates.keySet(), ", "),
+ task.error.isEmpty() ? "(none)" : task.error);
+ } else if ((task.state == ManagedTaskState.RUNNING) && (!task.error.isEmpty())) {
+ TreeSet<String> activeWorkers = task.activeWorkers();
+ log.info("{}: task {} stopped with error {}. Stopping worker(s): {}",
+ nodeName, task.id, task.error, Utils.join(activeWorkers, ", "));
+ task.state = ManagedTaskState.STOPPING;
+ for (String workerName : activeWorkers) {
+ nodeManagers.get(workerName).stopWorker(task.id);
+ }
+ }
+ }
+
+ /**
* Get information about the tasks being managed.
*/
public TasksResponse tasks(TasksRequest request) throws ExecutionException, InterruptedException {
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/fault/KiboshFaultWorker.java b/tools/src/main/java/org/apache/kafka/trogdor/fault/KiboshFaultWorker.java
index 629d15e..97934a8 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/fault/KiboshFaultWorker.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/fault/KiboshFaultWorker.java
@@ -17,15 +17,15 @@
package org.apache.kafka.trogdor.fault;
+import com.fasterxml.jackson.databind.node.TextNode;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.trogdor.common.Platform;
import org.apache.kafka.trogdor.fault.Kibosh.KiboshFaultSpec;
import org.apache.kafka.trogdor.task.TaskWorker;
+import org.apache.kafka.trogdor.task.WorkerStatusTracker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.concurrent.atomic.AtomicReference;
-
public class KiboshFaultWorker implements TaskWorker {
private static final Logger log = LoggerFactory.getLogger(KiboshFaultWorker.class);
@@ -35,6 +35,8 @@ public class KiboshFaultWorker implements TaskWorker {
private final String mountPath;
+ private WorkerStatusTracker status;
+
public KiboshFaultWorker(String id, KiboshFaultSpec spec, String mountPath) {
this.id = id;
this.spec = spec;
@@ -42,15 +44,20 @@ public class KiboshFaultWorker implements TaskWorker {
}
@Override
- public void start(Platform platform, AtomicReference<String> status,
+ public void start(Platform platform, WorkerStatusTracker status,
KafkaFutureImpl<String> errorFuture) throws Exception {
log.info("Activating {} {}: {}.", spec.getClass().getSimpleName(), id, spec);
+ this.status = status;
+ this.status.update(new TextNode("Adding fault " + id));
Kibosh.INSTANCE.addFault(mountPath, spec);
+ this.status.update(new TextNode("Added fault " + id));
}
@Override
public void stop(Platform platform) throws Exception {
log.info("Deactivating {} {}: {}.", spec.getClass().getSimpleName(), id, spec);
+ this.status.update(new TextNode("Removing fault " + id));
Kibosh.INSTANCE.removeFault(mountPath, spec);
+ this.status.update(new TextNode("Removed fault " + id));
}
}
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/fault/NetworkPartitionFaultWorker.java b/tools/src/main/java/org/apache/kafka/trogdor/fault/NetworkPartitionFaultWorker.java
index 787c5e0..1b99a93 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/fault/NetworkPartitionFaultWorker.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/fault/NetworkPartitionFaultWorker.java
@@ -17,11 +17,13 @@
package org.apache.kafka.trogdor.fault;
+import com.fasterxml.jackson.databind.node.TextNode;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.trogdor.common.Node;
import org.apache.kafka.trogdor.common.Platform;
import org.apache.kafka.trogdor.common.Topology;
import org.apache.kafka.trogdor.task.TaskWorker;
+import org.apache.kafka.trogdor.task.WorkerStatusTracker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -29,7 +31,6 @@ import java.net.InetAddress;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
-import java.util.concurrent.atomic.AtomicReference;
public class NetworkPartitionFaultWorker implements TaskWorker {
private static final Logger log = LoggerFactory.getLogger(NetworkPartitionFaultWorker.class);
@@ -38,22 +39,29 @@ public class NetworkPartitionFaultWorker implements TaskWorker {
private final List<Set<String>> partitionSets;
+ private WorkerStatusTracker status;
+
public NetworkPartitionFaultWorker(String id, List<Set<String>> partitionSets) {
this.id = id;
this.partitionSets = partitionSets;
}
@Override
- public void start(Platform platform, AtomicReference<String> status,
+ public void start(Platform platform, WorkerStatusTracker status,
KafkaFutureImpl<String> errorFuture) throws Exception {
log.info("Activating NetworkPartitionFault {}.", id);
+ this.status = status;
+ this.status.update(new TextNode("creating network partition " + id));
runIptablesCommands(platform, "-A");
+ this.status.update(new TextNode("created network partition " + id));
}
@Override
public void stop(Platform platform) throws Exception {
log.info("Deactivating NetworkPartitionFault {}.", id);
+ this.status.update(new TextNode("removing network partition " + id));
runIptablesCommands(platform, "-D");
+ this.status.update(new TextNode("removed network partition " + id));
}
private void runIptablesCommands(Platform platform, String iptablesAction) throws Exception {
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/fault/ProcessStopFaultWorker.java b/tools/src/main/java/org/apache/kafka/trogdor/fault/ProcessStopFaultWorker.java
index 66a8c6e..d30eaf7 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/fault/ProcessStopFaultWorker.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/fault/ProcessStopFaultWorker.java
@@ -17,16 +17,17 @@
package org.apache.kafka.trogdor.fault;
+import com.fasterxml.jackson.databind.node.TextNode;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.trogdor.common.Platform;
import org.apache.kafka.trogdor.task.TaskWorker;
+import org.apache.kafka.trogdor.task.WorkerStatusTracker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
-import java.util.concurrent.atomic.AtomicReference;
public class ProcessStopFaultWorker implements TaskWorker {
private static final Logger log = LoggerFactory.getLogger(ProcessStopFaultWorker.class);
@@ -35,22 +36,29 @@ public class ProcessStopFaultWorker implements TaskWorker {
private final String javaProcessName;
+ private WorkerStatusTracker status;
+
public ProcessStopFaultWorker(String id, String javaProcessName) {
this.id = id;
this.javaProcessName = javaProcessName;
}
@Override
- public void start(Platform platform, AtomicReference<String> status,
+ public void start(Platform platform, WorkerStatusTracker status,
KafkaFutureImpl<String> errorFuture) throws Exception {
+ this.status = status;
log.info("Activating ProcessStopFault {}.", id);
+ this.status.update(new TextNode("stopping " + javaProcessName));
sendSignals(platform, "SIGSTOP");
+ this.status.update(new TextNode("stopped " + javaProcessName));
}
@Override
public void stop(Platform platform) throws Exception {
log.info("Deactivating ProcessStopFault {}.", id);
+ this.status.update(new TextNode("resuming " + javaProcessName));
sendSignals(platform, "SIGCONT");
+ this.status.update(new TextNode("resumed " + javaProcessName));
}
private void sendSignals(Platform platform, String signalName) throws Exception {
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskDone.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskDone.java
index 536d3f2..e8d6003 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskDone.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskDone.java
@@ -16,9 +16,9 @@
*/
package org.apache.kafka.trogdor.rest;
-
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.JsonNode;
import org.apache.kafka.trogdor.task.TaskSpec;
/**
@@ -50,8 +50,9 @@ public class TaskDone extends TaskState {
@JsonProperty("startedMs") long startedMs,
@JsonProperty("doneMs") long doneMs,
@JsonProperty("error") String error,
- @JsonProperty("cancelled") boolean cancelled) {
- super(spec);
+ @JsonProperty("cancelled") boolean cancelled,
+ @JsonProperty("status") JsonNode status) {
+ super(spec, status);
this.startedMs = startedMs;
this.doneMs = doneMs;
this.error = error;
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskPending.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskPending.java
index b0162d3..7831301 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskPending.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskPending.java
@@ -19,6 +19,7 @@ package org.apache.kafka.trogdor.rest;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.node.NullNode;
import org.apache.kafka.trogdor.task.TaskSpec;
/**
@@ -27,6 +28,6 @@ import org.apache.kafka.trogdor.task.TaskSpec;
public class TaskPending extends TaskState {
@JsonCreator
public TaskPending(@JsonProperty("spec") TaskSpec spec) {
- super(spec);
+ super(spec, NullNode.instance);
}
}
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskRunning.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskRunning.java
index bff3676..7a81bdf 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskRunning.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskRunning.java
@@ -19,6 +19,7 @@ package org.apache.kafka.trogdor.rest;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.JsonNode;
import org.apache.kafka.trogdor.task.TaskSpec;
/**
@@ -32,8 +33,9 @@ public class TaskRunning extends TaskState {
@JsonCreator
public TaskRunning(@JsonProperty("spec") TaskSpec spec,
- @JsonProperty("startedMs") long startedMs) {
- super(spec);
+ @JsonProperty("startedMs") long startedMs,
+ @JsonProperty("status") JsonNode status) {
+ super(spec, status);
this.startedMs = startedMs;
}
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskState.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskState.java
index 28b6108..0764e14 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskState.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskState.java
@@ -20,6 +20,8 @@ package org.apache.kafka.trogdor.rest;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.NullNode;
import org.apache.kafka.trogdor.task.TaskSpec;
/**
@@ -37,12 +39,20 @@ import org.apache.kafka.trogdor.task.TaskSpec;
public abstract class TaskState extends Message {
private final TaskSpec spec;
- public TaskState(TaskSpec spec) {
+ private final JsonNode status;
+
+ public TaskState(TaskSpec spec, JsonNode status) {
this.spec = spec;
+ this.status = status == null ? NullNode.instance : status;
}
@JsonProperty
public TaskSpec spec() {
return spec;
}
+
+ @JsonProperty
+ public JsonNode status() {
+ return status;
+ }
}
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskStopping.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskStopping.java
index 4446b75..d40b43c 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskStopping.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskStopping.java
@@ -19,6 +19,7 @@ package org.apache.kafka.trogdor.rest;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.JsonNode;
import org.apache.kafka.trogdor.task.TaskSpec;
/**
@@ -32,8 +33,9 @@ public class TaskStopping extends TaskState {
@JsonCreator
public TaskStopping(@JsonProperty("spec") TaskSpec spec,
- @JsonProperty("startedMs") long startedMs) {
- super(spec);
+ @JsonProperty("startedMs") long startedMs,
+ @JsonProperty("status") JsonNode status) {
+ super(spec, status);
this.startedMs = startedMs;
}
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerDone.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerDone.java
index e463ffc..500d3c6 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerDone.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerDone.java
@@ -19,6 +19,8 @@ package org.apache.kafka.trogdor.rest;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.NullNode;
import org.apache.kafka.trogdor.task.TaskSpec;
/**
@@ -39,7 +41,7 @@ public class WorkerDone extends WorkerState {
* The task status. The format will depend on the type of task that is
* being run.
*/
- private final String status;
+ private final JsonNode status;
/**
* Empty if the task completed without error; the error message otherwise.
@@ -50,12 +52,12 @@ public class WorkerDone extends WorkerState {
public WorkerDone(@JsonProperty("spec") TaskSpec spec,
@JsonProperty("startedMs") long startedMs,
@JsonProperty("doneMs") long doneMs,
- @JsonProperty("status") String status,
+ @JsonProperty("status") JsonNode status,
@JsonProperty("error") String error) {
super(spec);
this.startedMs = startedMs;
this.doneMs = doneMs;
- this.status = status == null ? "" : status;
+ this.status = status == null ? NullNode.instance : status;
this.error = error == null ? "" : error;
}
@@ -72,7 +74,7 @@ public class WorkerDone extends WorkerState {
@JsonProperty
@Override
- public String status() {
+ public JsonNode status() {
return status;
}
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerReceiving.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerReceiving.java
index d3e3565..7068774 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerReceiving.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerReceiving.java
@@ -19,6 +19,8 @@ package org.apache.kafka.trogdor.rest;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.TextNode;
import org.apache.kafka.trogdor.task.TaskSpec;
/**
@@ -30,4 +32,9 @@ public final class WorkerReceiving extends WorkerState {
public WorkerReceiving(@JsonProperty("spec") TaskSpec spec) {
super(spec);
}
+
+ @Override
+ public JsonNode status() {
+ return new TextNode("receiving");
+ }
}
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerRunning.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerRunning.java
index e3b8d19..af8ee88 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerRunning.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerRunning.java
@@ -19,6 +19,8 @@ package org.apache.kafka.trogdor.rest;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.NullNode;
import org.apache.kafka.trogdor.task.TaskSpec;
/**
@@ -34,15 +36,15 @@ public class WorkerRunning extends WorkerState {
* The task status. The format will depend on the type of task that is
* being run.
*/
- private final String status;
+ private final JsonNode status;
@JsonCreator
public WorkerRunning(@JsonProperty("spec") TaskSpec spec,
@JsonProperty("startedMs") long startedMs,
- @JsonProperty("status") String status) {
+ @JsonProperty("status") JsonNode status) {
super(spec);
this.startedMs = startedMs;
- this.status = status == null ? "" : status;
+ this.status = status == null ? NullNode.instance : status;
}
@JsonProperty
@@ -53,7 +55,7 @@ public class WorkerRunning extends WorkerState {
@JsonProperty
@Override
- public String status() {
+ public JsonNode status() {
return status;
}
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerStarting.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerStarting.java
index 3a766ea..b568ec1 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerStarting.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerStarting.java
@@ -19,6 +19,8 @@ package org.apache.kafka.trogdor.rest;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.TextNode;
import org.apache.kafka.trogdor.task.TaskSpec;
/**
@@ -29,4 +31,9 @@ public final class WorkerStarting extends WorkerState {
public WorkerStarting(@JsonProperty("spec") TaskSpec spec) {
super(spec);
}
+
+ @Override
+ public JsonNode status() {
+ return new TextNode("starting");
+ }
}
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerState.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerState.java
index 6d7c687..044d719 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerState.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerState.java
@@ -20,6 +20,7 @@ package org.apache.kafka.trogdor.rest;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.fasterxml.jackson.databind.JsonNode;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.trogdor.task.TaskSpec;
@@ -60,9 +61,7 @@ public abstract class WorkerState extends Message {
throw new KafkaException("invalid state");
}
- public String status() {
- throw new KafkaException("invalid state");
- }
+ public abstract JsonNode status();
public boolean running() {
return false;
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerStopping.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerStopping.java
index 777e511..9fbb3ff 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerStopping.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerStopping.java
@@ -19,6 +19,8 @@ package org.apache.kafka.trogdor.rest;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.NullNode;
import org.apache.kafka.trogdor.task.TaskSpec;
/**
@@ -34,15 +36,15 @@ public class WorkerStopping extends WorkerState {
* The task status. The format will depend on the type of task that is
* being run.
*/
- private final String status;
+ private final JsonNode status;
@JsonCreator
public WorkerStopping(@JsonProperty("spec") TaskSpec spec,
@JsonProperty("startedMs") long startedMs,
- @JsonProperty("status") String status) {
+ @JsonProperty("status") JsonNode status) {
super(spec);
this.startedMs = startedMs;
- this.status = status == null ? "" : status;
+ this.status = status == null ? NullNode.instance : status;
}
@JsonProperty
@@ -53,7 +55,7 @@ public class WorkerStopping extends WorkerState {
@JsonProperty
@Override
- public String status() {
+ public JsonNode status() {
return status;
}
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerStarting.java b/tools/src/main/java/org/apache/kafka/trogdor/task/AgentWorkerStatusTracker.java
similarity index 57%
copy from tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerStarting.java
copy to tools/src/main/java/org/apache/kafka/trogdor/task/AgentWorkerStatusTracker.java
index 3a766ea..2ad8e4e 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerStarting.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/task/AgentWorkerStatusTracker.java
@@ -15,18 +15,29 @@
* limitations under the License.
*/
-package org.apache.kafka.trogdor.rest;
+package org.apache.kafka.trogdor.task;
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.kafka.trogdor.task.TaskSpec;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.NullNode;
/**
- * When we have just started a worker.
+ * Tracks the status of a Trogdor worker.
*/
-public final class WorkerStarting extends WorkerState {
- @JsonCreator
- public WorkerStarting(@JsonProperty("spec") TaskSpec spec) {
- super(spec);
+public class AgentWorkerStatusTracker implements WorkerStatusTracker {
+ private JsonNode status = NullNode.instance;
+
+ @Override
+ public void update(JsonNode newStatus) {
+ JsonNode status = newStatus.deepCopy();
+ synchronized (this) {
+ this.status = status;
+ }
+ }
+
+ /**
+ * Retrieves the status.
+ */
+ public synchronized JsonNode get() {
+ return status;
}
}
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/task/NoOpTaskWorker.java b/tools/src/main/java/org/apache/kafka/trogdor/task/NoOpTaskWorker.java
index dfa8084..77336d8 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/task/NoOpTaskWorker.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/task/NoOpTaskWorker.java
@@ -17,30 +17,34 @@
package org.apache.kafka.trogdor.task;
+import com.fasterxml.jackson.databind.node.TextNode;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.trogdor.common.Platform;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.concurrent.atomic.AtomicReference;
-
public class NoOpTaskWorker implements TaskWorker {
private static final Logger log = LoggerFactory.getLogger(NoOpTaskWorker.class);
private final String id;
+ private WorkerStatusTracker status;
+
public NoOpTaskWorker(String id) {
this.id = id;
}
@Override
- public void start(Platform platform, AtomicReference<String> status,
+ public void start(Platform platform, WorkerStatusTracker status,
KafkaFutureImpl<String> errorFuture) throws Exception {
log.info("{}: Activating NoOpTask.", id);
+ this.status = status;
+ this.status.update(new TextNode("active"));
}
@Override
public void stop(Platform platform) throws Exception {
log.info("{}: Deactivating NoOpTask.", id);
+ this.status.update(new TextNode("done"));
}
}
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/task/TaskWorker.java b/tools/src/main/java/org/apache/kafka/trogdor/task/TaskWorker.java
index 288eb9c..042568f 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/task/TaskWorker.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/task/TaskWorker.java
@@ -20,8 +20,6 @@ package org.apache.kafka.trogdor.task;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.trogdor.common.Platform;
-import java.util.concurrent.atomic.AtomicReference;
-
/**
* The agent-side interface for implementing tasks.
*/
@@ -42,7 +40,7 @@ public interface TaskWorker {
*
*
* @param platform The platform to use.
- * @param status The current status string. The TaskWorker can update
+ * @param status The current status. The TaskWorker can update
* this at any time to provide an updated status.
* @param haltFuture A future which the worker should complete if it halts.
* If it is completed with an empty string, that means the task
@@ -53,7 +51,7 @@ public interface TaskWorker {
*
* @throws Exception If the TaskWorker failed to start. stop() will not be invoked.
*/
- void start(Platform platform, AtomicReference<String> status, KafkaFutureImpl<String> haltFuture)
+ void start(Platform platform, WorkerStatusTracker status, KafkaFutureImpl<String> haltFuture)
throws Exception;
/**
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskPending.java b/tools/src/main/java/org/apache/kafka/trogdor/task/WorkerStatusTracker.java
similarity index 67%
copy from tools/src/main/java/org/apache/kafka/trogdor/rest/TaskPending.java
copy to tools/src/main/java/org/apache/kafka/trogdor/task/WorkerStatusTracker.java
index b0162d3..dfbc7ea 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskPending.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/task/WorkerStatusTracker.java
@@ -15,18 +15,18 @@
* limitations under the License.
*/
-package org.apache.kafka.trogdor.rest;
+package org.apache.kafka.trogdor.task;
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.kafka.trogdor.task.TaskSpec;
+import com.fasterxml.jackson.databind.JsonNode;
/**
- * The state for a task which is still pending.
+ * Tracks the status of a Trogdor worker.
*/
-public class TaskPending extends TaskState {
- @JsonCreator
- public TaskPending(@JsonProperty("spec") TaskSpec spec) {
- super(spec);
- }
+public interface WorkerStatusTracker {
+ /**
+ * Updates the status.
+ *
+ * @param status The new status.
+ */
+ void update(JsonNode status);
}
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java
index a891b83..4c3095f 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/ProduceBenchWorker.java
@@ -19,6 +19,7 @@ package org.apache.kafka.trogdor.workload;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.node.TextNode;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
@@ -34,6 +35,7 @@ import org.apache.kafka.trogdor.common.Platform;
import org.apache.kafka.trogdor.common.ThreadUtils;
import org.apache.kafka.trogdor.common.WorkerUtils;
import org.apache.kafka.trogdor.task.TaskWorker;
+import org.apache.kafka.trogdor.task.WorkerStatusTracker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,7 +48,6 @@ import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
public class ProduceBenchWorker implements TaskWorker {
private static final Logger log = LoggerFactory.getLogger(ProduceBenchWorker.class);
@@ -61,7 +62,7 @@ public class ProduceBenchWorker implements TaskWorker {
private ScheduledExecutorService executor;
- private AtomicReference<String> status;
+ private WorkerStatusTracker status;
private KafkaFutureImpl<String> doneFuture;
@@ -81,7 +82,7 @@ public class ProduceBenchWorker implements TaskWorker {
}
@Override
- public void start(Platform platform, AtomicReference<String> status,
+ public void start(Platform platform, WorkerStatusTracker status,
KafkaFutureImpl<String> doneFuture) throws Exception {
if (!running.compareAndSet(false, true)) {
throw new IllegalStateException("ProducerBenchWorker is already running.");
@@ -112,9 +113,10 @@ public class ProduceBenchWorker implements TaskWorker {
newTopics.put(name, new NewTopic(name, spec.numPartitions(),
spec.replicationFactor()));
}
+ status.update(new TextNode("Creating " + spec.totalTopics() + " topic(s)"));
WorkerUtils.createTopics(log, spec.bootstrapServers(), spec.commonClientConf(),
spec.adminClientConf(), newTopics, false);
-
+ status.update(new TextNode("Created " + spec.totalTopics() + " topic(s)"));
executor.submit(new SendRecords());
} catch (Throwable e) {
WorkerUtils.abort(log, "Prepare", e, doneFuture);
@@ -181,7 +183,7 @@ public class ProduceBenchWorker implements TaskWorker {
this.histogram = new Histogram(5000);
int perPeriod = WorkerUtils.perSecToPerPeriod(spec.targetMessagesPerSec(), THROTTLE_PERIOD_MS);
this.statusUpdaterFuture = executor.scheduleWithFixedDelay(
- new StatusUpdater(histogram), 1, 1, TimeUnit.MINUTES);
+ new StatusUpdater(histogram), 30, 30, TimeUnit.SECONDS);
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, spec.bootstrapServers());
// add common client configs to producer properties, and then user-specified producer
@@ -218,10 +220,10 @@ public class ProduceBenchWorker implements TaskWorker {
WorkerUtils.abort(log, "SendRecords", e, doneFuture);
} finally {
statusUpdaterFuture.cancel(false);
- new StatusUpdater(histogram).run();
+ StatusData statusData = new StatusUpdater(histogram).update();
long curTimeMs = Time.SYSTEM.milliseconds();
log.info("Sent {} total record(s) in {} ms. status: {}",
- histogram.summarize().numSamples(), curTimeMs - startTimeMs, status.get());
+ histogram.summarize().numSamples(), curTimeMs - startTimeMs, statusData);
}
doneFuture.complete("");
return null;
@@ -234,46 +236,54 @@ public class ProduceBenchWorker implements TaskWorker {
public class StatusUpdater implements Runnable {
private final Histogram histogram;
- private final float[] percentiles;
StatusUpdater(Histogram histogram) {
this.histogram = histogram;
- this.percentiles = new float[] {0.50f, 0.95f, 0.99f};
}
@Override
public void run() {
try {
- Histogram.Summary summary = histogram.summarize(percentiles);
- StatusData statusData = new StatusData(summary.numSamples(), summary.average(),
- summary.percentiles().get(0).value(),
- summary.percentiles().get(1).value(),
- summary.percentiles().get(2).value());
- String statusDataString = JsonUtil.toJsonString(statusData);
- status.set(statusDataString);
+ update();
} catch (Exception e) {
WorkerUtils.abort(log, "StatusUpdater", e, doneFuture);
}
}
+
+ StatusData update() {
+ Histogram.Summary summary = histogram.summarize(StatusData.PERCENTILES);
+ StatusData statusData = new StatusData(summary.numSamples(), summary.average(),
+ summary.percentiles().get(0).value(),
+ summary.percentiles().get(1).value(),
+ summary.percentiles().get(2).value());
+ status.update(JsonUtil.JSON_SERDE.valueToTree(statusData));
+ return statusData;
+ }
}
public static class StatusData {
private final long totalSent;
private final float averageLatencyMs;
private final int p50LatencyMs;
- private final int p90LatencyMs;
+ private final int p95LatencyMs;
private final int p99LatencyMs;
+ /**
+ * The percentiles to use when calculating the histogram data.
+ * These should match up with the p50LatencyMs, p95LatencyMs, etc. fields.
+ */
+ final static float[] PERCENTILES = {0.5f, 0.95f, 0.99f};
+
@JsonCreator
StatusData(@JsonProperty("totalSent") long totalSent,
@JsonProperty("averageLatencyMs") float averageLatencyMs,
@JsonProperty("p50LatencyMs") int p50latencyMs,
- @JsonProperty("p90LatencyMs") int p90latencyMs,
+ @JsonProperty("p95LatencyMs") int p95latencyMs,
@JsonProperty("p99LatencyMs") int p99latencyMs) {
this.totalSent = totalSent;
this.averageLatencyMs = averageLatencyMs;
this.p50LatencyMs = p50latencyMs;
- this.p90LatencyMs = p90latencyMs;
+ this.p95LatencyMs = p95latencyMs;
this.p99LatencyMs = p99latencyMs;
}
@@ -293,8 +303,8 @@ public class ProduceBenchWorker implements TaskWorker {
}
@JsonProperty
- public int p90LatencyMs() {
- return p90LatencyMs;
+ public int p95LatencyMs() {
+ return p95LatencyMs;
}
@JsonProperty
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java
index 08b11ac..12b0c08 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/RoundTripWorker.java
@@ -39,6 +39,7 @@ import org.apache.kafka.trogdor.common.Platform;
import org.apache.kafka.trogdor.common.ThreadUtils;
import org.apache.kafka.trogdor.common.WorkerUtils;
import org.apache.kafka.trogdor.task.TaskWorker;
+import org.apache.kafka.trogdor.task.WorkerStatusTracker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -55,7 +56,6 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
public class RoundTripWorker implements TaskWorker {
private static final int THROTTLE_PERIOD_MS = 100;
@@ -98,7 +98,7 @@ public class RoundTripWorker implements TaskWorker {
}
@Override
- public void start(Platform platform, AtomicReference<String> status,
+ public void start(Platform platform, WorkerStatusTracker status,
KafkaFutureImpl<String> doneFuture) throws Exception {
if (!running.compareAndSet(false, true)) {
throw new IllegalStateException("RoundTripWorker is already running.");
diff --git a/tools/src/test/java/org/apache/kafka/trogdor/agent/AgentTest.java b/tools/src/test/java/org/apache/kafka/trogdor/agent/AgentTest.java
index 30d13b5..61de5c9 100644
--- a/tools/src/test/java/org/apache/kafka/trogdor/agent/AgentTest.java
+++ b/tools/src/test/java/org/apache/kafka/trogdor/agent/AgentTest.java
@@ -17,6 +17,7 @@
package org.apache.kafka.trogdor.agent;
+import com.fasterxml.jackson.databind.node.TextNode;
import org.apache.kafka.common.utils.MockScheduler;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Scheduler;
@@ -122,7 +123,7 @@ public class AgentTest {
CreateWorkerResponse response = client.createWorker(new CreateWorkerRequest("foo", fooSpec));
assertEquals(fooSpec.toString(), response.spec().toString());
new ExpectedTasks().addTask(new ExpectedTaskBuilder("foo").
- workerState(new WorkerRunning(fooSpec, 0, "")).
+ workerState(new WorkerRunning(fooSpec, 0, new TextNode("active"))).
build()).
waitFor(client);
@@ -131,10 +132,10 @@ public class AgentTest {
client.createWorker(new CreateWorkerRequest("bar", barSpec));
new ExpectedTasks().
addTask(new ExpectedTaskBuilder("foo").
- workerState(new WorkerRunning(fooSpec, 0, "")).
+ workerState(new WorkerRunning(fooSpec, 0, new TextNode("active"))).
build()).
addTask(new ExpectedTaskBuilder("bar").
- workerState(new WorkerRunning(barSpec, 0, "")).
+ workerState(new WorkerRunning(barSpec, 0, new TextNode("active"))).
build()).
waitFor(client);
@@ -142,13 +143,13 @@ public class AgentTest {
client.createWorker(new CreateWorkerRequest("baz", bazSpec));
new ExpectedTasks().
addTask(new ExpectedTaskBuilder("foo").
- workerState(new WorkerRunning(fooSpec, 0, "")).
+ workerState(new WorkerRunning(fooSpec, 0, new TextNode("active"))).
build()).
addTask(new ExpectedTaskBuilder("bar").
- workerState(new WorkerRunning(barSpec, 0, "")).
+ workerState(new WorkerRunning(barSpec, 0, new TextNode("active"))).
build()).
addTask(new ExpectedTaskBuilder("baz").
- workerState(new WorkerRunning(bazSpec, 0, "")).
+ workerState(new WorkerRunning(bazSpec, 0, new TextNode("active"))).
build()).
waitFor(client);
@@ -169,7 +170,7 @@ public class AgentTest {
client.createWorker(new CreateWorkerRequest("foo", fooSpec));
new ExpectedTasks().
addTask(new ExpectedTaskBuilder("foo").
- workerState(new WorkerRunning(fooSpec, 0, "")).
+ workerState(new WorkerRunning(fooSpec, 0, new TextNode("active"))).
build()).
waitFor(client);
@@ -179,10 +180,10 @@ public class AgentTest {
client.createWorker(new CreateWorkerRequest("bar", barSpec));
new ExpectedTasks().
addTask(new ExpectedTaskBuilder("foo").
- workerState(new WorkerRunning(fooSpec, 0, "")).
+ workerState(new WorkerRunning(fooSpec, 0, new TextNode("active"))).
build()).
addTask(new ExpectedTaskBuilder("bar").
- workerState(new WorkerRunning(barSpec, 1, "")).
+ workerState(new WorkerRunning(barSpec, 1, new TextNode("active"))).
build()).
waitFor(client);
@@ -190,10 +191,10 @@ public class AgentTest {
new ExpectedTasks().
addTask(new ExpectedTaskBuilder("foo").
- workerState(new WorkerDone(fooSpec, 0, 2, "", "")).
+ workerState(new WorkerDone(fooSpec, 0, 2, new TextNode("done"), "")).
build()).
addTask(new ExpectedTaskBuilder("bar").
- workerState(new WorkerRunning(barSpec, 1, "")).
+ workerState(new WorkerRunning(barSpec, 1, new TextNode("active"))).
build()).
waitFor(client);
@@ -201,10 +202,10 @@ public class AgentTest {
client.stopWorker(new StopWorkerRequest("bar"));
new ExpectedTasks().
addTask(new ExpectedTaskBuilder("foo").
- workerState(new WorkerDone(fooSpec, 0, 2, "", "")).
+ workerState(new WorkerDone(fooSpec, 0, 2, new TextNode("done"), "")).
build()).
addTask(new ExpectedTaskBuilder("bar").
- workerState(new WorkerDone(barSpec, 1, 7, "", "")).
+ workerState(new WorkerDone(barSpec, 1, 7, new TextNode("done"), "")).
build()).
waitFor(client);
@@ -221,34 +222,40 @@ public class AgentTest {
maxTries(10).target("localhost", agent.port()).build();
new ExpectedTasks().waitFor(client);
- SampleTaskSpec fooSpec = new SampleTaskSpec(0, 900000, 1, "");
+ SampleTaskSpec fooSpec = new SampleTaskSpec(0, 900000,
+ Collections.singletonMap("node01", 1L), "");
client.createWorker(new CreateWorkerRequest("foo", fooSpec));
new ExpectedTasks().
addTask(new ExpectedTaskBuilder("foo").
- workerState(new WorkerRunning(fooSpec, 0, "")).
+ workerState(new WorkerRunning(fooSpec, 0, new TextNode("active"))).
build()).
waitFor(client);
- SampleTaskSpec barSpec = new SampleTaskSpec(0, 900000, 2, "baz");
+ SampleTaskSpec barSpec = new SampleTaskSpec(0, 900000,
+ Collections.singletonMap("node01", 2L), "baz");
client.createWorker(new CreateWorkerRequest("bar", barSpec));
time.sleep(1);
new ExpectedTasks().
addTask(new ExpectedTaskBuilder("foo").
- workerState(new WorkerDone(fooSpec, 0, 1, "", "")).
+ workerState(new WorkerDone(fooSpec, 0, 1,
+ new TextNode("halted"), "")).
build()).
addTask(new ExpectedTaskBuilder("bar").
- workerState(new WorkerRunning(barSpec, 0, "")).
+ workerState(new WorkerRunning(barSpec, 0,
+ new TextNode("active"))).
build()).
waitFor(client);
time.sleep(1);
new ExpectedTasks().
addTask(new ExpectedTaskBuilder("foo").
- workerState(new WorkerDone(fooSpec, 0, 1, "", "")).
+ workerState(new WorkerDone(fooSpec, 0, 1,
+ new TextNode("halted"), "")).
build()).
addTask(new ExpectedTaskBuilder("bar").
- workerState(new WorkerDone(barSpec, 0, 2, "", "baz")).
+ workerState(new WorkerDone(barSpec, 0, 2,
+ new TextNode("halted"), "baz")).
build()).
waitFor(client);
}
@@ -289,7 +296,7 @@ public class AgentTest {
client.createWorker(new CreateWorkerRequest("foo", fooSpec));
new ExpectedTasks().
addTask(new ExpectedTaskBuilder("foo").
- workerState(new WorkerRunning(fooSpec, 0, "")).
+ workerState(new WorkerRunning(fooSpec, 0, new TextNode("Added fault foo"))).
build()).
waitFor(client);
Assert.assertEquals(new KiboshControlFile(Collections.<Kibosh.KiboshFaultSpec>singletonList(
@@ -299,9 +306,9 @@ public class AgentTest {
client.createWorker(new CreateWorkerRequest("bar", barSpec));
new ExpectedTasks().
addTask(new ExpectedTaskBuilder("foo").
- workerState(new WorkerRunning(fooSpec, 0, "")).build()).
+ workerState(new WorkerRunning(fooSpec, 0, new TextNode("Added fault foo"))).build()).
addTask(new ExpectedTaskBuilder("bar").
- workerState(new WorkerRunning(barSpec, 0, "")).build()).
+ workerState(new WorkerRunning(barSpec, 0, new TextNode("Added fault bar"))).build()).
waitFor(client);
Assert.assertEquals(new KiboshControlFile(new ArrayList<Kibosh.KiboshFaultSpec>() {{
add(new KiboshFilesUnreadableFaultSpec("/foo", 123));
@@ -311,9 +318,9 @@ public class AgentTest {
client.stopWorker(new StopWorkerRequest("foo"));
new ExpectedTasks().
addTask(new ExpectedTaskBuilder("foo").
- workerState(new WorkerDone(fooSpec, 0, 1, "", "")).build()).
+ workerState(new WorkerDone(fooSpec, 0, 1, new TextNode("Removed fault foo"), "")).build()).
addTask(new ExpectedTaskBuilder("bar").
- workerState(new WorkerRunning(barSpec, 0, "")).build()).
+ workerState(new WorkerRunning(barSpec, 0, new TextNode("Added fault bar"))).build()).
waitFor(client);
Assert.assertEquals(new KiboshControlFile(Collections.<Kibosh.KiboshFaultSpec>singletonList(
new KiboshFilesUnreadableFaultSpec("/bar", 456))), mockKibosh.read());
diff --git a/tools/src/test/java/org/apache/kafka/trogdor/common/JsonSerializationTest.java b/tools/src/test/java/org/apache/kafka/trogdor/common/JsonSerializationTest.java
index 76b206b..8101d9c 100644
--- a/tools/src/test/java/org/apache/kafka/trogdor/common/JsonSerializationTest.java
+++ b/tools/src/test/java/org/apache/kafka/trogdor/common/JsonSerializationTest.java
@@ -52,7 +52,7 @@ public class JsonSerializationTest {
0, 0, null, null, null, null, null, 0, 0, "test-topic", 1, (short) 3));
verify(new RoundTripWorkloadSpec(0, 0, null, null, null, null, null, null,
0, null, null, 0));
- verify(new SampleTaskSpec(0, 0, 0, null));
+ verify(new SampleTaskSpec(0, 0, null, null));
}
private <T> void verify(T val1) throws Exception {
diff --git a/tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java b/tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java
index 004702f..34d7ffe 100644
--- a/tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java
+++ b/tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java
@@ -17,6 +17,9 @@
package org.apache.kafka.trogdor.coordinator;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.databind.node.TextNode;
import org.apache.kafka.common.utils.MockScheduler;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Scheduler;
@@ -41,6 +44,7 @@ import org.apache.kafka.trogdor.rest.TasksResponse;
import org.apache.kafka.trogdor.rest.WorkerDone;
import org.apache.kafka.trogdor.rest.WorkerRunning;
import org.apache.kafka.trogdor.task.NoOpTaskSpec;
+import org.apache.kafka.trogdor.task.SampleTaskSpec;
import org.junit.Rule;
import org.junit.rules.Timeout;
import org.slf4j.Logger;
@@ -49,6 +53,7 @@ import org.junit.Test;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
import static org.junit.Assert.assertEquals;
@@ -94,8 +99,8 @@ public class CoordinatorTest {
time.sleep(2);
new ExpectedTasks().
addTask(new ExpectedTaskBuilder("foo").
- taskState(new TaskRunning(fooSpec, 2)).
- workerState(new WorkerRunning(fooSpec, 2, "")).
+ taskState(new TaskRunning(fooSpec, 2, new TextNode("active"))).
+ workerState(new WorkerRunning(fooSpec, 2, new TextNode("active"))).
build()).
waitFor(cluster.coordinatorClient()).
waitFor(cluster.agentClient("node02"));
@@ -103,7 +108,7 @@ public class CoordinatorTest {
time.sleep(3);
new ExpectedTasks().
addTask(new ExpectedTaskBuilder("foo").
- taskState(new TaskDone(fooSpec, 2, 5, "", false)).
+ taskState(new TaskDone(fooSpec, 2, 5, "", false, new TextNode("done"))).
build()).
waitFor(cluster.coordinatorClient());
}
@@ -131,26 +136,34 @@ public class CoordinatorTest {
NoOpTaskSpec fooSpec = new NoOpTaskSpec(5, 2);
coordinatorClient.createTask(new CreateTaskRequest("foo", fooSpec));
new ExpectedTasks().
- addTask(new ExpectedTaskBuilder("foo").taskState(new TaskPending(fooSpec)).build()).
+ addTask(new ExpectedTaskBuilder("foo").taskState(
+ new TaskPending(fooSpec)).build()).
waitFor(coordinatorClient).
waitFor(agentClient1).
waitFor(agentClient2);
time.sleep(11);
+ ObjectNode status1 = new ObjectNode(JsonNodeFactory.instance);
+ status1.set("node01", new TextNode("active"));
+ status1.set("node02", new TextNode("active"));
new ExpectedTasks().
addTask(new ExpectedTaskBuilder("foo").
- taskState(new TaskRunning(fooSpec, 11)).
- workerState(new WorkerRunning(fooSpec, 11, "")).
+ taskState(new TaskRunning(fooSpec, 11, status1)).
+ workerState(new WorkerRunning(fooSpec, 11, new TextNode("active"))).
build()).
waitFor(coordinatorClient).
waitFor(agentClient1).
waitFor(agentClient2);
time.sleep(2);
+ ObjectNode status2 = new ObjectNode(JsonNodeFactory.instance);
+ status2.set("node01", new TextNode("done"));
+ status2.set("node02", new TextNode("done"));
new ExpectedTasks().
addTask(new ExpectedTaskBuilder("foo").
- taskState(new TaskDone(fooSpec, 11, 13, "", false)).
- workerState(new WorkerDone(fooSpec, 11, 13, "", "")).
+ taskState(new TaskDone(fooSpec, 11, 13,
+ "", false, status2)).
+ workerState(new WorkerDone(fooSpec, 11, 13, new TextNode("done"), "")).
build()).
waitFor(coordinatorClient).
waitFor(agentClient1).
@@ -186,21 +199,29 @@ public class CoordinatorTest {
waitFor(agentClient2);
time.sleep(11);
+
+ ObjectNode status1 = new ObjectNode(JsonNodeFactory.instance);
+ status1.set("node01", new TextNode("active"));
+ status1.set("node02", new TextNode("active"));
new ExpectedTasks().
addTask(new ExpectedTaskBuilder("foo").
- taskState(new TaskRunning(fooSpec, 11)).
- workerState(new WorkerRunning(fooSpec, 11, "")).
+ taskState(new TaskRunning(fooSpec, 11, status1)).
+ workerState(new WorkerRunning(fooSpec, 11, new TextNode("active"))).
build()).
waitFor(coordinatorClient).
waitFor(agentClient1).
waitFor(agentClient2);
+ ObjectNode status2 = new ObjectNode(JsonNodeFactory.instance);
+ status2.set("node01", new TextNode("done"));
+ status2.set("node02", new TextNode("done"));
time.sleep(1);
coordinatorClient.stopTask(new StopTaskRequest("foo"));
new ExpectedTasks().
addTask(new ExpectedTaskBuilder("foo").
- taskState(new TaskDone(fooSpec, 11, 12, "", true)).
- workerState(new WorkerDone(fooSpec, 11, 12, "", "")).
+ taskState(new TaskDone(fooSpec, 11, 12, "",
+ true, status2)).
+ workerState(new WorkerDone(fooSpec, 11, 12, new TextNode("done"), "")).
build()).
waitFor(coordinatorClient).
waitFor(agentClient1).
@@ -375,8 +396,8 @@ public class CoordinatorTest {
time.sleep(2);
new ExpectedTasks().
addTask(new ExpectedTaskBuilder("foo").
- taskState(new TaskRunning(fooSpec, 2)).
- workerState(new WorkerRunning(fooSpec, 2, "")).
+ taskState(new TaskRunning(fooSpec, 2, new TextNode("active"))).
+ workerState(new WorkerRunning(fooSpec, 2, new TextNode("active"))).
build()).
addTask(new ExpectedTaskBuilder("bar").
taskState(new TaskPending(barSpec)).
@@ -394,4 +415,73 @@ public class CoordinatorTest {
new TasksRequest(null, 3, 0, 0, 0)).tasks().size());
}
}
+
+ /**
+ * Verifies that the coordinator tracks each node's worker status separately when
+ * workers halt at different times: the task stays TaskRunning while any worker is
+ * still active, and becomes TaskDone only after every node's worker has halted.
+ */
+ @Test
+ public void testWorkersExitingAtDifferentTimes() throws Exception {
+ // Mock time and scheduler let the test step the clock deterministically.
+ MockTime time = new MockTime(0, 0, 0);
+ Scheduler scheduler = new MockScheduler(time);
+ try (MiniTrogdorCluster cluster = new MiniTrogdorCluster.Builder().
+ addCoordinator("node01").
+ addAgent("node02").
+ addAgent("node03").
+ scheduler(scheduler).
+ build()) {
+ CoordinatorClient coordinatorClient = cluster.coordinatorClient();
+ // No tasks exist yet.
+ new ExpectedTasks().waitFor(coordinatorClient);
+
+ // Delay (ms after its worker starts) before each node's SampleTaskWorker halts.
+ HashMap<String, Long> nodeToExitMs = new HashMap<>();
+ nodeToExitMs.put("node02", 10L);
+ nodeToExitMs.put("node03", 20L);
+ SampleTaskSpec fooSpec =
+ new SampleTaskSpec(2, 100, nodeToExitMs, "");
+ coordinatorClient.createTask(new CreateTaskRequest("foo", fooSpec));
+ // Before startMs (t=0 < 2) the task is still pending.
+ new ExpectedTasks().
+ addTask(new ExpectedTaskBuilder("foo").
+ taskState(new TaskPending(fooSpec)).
+ build()).
+ waitFor(coordinatorClient);
+
+ // t=2 (startMs): both workers start and report "active".
+ time.sleep(2);
+ ObjectNode status1 = new ObjectNode(JsonNodeFactory.instance);
+ status1.set("node02", new TextNode("active"));
+ status1.set("node03", new TextNode("active"));
+ new ExpectedTasks().
+ addTask(new ExpectedTaskBuilder("foo").
+ taskState(new TaskRunning(fooSpec, 2, status1)).
+ workerState(new WorkerRunning(fooSpec, 2, new TextNode("active"))).
+ build()).
+ waitFor(coordinatorClient).
+ waitFor(cluster.agentClient("node02")).
+ waitFor(cluster.agentClient("node03"));
+
+ // t=12: node02's worker (exit delay 10) has halted while node03's is still
+ // active, so the task as a whole remains TaskRunning.
+ time.sleep(10);
+ ObjectNode status2 = new ObjectNode(JsonNodeFactory.instance);
+ status2.set("node02", new TextNode("halted"));
+ status2.set("node03", new TextNode("active"));
+ new ExpectedTasks().
+ addTask(new ExpectedTaskBuilder("foo").
+ taskState(new TaskRunning(fooSpec, 2, status2)).
+ workerState(new WorkerRunning(fooSpec, 2, new TextNode("active"))).
+ build()).
+ waitFor(coordinatorClient).
+ waitFor(cluster.agentClient("node03"));
+ // node02's own agent reports its worker as done, ending at t=12.
+ new ExpectedTasks().
+ addTask(new ExpectedTaskBuilder("foo").
+ taskState(new TaskRunning(fooSpec, 2, status2)).
+ workerState(new WorkerDone(fooSpec, 2, 12, new TextNode("halted"), "")).
+ build()).
+ waitFor(cluster.agentClient("node02"));
+
+ // t=22: node03's worker (exit delay 20) halts as well, so the coordinator
+ // now marks the whole task done.
+ time.sleep(10);
+ ObjectNode status3 = new ObjectNode(JsonNodeFactory.instance);
+ status3.set("node02", new TextNode("halted"));
+ status3.set("node03", new TextNode("halted"));
+ new ExpectedTasks().
+ addTask(new ExpectedTaskBuilder("foo").
+ taskState(new TaskDone(fooSpec, 2, 22, "",
+ false, status3)).
+ build()).
+ waitFor(coordinatorClient);
+ }
+ }
};
diff --git a/tools/src/test/java/org/apache/kafka/trogdor/task/SampleTaskSpec.java b/tools/src/test/java/org/apache/kafka/trogdor/task/SampleTaskSpec.java
index 26fdfb2..38a160f 100644
--- a/tools/src/test/java/org/apache/kafka/trogdor/task/SampleTaskSpec.java
+++ b/tools/src/test/java/org/apache/kafka/trogdor/task/SampleTaskSpec.java
@@ -20,23 +20,28 @@ package org.apache.kafka.trogdor.task;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
public class SampleTaskSpec extends TaskSpec {
- private final long exitMs;
+ private final Map<String, Long> nodeToExitMs;
private final String error;
+ /**
+ * Creates a sample task spec.
+ *
+ * @param startMs the task start time in ms (passed to TaskSpec)
+ * @param durationMs the task duration in ms (passed to TaskSpec)
+ * @param nodeToExitMs per-node delay, in ms after that node's worker starts, before
+ * the worker halts. Workers on nodes missing from the map are
+ * scheduled by SampleTaskWorker with a delay of Long.MAX_VALUE.
+ * A null map is treated as empty.
+ * @param error the value each worker completes its halt future with; null is
+ * treated as the empty string
+ */
@JsonCreator
public SampleTaskSpec(@JsonProperty("startMs") long startMs,
@JsonProperty("durationMs") long durationMs,
- @JsonProperty("exitMs") long exitMs,
+ @JsonProperty("nodeToExitMs") Map<String, Long> nodeToExitMs,
@JsonProperty("error") String error) {
super(startMs, durationMs);
- this.exitMs = exitMs;
+ // Always store an unmodifiable map: copy the argument so later changes to the
+ // caller's map do not leak into the spec, and use an immutable empty map for null.
+ this.nodeToExitMs = nodeToExitMs == null ? Collections.<String, Long>emptyMap() :
+ Collections.unmodifiableMap(new HashMap<String, Long>(nodeToExitMs));
this.error = error == null ? "" : error;
}
+ /**
+ * Returns the per-node exit delays keyed by node name; never null and unmodifiable.
+ */
@JsonProperty
- public long exitMs() {
- return exitMs;
+ public Map<String, Long> nodeToExitMs() {
+ return nodeToExitMs;
}
@JsonProperty
diff --git a/tools/src/test/java/org/apache/kafka/trogdor/task/SampleTaskWorker.java b/tools/src/test/java/org/apache/kafka/trogdor/task/SampleTaskWorker.java
index ebac27e..ade055d 100644
--- a/tools/src/test/java/org/apache/kafka/trogdor/task/SampleTaskWorker.java
+++ b/tools/src/test/java/org/apache/kafka/trogdor/task/SampleTaskWorker.java
@@ -17,6 +17,7 @@
package org.apache.kafka.trogdor.task;
+import com.fasterxml.jackson.databind.node.TextNode;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.trogdor.common.Platform;
import org.apache.kafka.trogdor.common.ThreadUtils;
@@ -26,12 +27,12 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
public class SampleTaskWorker implements TaskWorker {
private final SampleTaskSpec spec;
private final ScheduledExecutorService executor;
private Future<Void> future;
+ private WorkerStatusTracker status;
SampleTaskWorker(SampleTaskSpec spec) {
this.spec = spec;
@@ -41,17 +42,24 @@ public class SampleTaskWorker implements TaskWorker {
}
+ /**
+ * Starts the worker: records the status tracker, reports "active", and schedules
+ * haltFuture to be completed with spec.error() once this node's exit delay from
+ * spec.nodeToExitMs() has elapsed. Calls made after the first start are ignored.
+ */
@Override
- public synchronized void start(Platform platform, AtomicReference<String> status,
+ public synchronized void start(Platform platform, WorkerStatusTracker status,
final KafkaFutureImpl<String> haltFuture) throws Exception {
+ // Already started: nothing to do.
if (this.future != null)
return;
+ this.status = status;
+ this.status.update(new TextNode("active"));
+
+ // Nodes with no entry in nodeToExitMs use Long.MAX_VALUE, i.e. effectively never halt.
+ // NOTE(review): confirm every Scheduler implementation handles a Long.MAX_VALUE delay
+ // without overflowing when it adds the delay to the current time.
+ Long exitMs = spec.nodeToExitMs().get(platform.curNode().name());
+ if (exitMs == null) {
+ exitMs = Long.MAX_VALUE;
+ }
this.future = platform.scheduler().schedule(executor, new Callable<Void>() {
@Override
public Void call() throws Exception {
haltFuture.complete(spec.error());
return null;
}
- }, spec.exitMs());
+ }, exitMs);
}
@Override
@@ -59,5 +67,6 @@ public class SampleTaskWorker implements TaskWorker {
this.future.cancel(false);
this.executor.shutdown();
this.executor.awaitTermination(1, TimeUnit.DAYS);
+ this.status.update(new TextNode("halted"));
}
};
diff --git a/tools/src/test/java/org/apache/kafka/trogdor/task/TaskSpecTest.java b/tools/src/test/java/org/apache/kafka/trogdor/task/TaskSpecTest.java
index abd7e62..d8d4ca9 100644
--- a/tools/src/test/java/org/apache/kafka/trogdor/task/TaskSpecTest.java
+++ b/tools/src/test/java/org/apache/kafka/trogdor/task/TaskSpecTest.java
@@ -40,11 +40,11 @@ public class TaskSpecTest {
} catch (InvalidTypeIdException e) {
}
String inputJson = "{\"class\":\"org.apache.kafka.trogdor.task.SampleTaskSpec\"," +
- "\"startMs\":123,\"durationMs\":456,\"exitMs\":1000,\"error\":\"foo\"}";
+ "\"startMs\":123,\"durationMs\":456,\"nodeToExitMs\":{\"node01\":1000},\"error\":\"foo\"}";
SampleTaskSpec spec = JsonUtil.JSON_SERDE.readValue(inputJson, SampleTaskSpec.class);
assertEquals(123, spec.startMs());
assertEquals(456, spec.durationMs());
- assertEquals(1000, spec.exitMs());
+ assertEquals(Long.valueOf(1000), spec.nodeToExitMs().get("node01"));
assertEquals("foo", spec.error());
String outputJson = JsonUtil.toJsonString(spec);
assertEquals(inputJson, outputJson);
--
To stop receiving notification emails like this one, please contact
rsivaram@apache.org.