You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2017/11/03 09:38:06 UTC
[3/4] kafka git commit: KAFKA-6060;
Add workload generation capabilities to Trogdor
http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/basic/BasicPlatform.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/basic/BasicPlatform.java b/tools/src/main/java/org/apache/kafka/trogdor/basic/BasicPlatform.java
index 85270cd..6922c2e 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/basic/BasicPlatform.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/basic/BasicPlatform.java
@@ -18,6 +18,7 @@
package org.apache.kafka.trogdor.basic;
import com.fasterxml.jackson.databind.JsonNode;
+import org.apache.kafka.common.utils.Scheduler;
import org.apache.kafka.common.utils.Shell;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.trogdor.common.Node;
@@ -36,6 +37,7 @@ public class BasicPlatform implements Platform {
private final Node curNode;
private final BasicTopology topology;
+ private final Scheduler scheduler;
private final CommandRunner commandRunner;
public interface CommandRunner {
@@ -57,7 +59,7 @@ public class BasicPlatform implements Platform {
}
public BasicPlatform(String curNodeName, BasicTopology topology,
- CommandRunner commandRunner) {
+ Scheduler scheduler, CommandRunner commandRunner) {
this.curNode = topology.node(curNodeName);
if (this.curNode == null) {
throw new RuntimeException(String.format("No node named %s found " +
@@ -65,16 +67,18 @@ public class BasicPlatform implements Platform {
Utils.join(topology.nodes().keySet(), ",")));
}
this.topology = topology;
+ this.scheduler = scheduler;
this.commandRunner = commandRunner;
}
- public BasicPlatform(String curNodeName, JsonNode configRoot) {
+ public BasicPlatform(String curNodeName, Scheduler scheduler, JsonNode configRoot) {
JsonNode nodes = configRoot.get("nodes");
if (nodes == null) {
throw new RuntimeException("Expected to find a 'nodes' field " +
"in the root JSON configuration object");
}
this.topology = new BasicTopology(nodes);
+ this.scheduler = scheduler;
this.curNode = topology.node(curNodeName);
if (this.curNode == null) {
throw new RuntimeException(String.format("No node named %s found " +
@@ -100,6 +104,11 @@ public class BasicPlatform implements Platform {
}
@Override
+ public Scheduler scheduler() {
+ return scheduler;
+ }
+
+ @Override
public String runCommand(String[] command) throws IOException {
return commandRunner.run(curNode, command);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/common/Platform.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/common/Platform.java b/tools/src/main/java/org/apache/kafka/trogdor/common/Platform.java
index 1177ace..cb20620 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/common/Platform.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/common/Platform.java
@@ -21,6 +21,8 @@ import com.fasterxml.jackson.databind.JsonNode;
import java.io.File;
import java.io.IOException;
+
+import org.apache.kafka.common.utils.Scheduler;
import org.apache.kafka.common.utils.Utils;
/**
@@ -47,6 +49,7 @@ public interface Platform {
String platformName = platformNode.textValue();
return Utils.newParameterizedInstance(platformName,
String.class, curNodeName,
+ Scheduler.class, Scheduler.SYSTEM,
JsonNode.class, root);
}
}
@@ -67,6 +70,11 @@ public interface Platform {
Topology topology();
/**
+ * Get the scheduler to use.
+ */
+ Scheduler scheduler();
+
+ /**
* Run a command on this local node.
*
* Throws an exception if the command could not be run, or if the
http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/common/ThreadUtils.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/common/ThreadUtils.java b/tools/src/main/java/org/apache/kafka/trogdor/common/ThreadUtils.java
new file mode 100644
index 0000000..7e02856
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/common/ThreadUtils.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.common;
+
+import java.util.Objects;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Utilities for working with threads.
+ *
+ * This is a static utility class and cannot be instantiated.
+ */
+public final class ThreadUtils {
+ private ThreadUtils() {
+ }
+
+ /**
+ * Create a new ThreadFactory.
+ *
+ * @param pattern The pattern to use. If this contains %d, it will be
+ * replaced with a thread number. It should not contain more
+ * than one %d. When the pattern contains %d it is passed to
+ * String.format, so any other literal '%' must be escaped as %%.
+ * @param daemon True if we want daemon threads.
+ * @return The new ThreadFactory.
+ * @throws NullPointerException If pattern is null.
+ */
+ public static ThreadFactory createThreadFactory(final String pattern,
+ final boolean daemon) {
+ Objects.requireNonNull(pattern, "pattern");
+ return new ThreadFactory() {
+ // Numbers the threads created by this factory, starting at 1.
+ private final AtomicLong threadEpoch = new AtomicLong(0);
+
+ @Override
+ public Thread newThread(Runnable r) {
+ String threadName;
+ if (pattern.contains("%d")) {
+ threadName = String.format(pattern, threadEpoch.addAndGet(1));
+ } else {
+ threadName = pattern;
+ }
+ Thread thread = new Thread(r, threadName);
+ thread.setDaemon(daemon);
+ return thread;
+ }
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/common/Topology.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/common/Topology.java b/tools/src/main/java/org/apache/kafka/trogdor/common/Topology.java
index 94a5474..d48bbde 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/common/Topology.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/common/Topology.java
@@ -17,12 +17,35 @@
package org.apache.kafka.trogdor.common;
+import java.util.HashSet;
+import java.util.Map;
import java.util.NavigableMap;
+import java.util.Set;
/**
* Defines a cluster topology
*/
public interface Topology {
+ class Util {
+ /**
+ * Get the names of agent nodes in the topology, i.e. the nodes for which
+ * Node.Util.getTrogdorAgentPort returns a positive value.
+ *
+ * @param topology The topology to scan.
+ * @return A new, mutable set containing the matching node names.
+ */
+ public static Set<String> agentNodeNames(Topology topology) {
+ Set<String> agentNames = new HashSet<>();
+ for (Map.Entry<String, Node> nameAndNode : topology.nodes().entrySet()) {
+ Node candidate = nameAndNode.getValue();
+ if (Node.Util.getTrogdorAgentPort(candidate) > 0) {
+ agentNames.add(nameAndNode.getKey());
+ }
+ }
+ return agentNames;
+ }
+ }
+
/**
* Get the node with the given name.
*/
http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/coordinator/Coordinator.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/Coordinator.java b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/Coordinator.java
index 8f3563b..8c26d8d 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/Coordinator.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/Coordinator.java
@@ -22,34 +22,19 @@ import net.sourceforge.argparse4j.inf.ArgumentParser;
import net.sourceforge.argparse4j.inf.ArgumentParserException;
import net.sourceforge.argparse4j.inf.Namespace;
import org.apache.kafka.common.utils.Exit;
-import org.apache.kafka.common.utils.KafkaThread;
-import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.common.utils.Scheduler;
import org.apache.kafka.trogdor.common.Node;
import org.apache.kafka.trogdor.common.Platform;
-import org.apache.kafka.trogdor.fault.DoneState;
-import org.apache.kafka.trogdor.fault.Fault;
-import org.apache.kafka.trogdor.fault.FaultSet;
-import org.apache.kafka.trogdor.fault.FaultSpec;
-import org.apache.kafka.trogdor.fault.SendingState;
-import org.apache.kafka.trogdor.rest.CoordinatorFaultsResponse;
-import org.apache.kafka.trogdor.rest.CreateCoordinatorFaultRequest;
+import org.apache.kafka.trogdor.rest.CoordinatorStatusResponse;
+import org.apache.kafka.trogdor.rest.CreateTaskRequest;
+import org.apache.kafka.trogdor.rest.CreateTaskResponse;
import org.apache.kafka.trogdor.rest.JsonRestServer;
+import org.apache.kafka.trogdor.rest.StopTaskRequest;
+import org.apache.kafka.trogdor.rest.StopTaskResponse;
+import org.apache.kafka.trogdor.rest.TasksResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
-
import static net.sourceforge.argparse4j.impl.Arguments.store;
/**
@@ -61,39 +46,14 @@ public final class Coordinator {
private static final Logger log = LoggerFactory.getLogger(Coordinator.class);
/**
- * The clock to use for this coordinator.
- */
- private final Time time;
-
- /**
- * The start time in milliseconds.
+ * The start time of the Coordinator in milliseconds.
*/
private final long startTimeMs;
/**
- * The platform.
+ * The task manager.
*/
- private final Platform platform;
-
- /**
- * NodeManager objects for each node in the cluster.
- */
- private final Map<String, NodeManager> nodeManagers;
-
- /**
- * The lock protecting shutdown and faultQueue.
- */
- private final ReentrantLock lock = new ReentrantLock();
-
- /**
- * The condition variable which the coordinator thread waits on.
- */
- private final Condition cond = lock.newCondition();
-
- /**
- * The coordinator runnable.
- */
- private final CoordinatorRunnable runnable;
+ private final TaskManager taskManager;
/**
* The REST server.
@@ -101,83 +61,6 @@ public final class Coordinator {
private final JsonRestServer restServer;
/**
- * The coordinator thread.
- */
- private final KafkaThread thread;
-
- /**
- * True if the server is shutting down.
- */
- private boolean shutdown = false;
-
- /**
- * The set of faults which have been scheduled.
- */
- private final FaultSet pendingFaults = new FaultSet();
-
- /**
- * The set of faults which have been sent to the NodeManagers.
- */
- private final FaultSet processedFaults = new FaultSet();
-
- class CoordinatorRunnable implements Runnable {
- @Override
- public void run() {
- log.info("Starting main service thread.");
- try {
- long nextWakeMs = 0;
- while (true) {
- long now = time.milliseconds();
- List<Fault> toStart = new ArrayList<>();
- lock.lock();
- try {
- if (shutdown) {
- break;
- }
- if (nextWakeMs > now) {
- if (cond.await(nextWakeMs - now, TimeUnit.MILLISECONDS)) {
- log.trace("CoordinatorRunnable woke up early.");
- }
- now = time.milliseconds();
- if (shutdown) {
- break;
- }
- }
- nextWakeMs = now + (60L * 60L * 1000L);
- Iterator<Fault> iter = pendingFaults.iterateByStart();
- while (iter.hasNext()) {
- Fault fault = iter.next();
- if (now < fault.spec().startMs()) {
- nextWakeMs = Math.min(nextWakeMs, fault.spec().startMs());
- break;
- }
- toStart.add(fault);
- iter.remove();
- processedFaults.add(fault);
- }
- } finally {
- lock.unlock();
- }
- for (Fault fault: toStart) {
- startFault(now, fault);
- }
- }
- } catch (Throwable t) {
- log.error("CoordinatorRunnable shutting down with exception", t);
- } finally {
- log.info("CoordinatorRunnable shutting down.");
- restServer.stop();
- for (NodeManager nodeManager : nodeManagers.values()) {
- nodeManager.beginShutdown();
- }
- for (NodeManager nodeManager : nodeManagers.values()) {
- nodeManager.waitForShutdown();
- }
- }
- }
- }
-
- /**
* Create a new Coordinator.
*
* @param platform The platform object to use.
@@ -185,24 +68,11 @@ public final class Coordinator {
* @param restServer The REST server to use.
* @param resource The CoordinatorRestResource to use.
*/
- public Coordinator(Platform platform, Time time, JsonRestServer restServer,
+ public Coordinator(Platform platform, Scheduler scheduler, JsonRestServer restServer,
CoordinatorRestResource resource) {
- this.platform = platform;
- this.time = time;
- this.startTimeMs = time.milliseconds();
- this.runnable = new CoordinatorRunnable();
+ this.startTimeMs = scheduler.time().milliseconds();
+ this.taskManager = new TaskManager(platform, scheduler);
this.restServer = restServer;
- this.nodeManagers = new HashMap<>();
- for (Node node : platform.topology().nodes().values()) {
- if (Node.Util.getTrogdorAgentPort(node) > 0) {
- this.nodeManagers.put(node.name(), new NodeManager(time, node));
- }
- }
- if (this.nodeManagers.isEmpty()) {
- log.warn("No agent nodes configured.");
- }
- this.thread = new KafkaThread("TrogdorCoordinatorThread", runnable, false);
- this.thread.start();
resource.setCoordinator(this);
}
@@ -210,94 +80,32 @@ public final class Coordinator {
return this.restServer.port();
}
- private void startFault(long now, Fault fault) {
- Set<String> affectedNodes = fault.targetNodes(platform.topology());
- Set<NodeManager> affectedManagers = new HashSet<>();
- Set<String> nonexistentNodes = new HashSet<>();
- Set<String> nodeNames = new HashSet<>();
- for (String affectedNode : affectedNodes) {
- NodeManager nodeManager = nodeManagers.get(affectedNode);
- if (nodeManager == null) {
- nonexistentNodes.add(affectedNode);
- } else {
- affectedManagers.add(nodeManager);
- nodeNames.add(affectedNode);
- }
- }
- if (!nonexistentNodes.isEmpty()) {
- log.warn("Fault {} refers to {} non-existent node(s): {}", fault.id(),
- nonexistentNodes.size(), Utils.join(nonexistentNodes, ", "));
- }
- log.info("Applying fault {} on {} node(s): {}", fault.id(),
- nodeNames.size(), Utils.join(nodeNames, ", "));
- if (nodeNames.isEmpty()) {
- fault.setState(new DoneState(now, ""));
- } else {
- fault.setState(new SendingState(nodeNames));
- }
- for (NodeManager nodeManager : affectedManagers) {
- nodeManager.enqueueFault(fault);
- }
- }
-
- public void beginShutdown() {
- lock.lock();
- try {
- this.shutdown = true;
- cond.signalAll();
- } finally {
- lock.unlock();
- }
+ public CoordinatorStatusResponse status() throws Exception {
+ return new CoordinatorStatusResponse(startTimeMs);
}
- public void waitForShutdown() {
- try {
- this.thread.join();
- } catch (InterruptedException e) {
- log.error("Interrupted while waiting for thread shutdown", e);
- Thread.currentThread().interrupt();
- }
+ public CreateTaskResponse createTask(CreateTaskRequest request) throws Exception {
+ return new CreateTaskResponse(taskManager.createTask(request.id(), request.spec()));
}
- public long startTimeMs() {
- return startTimeMs;
+ public StopTaskResponse stopTask(StopTaskRequest request) throws Exception {
+ return new StopTaskResponse(taskManager.stopTask(request.id()));
}
- public CoordinatorFaultsResponse getFaults() {
- Map<String, CoordinatorFaultsResponse.FaultData> faultData = new TreeMap<>();
- lock.lock();
- try {
- getFaultsImpl(faultData, pendingFaults);
- getFaultsImpl(faultData, processedFaults);
- } finally {
- lock.unlock();
- }
- return new CoordinatorFaultsResponse(faultData);
+ public TasksResponse tasks() throws Exception {
+ return taskManager.tasks();
}
- private void getFaultsImpl(Map<String, CoordinatorFaultsResponse.FaultData> faultData,
- FaultSet faultSet) {
- for (Iterator<Fault> iter = faultSet.iterateByStart();
- iter.hasNext(); ) {
- Fault fault = iter.next();
- CoordinatorFaultsResponse.FaultData data =
- new CoordinatorFaultsResponse.FaultData(fault.spec(), fault.state());
- faultData.put(fault.id(), data);
- }
+ public void beginShutdown(boolean stopAgents) throws Exception {
+ restServer.beginShutdown();
+ taskManager.beginShutdown(stopAgents);
}
- public void createFault(CreateCoordinatorFaultRequest request) throws ClassNotFoundException {
- lock.lock();
- try {
- Fault fault = FaultSpec.Util.createFault(request.id(), request.spec());
- pendingFaults.add(fault);
- cond.signalAll();
- } finally {
- lock.unlock();
- }
+ public void waitForShutdown() throws Exception {
+ restServer.waitForShutdown();
+ taskManager.waitForShutdown();
}
-
public static void main(String[] args) throws Exception {
ArgumentParser parser = ArgumentParsers
.newArgumentParser("trogdor-coordinator")
@@ -336,15 +144,20 @@ public final class Coordinator {
JsonRestServer restServer = new JsonRestServer(
Node.Util.getTrogdorCoordinatorPort(platform.curNode()));
CoordinatorRestResource resource = new CoordinatorRestResource();
- final Coordinator coordinator = new Coordinator(platform, Time.SYSTEM,
+ log.info("Starting coordinator process.");
+ final Coordinator coordinator = new Coordinator(platform, Scheduler.SYSTEM,
restServer, resource);
restServer.start(resource);
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
- log.error("Running shutdown hook...");
- coordinator.beginShutdown();
- coordinator.waitForShutdown();
+ log.warn("Running coordinator shutdown hook.");
+ try {
+ coordinator.beginShutdown(false);
+ coordinator.waitForShutdown();
+ } catch (Exception e) {
+ log.error("Got exception while running coordinator shutdown hook.", e);
+ }
}
});
coordinator.waitForShutdown();
http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorClient.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorClient.java b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorClient.java
index 1137d08..821a76b 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorClient.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorClient.java
@@ -25,12 +25,15 @@ import net.sourceforge.argparse4j.inf.MutuallyExclusiveGroup;
import net.sourceforge.argparse4j.inf.Namespace;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.trogdor.common.JsonUtil;
-import org.apache.kafka.trogdor.rest.CoordinatorFaultsResponse;
import org.apache.kafka.trogdor.rest.CoordinatorStatusResponse;
-import org.apache.kafka.trogdor.rest.CreateCoordinatorFaultRequest;
+import org.apache.kafka.trogdor.rest.CreateTaskRequest;
+import org.apache.kafka.trogdor.rest.CreateTaskResponse;
import org.apache.kafka.trogdor.rest.Empty;
import org.apache.kafka.trogdor.rest.JsonRestServer;
import org.apache.kafka.trogdor.rest.JsonRestServer.HttpResponse;
+import org.apache.kafka.trogdor.rest.StopTaskRequest;
+import org.apache.kafka.trogdor.rest.StopTaskResponse;
+import org.apache.kafka.trogdor.rest.TasksResponse;
import static net.sourceforge.argparse4j.impl.Arguments.store;
import static net.sourceforge.argparse4j.impl.Arguments.storeTrue;
@@ -40,47 +43,64 @@ import static net.sourceforge.argparse4j.impl.Arguments.storeTrue;
*/
public class CoordinatorClient {
/**
+ * The maximum number of tries to make.
+ */
+ private final int maxTries;
+
+ /**
* The URL target.
*/
private final String target;
- public CoordinatorClient(String host, int port) {
- this(String.format("%s:%d", host, port));
+ public CoordinatorClient(int maxTries, String host, int port) {
+ this(maxTries, String.format("%s:%d", host, port));
}
- public CoordinatorClient(String target) {
+ public CoordinatorClient(int maxTries, String target) {
+ this.maxTries = maxTries;
this.target = target;
}
+ public int maxTries() {
+ return maxTries;
+ }
+
private String url(String suffix) {
return String.format("http://%s%s", target, suffix);
}
- public CoordinatorStatusResponse getStatus() throws Exception {
+ public CoordinatorStatusResponse status() throws Exception {
HttpResponse<CoordinatorStatusResponse> resp =
JsonRestServer.<CoordinatorStatusResponse>httpRequest(url("/coordinator/status"), "GET",
- null, new TypeReference<CoordinatorStatusResponse>() { });
+ null, new TypeReference<CoordinatorStatusResponse>() { }, maxTries);
return resp.body();
}
- public CoordinatorFaultsResponse getFaults() throws Exception {
- HttpResponse<CoordinatorFaultsResponse> resp =
- JsonRestServer.<CoordinatorFaultsResponse>httpRequest(url("/coordinator/faults"), "GET",
- null, new TypeReference<CoordinatorFaultsResponse>() { });
+ public CreateTaskResponse createTask(CreateTaskRequest request) throws Exception {
+ HttpResponse<CreateTaskResponse> resp =
+ JsonRestServer.<CreateTaskResponse>httpRequest(url("/coordinator/task/create"), "POST",
+ request, new TypeReference<CreateTaskResponse>() { }, maxTries);
return resp.body();
}
- public void putFault(CreateCoordinatorFaultRequest request) throws Exception {
- HttpResponse<CreateCoordinatorFaultRequest> resp =
- JsonRestServer.<CreateCoordinatorFaultRequest>httpRequest(url("/coordinator/fault"), "PUT",
- request, new TypeReference<CreateCoordinatorFaultRequest>() { });
- resp.body();
+ public StopTaskResponse stopTask(StopTaskRequest request) throws Exception {
+ HttpResponse<StopTaskResponse> resp =
+ JsonRestServer.<StopTaskResponse>httpRequest(url("/coordinator/task/stop"), "PUT",
+ request, new TypeReference<StopTaskResponse>() { }, maxTries);
+ return resp.body();
+ }
+
+ public TasksResponse tasks() throws Exception {
+ HttpResponse<TasksResponse> resp =
+ JsonRestServer.<TasksResponse>httpRequest(url("/coordinator/tasks"), "GET",
+ null, new TypeReference<TasksResponse>() { }, maxTries);
+ return resp.body();
}
public void shutdown() throws Exception {
HttpResponse<Empty> resp =
JsonRestServer.<Empty>httpRequest(url("/coordinator/shutdown"), "PUT",
- null, new TypeReference<Empty>() { });
+ null, new TypeReference<Empty>() { }, maxTries);
resp.body();
}
@@ -102,17 +122,23 @@ public class CoordinatorClient {
.type(Boolean.class)
.dest("status")
.help("Get coordinator status.");
- actions.addArgument("--get-faults")
+ actions.addArgument("--show-tasks")
.action(storeTrue())
.type(Boolean.class)
- .dest("get_faults")
- .help("Get coordinator faults.");
- actions.addArgument("--create-fault")
+ .dest("show_tasks")
+ .help("Show coordinator tasks.");
+ actions.addArgument("--create-task")
+ .action(store())
+ .type(String.class)
+ .dest("create_task")
+ .metavar("TASK_SPEC_JSON")
+ .help("Create a new task from a task spec.");
+ actions.addArgument("--stop-task")
.action(store())
.type(String.class)
- .dest("create_fault")
- .metavar("FAULT_JSON")
- .help("Create a new fault.");
+ .dest("stop_task")
+ .metavar("TASK_ID")
+ .help("Stop a task.");
actions.addArgument("--shutdown")
.action(storeTrue())
.type(Boolean.class)
@@ -132,17 +158,20 @@ public class CoordinatorClient {
}
}
String target = res.getString("target");
- CoordinatorClient client = new CoordinatorClient(target);
+ CoordinatorClient client = new CoordinatorClient(3, target);
if (res.getBoolean("status")) {
System.out.println("Got coordinator status: " +
- JsonUtil.toPrettyJsonString(client.getStatus()));
- } else if (res.getBoolean("get_faults")) {
- System.out.println("Got coordinator faults: " +
- JsonUtil.toPrettyJsonString(client.getFaults()));
- } else if (res.getString("create_fault") != null) {
- client.putFault(JsonUtil.JSON_SERDE.readValue(res.getString("create_fault"),
- CreateCoordinatorFaultRequest.class));
- System.out.println("Created fault.");
+ JsonUtil.toPrettyJsonString(client.status()));
+ } else if (res.getBoolean("show_tasks")) {
+ System.out.println("Got coordinator tasks: " +
+ JsonUtil.toPrettyJsonString(client.tasks()));
+ } else if (res.getString("create_task") != null) {
+ client.createTask(JsonUtil.JSON_SERDE.readValue(res.getString("create_task"),
+ CreateTaskRequest.class));
+ System.out.println("Created task.");
+ } else if (res.getString("stop_task") != null) {
+ client.stopTask(new StopTaskRequest(res.getString("stop_task")));
+ System.out.println("Sent stop task request.");
} else if (res.getBoolean("shutdown")) {
client.shutdown();
System.out.println("Sent shutdown request.");
http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorRestResource.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorRestResource.java b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorRestResource.java
index 60357b8..7775dd0 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorRestResource.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorRestResource.java
@@ -16,14 +16,19 @@
*/
package org.apache.kafka.trogdor.coordinator;
-import org.apache.kafka.trogdor.rest.CoordinatorFaultsResponse;
+import org.apache.kafka.trogdor.rest.CoordinatorShutdownRequest;
import org.apache.kafka.trogdor.rest.CoordinatorStatusResponse;
-import org.apache.kafka.trogdor.rest.CreateCoordinatorFaultRequest;
+import org.apache.kafka.trogdor.rest.CreateTaskRequest;
+import org.apache.kafka.trogdor.rest.CreateTaskResponse;
import org.apache.kafka.trogdor.rest.Empty;
+import org.apache.kafka.trogdor.rest.StopTaskRequest;
+import org.apache.kafka.trogdor.rest.StopTaskResponse;
+import org.apache.kafka.trogdor.rest.TasksResponse;
import javax.servlet.ServletContext;
import javax.ws.rs.Consumes;
import javax.ws.rs.GET;
+import javax.ws.rs.POST;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
@@ -46,27 +51,34 @@ public class CoordinatorRestResource {
@GET
@Path("/status")
- public CoordinatorStatusResponse getStatus() throws Throwable {
- return new CoordinatorStatusResponse(coordinator().startTimeMs());
+ public CoordinatorStatusResponse status() throws Throwable {
+ return coordinator().status();
}
- @GET
- @Path("/faults")
- public CoordinatorFaultsResponse getCoordinatorFaults() throws Throwable {
- return coordinator().getFaults();
+ @POST
+ @Path("/task/create")
+ public CreateTaskResponse createTask(CreateTaskRequest request) throws Throwable {
+ return coordinator().createTask(request);
}
@PUT
- @Path("/fault")
- public Empty putCoordinatorFault(CreateCoordinatorFaultRequest request) throws Throwable {
- coordinator().createFault(request);
- return Empty.INSTANCE;
+ @Path("/task/stop")
+ public StopTaskResponse stopTask(StopTaskRequest request) throws Throwable {
+ return coordinator().stopTask(request);
+ }
+
+ @GET
+ @Path("/tasks")
+ public TasksResponse tasks() throws Throwable {
+ return coordinator().tasks();
}
@PUT
@Path("/shutdown")
- public Empty shutdown() throws Throwable {
- coordinator().beginShutdown();
+ public Empty beginShutdown(CoordinatorShutdownRequest request) throws Throwable {
+ // CoordinatorClient#shutdown sends this PUT with a null body, so the request
+ // may be null here; in that case, do not stop the agents.
+ coordinator().beginShutdown(request != null && request.stopAgents());
return Empty.INSTANCE;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/coordinator/NodeManager.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/NodeManager.java b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/NodeManager.java
index ee71190..aea9617 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/NodeManager.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/NodeManager.java
@@ -15,254 +15,315 @@
* limitations under the License.
*/
+/*
+ * When a task comes in, it arrives via createTask (the RPC backend).
+ * This starts a CreateTask on the main state change thread and waits for it.
+ * That CreateTask checks the main task hash map and returns the existing task
+ * spec if an entry is already present. Otherwise, it creates a new entry and
+ * returns null.
+ * It also schedules a RunTask some time in the future on the main state change thread.
+ * We save the future from this in case we need to cancel it later, in a StopTask.
+ * If we can't create the TaskController for the task, we transition to DONE with an
+ * appropriate error message.
+ *
+ * RunTask actually starts the task which was created earlier. This could
+ * happen an arbitrary amount of time after task creation (it is based on the
+ * task spec). RunTask must operate only on PENDING tasks... if the task has been
+ * stopped, then we have nothing to do here.
+ * RunTask asks the TaskController for a list of all the names of nodes
+ * affected by this task.
+ * If this list contains nodes we don't know about, or zero nodes, we
+ * transition directly to DONE state with an appropriate error set.
+ * RunTask schedules CreateWorker Callables on all the affected worker nodes.
+ * These callables run in the context of the relevant NodeManager.
+ *
+ * CreateWorker calls the RPC of the same name for the agent.
+ * There is some complexity here due to retries.
+ */
+
package org.apache.kafka.trogdor.coordinator;
-import org.apache.kafka.common.utils.KafkaThread;
-import org.apache.kafka.common.utils.Time;
import org.apache.kafka.trogdor.agent.AgentClient;
import org.apache.kafka.trogdor.common.Node;
-import org.apache.kafka.trogdor.common.Platform;
-import org.apache.kafka.trogdor.fault.DoneState;
-import org.apache.kafka.trogdor.fault.Fault;
-import org.apache.kafka.trogdor.fault.SendingState;
+import org.apache.kafka.trogdor.common.ThreadUtils;
import org.apache.kafka.trogdor.rest.AgentStatusResponse;
-import org.apache.kafka.trogdor.rest.CreateAgentFaultRequest;
+import org.apache.kafka.trogdor.rest.CreateWorkerRequest;
+import org.apache.kafka.trogdor.rest.StopWorkerRequest;
+import org.apache.kafka.trogdor.rest.WorkerDone;
+import org.apache.kafka.trogdor.rest.WorkerReceiving;
+import org.apache.kafka.trogdor.rest.WorkerRunning;
+import org.apache.kafka.trogdor.rest.WorkerStarting;
+import org.apache.kafka.trogdor.rest.WorkerState;
+import org.apache.kafka.trogdor.task.TaskSpec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
-import java.util.List;
+import java.net.ConnectException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-class NodeManager {
+/**
+ * The NodeManager handles communicating with a specific agent node.
+ * Each NodeManager has its own ExecutorService which runs in a dedicated thread.
+ *
+ * After construction, the mutable state ({@code workers} and {@code heartbeatFuture})
+ * is only accessed from tasks that run on that executor, which is why no locks are used.
+ */
+public final class NodeManager {
 private static final Logger log = LoggerFactory.getLogger(NodeManager.class);
 /**
- * The Time object used to fetch the current time.
+ * The normal number of milliseconds between heartbeats sent to the agent.
 */
- private final Time time;
+ private static final long HEARTBEAT_DELAY_MS = 1000L;
- /**
- * The node which is being managed.
- */
- private final Node node;
+ /**
+ * The coordinator's view of a single worker (one task) on this node.
+ */
+ class ManagedWorker {
+ /**
+ * The task ID of this worker.
+ */
+ private final String id;
+ /**
+ * The task specification sent to the agent when creating this worker.
+ */
+ private final TaskSpec spec;
+ /**
+ * True if this worker should be running. False once it has been scheduled
+ * to stop, or if it was found on the agent without having been requested.
+ */
+ private boolean shouldRun;
+ /**
+ * The last worker state observed for this worker. It is set on creation and
+ * updated by each heartbeat in which the agent reports this worker.
+ */
+ private WorkerState state;
- /**
- * The client for the node being managed.
- */
- private final AgentClient client;
+ ManagedWorker(String id, TaskSpec spec, boolean shouldRun, WorkerState state) {
+ this.id = id;
+ this.spec = spec;
+ this.shouldRun = shouldRun;
+ this.state = state;
+ }
- /**
- * The maximum amount of time to go without contacting the node.
- */
- private final long heartbeatMs;
+ /**
+ * Send a createWorker request for this worker to the agent.
+ * Failures are logged but not rethrown. A later heartbeat retries if the agent
+ * still does not report this worker and it should be running.
+ */
+ void tryCreate() {
+ try {
+ client.createWorker(new CreateWorkerRequest(id, spec));
+ } catch (Throwable e) {
+ log.error("{}: error creating worker {}.", node.name(), id, e);
+ }
+ }
- /**
- * True if the NodeManager is shutting down. Protected by the queueLock.
- */
- private boolean shutdown = false;
+ /**
+ * Send a stopWorker request for this worker to the agent.
+ * Failures are logged but not rethrown. A later heartbeat retries while the
+ * agent still reports this worker as starting or running.
+ */
+ void tryStop() {
+ try {
+ client.stopWorker(new StopWorkerRequest(id));
+ } catch (Throwable e) {
+ log.error("{}: error stopping worker {}.", node.name(), id, e);
+ }
+ }
+ }
 /**
- * The Node Manager runnable.
+ * The node which we are managing.
 */
- private final NodeManagerRunnable runnable;
+ private final Node node;
 /**
- * The Node Manager thread.
+ * The task manager, which is notified when a worker on this node finishes.
 */
- private final KafkaThread thread;
+ private final TaskManager taskManager;
 /**
- * The lock protecting the NodeManager fields.
+ * A client for the Node's Agent.
 */
- private final Lock lock = new ReentrantLock();
+ private final AgentClient client;
 /**
- * The condition variable used to wake the thread when it is waiting for a
- * queue or shutdown change.
+ * Maps task IDs to worker structures.
 */
- private final Condition cond = lock.newCondition();
+ private final Map<String, ManagedWorker> workers;
 /**
- * A queue of faults which should be sent to this node. Protected by the lock.
+ * An executor service which manages the thread dedicated to this node.
 */
- private final List<Fault> faultQueue = new ArrayList<>();
+ private final ScheduledExecutorService executor;
 /**
- * The last time we successfully contacted the node. Protected by the lock.
+ * The heartbeat runnable.
 */
- private long lastContactMs = 0;
+ private final NodeHeartbeat heartbeat;
 /**
- * The current status of this node.
+ * A future which can be used to cancel the periodic heartbeat task.
 */
- public static class NodeStatus {
- private final String nodeName;
- private final long lastContactMs;
+ private ScheduledFuture<?> heartbeatFuture;
- NodeStatus(String nodeName, long lastContactMs) {
- this.nodeName = nodeName;
- this.lastContactMs = lastContactMs;
- }
-
- public String nodeName() {
- return nodeName;
- }
+ /**
+ * Create a NodeManager for the given node.
+ * This creates the node's dedicated single-threaded executor. It also schedules
+ * the first heartbeat HEARTBEAT_DELAY_MS milliseconds from now.
+ *
+ * @param node The node to manage.
+ * @param taskManager The task manager to notify about worker completions.
+ */
+ NodeManager(Node node, TaskManager taskManager) {
+ this.node = node;
+ this.taskManager = taskManager;
+ this.client = new AgentClient(1, node.hostname(), Node.Util.getTrogdorAgentPort(node));
+ this.workers = new HashMap<>();
+ this.executor = Executors.newSingleThreadScheduledExecutor(
+ ThreadUtils.createThreadFactory("NodeManager(" + node.name() + ")",
+ false));
+ this.heartbeat = new NodeHeartbeat();
+ rescheduleNextHeartbeat(HEARTBEAT_DELAY_MS);
+ }
- public long lastContactMs() {
- return lastContactMs;
+ /**
+ * Reschedule the heartbeat runnable.
+ *
+ * This cancels the current periodic heartbeat, if any, without interrupting a run
+ * in progress. It then schedules a new one at a fixed rate of HEARTBEAT_DELAY_MS.
+ *
+ * @param initialDelayMs The initial delay to use.
+ */
+ void rescheduleNextHeartbeat(long initialDelayMs) {
+ if (this.heartbeatFuture != null) {
+ this.heartbeatFuture.cancel(false);
 }
+ this.heartbeatFuture = this.executor.scheduleAtFixedRate(heartbeat,
+ initialDelayMs, HEARTBEAT_DELAY_MS, TimeUnit.MILLISECONDS);
 }
- class NodeManagerRunnable implements Runnable {
+ /**
+ * The heartbeat runnable.
+ *
+ * Each run fetches the agent's status and reconciles it with {@code workers}:
+ * - It re-creates workers that should run but are missing on the agent.
+ * - It adopts unknown agent workers as workers that should not run.
+ * - It stops starting or running workers that should not run.
+ * - It reports newly finished workers to the TaskManager.
+ */
+ class NodeHeartbeat implements Runnable {
 @Override
 public void run() {
+ // Push the next heartbeat a full period out from now. This also replaces a
+ // zero-delay schedule installed by CreateWorker or StopWorker.
+ rescheduleNextHeartbeat(HEARTBEAT_DELAY_MS);
 try {
- Fault fault = null;
- long lastCommAttemptMs = 0;
- while (true) {
- long now = time.milliseconds();
- if (fault != null) {
- lastCommAttemptMs = now;
- if (sendFault(now, fault)) {
- fault = null;
+ AgentStatusResponse agentStatus = null;
+ try {
+ agentStatus = client.status();
+ } catch (ConnectException e) {
+ log.error("{}: failed to get agent status: ConnectException {}", node.name(), e.getMessage());
+ return;
+ } catch (Exception e) {
+ log.error("{}: failed to get agent status", node.name(), e);
+ // TODO: eventually think about putting tasks into a bad state as a result of
+ // agents going down?
+ return;
+ }
+ // Identify workers which we think should be running, but which do not appear
+ // in the agent's response. We need to send createWorker requests for these.
+ for (Map.Entry<String, ManagedWorker> entry : workers.entrySet()) {
+ String id = entry.getKey();
+ if (!agentStatus.workers().containsKey(id)) {
+ ManagedWorker worker = entry.getValue();
+ if (worker.shouldRun) {
+ worker.tryCreate();
 }
 }
- long nextCommAttemptMs = lastCommAttemptMs + heartbeatMs;
- if (now < nextCommAttemptMs) {
- lastCommAttemptMs = now;
- sendHeartbeat(now);
+ }
+ // Identify tasks which are running, but which we don't know about.
+ // Add these to the NodeManager as tasks that should not be running.
+ for (Map.Entry<String, WorkerState> entry : agentStatus.workers().entrySet()) {
+ String id = entry.getKey();
+ WorkerState state = entry.getValue();
+ if (!workers.containsKey(id)) {
+ log.warn("{}: scheduling unknown worker {} for stopping.", node.name(), id);
+ workers.put(id, new ManagedWorker(id, state.spec(), false, state));
 }
- long waitMs = Math.max(0L, nextCommAttemptMs - now);
- lock.lock();
- try {
- if (shutdown) {
- return;
- }
- try {
- cond.await(waitMs, TimeUnit.MILLISECONDS);
- } catch (InterruptedException e) {
- log.info("{}: NodeManagerRunnable got InterruptedException", node.name());
- Thread.currentThread().interrupt();
+ }
+ // Handle workers which need to be stopped. Handle workers which have newly completed.
+ for (Map.Entry<String, WorkerState> entry : agentStatus.workers().entrySet()) {
+ String id = entry.getKey();
+ WorkerState state = entry.getValue();
+ // Never null: the previous loop added every agent-reported worker to the map.
+ ManagedWorker worker = workers.get(id);
+ if (state instanceof WorkerStarting || state instanceof WorkerRunning) {
+ if (!worker.shouldRun) {
+ worker.tryStop();
 }
- if (fault == null) {
- if (!faultQueue.isEmpty()) {
- fault = faultQueue.remove(0);
+ } else if (state instanceof WorkerDone) {
+ // Only report the transition into the done state once.
+ if (!(worker.state instanceof WorkerDone)) {
+ String error = ((WorkerDone) state).error();
+ if (error.isEmpty()) {
+ log.warn("{}: Worker {} finished with no error.", node.name(), id);
+ } else {
+ log.warn("{}: Worker {} finished with error '{}'", node.name(), id, error);
 }
+ taskManager.handleWorkerCompletion(node.name(), worker.id, error);
 }
- } finally {
- lock.unlock();
 }
+ worker.state = state;
 }
 } catch (Throwable e) {
- log.warn("{}: exiting NodeManagerRunnable with exception", node.name(), e);
- } finally {
+ log.error("{}: Unhandled exception in NodeHeartbeatRunnable", node.name(), e);
 }
 }
 }
- NodeManager(Time time, Node node) {
- this.time = time;
- this.node = node;
- this.client = new AgentClient(node.hostname(), Node.Util.getTrogdorAgentPort(node));
- this.heartbeatMs = Node.Util.getIntConfig(node,
- Platform.Config.TROGDOR_COORDINATOR_HEARTBEAT_MS,
- Platform.Config.TROGDOR_COORDINATOR_HEARTBEAT_MS_DEFAULT);
- this.runnable = new NodeManagerRunnable();
- this.thread = new KafkaThread("NodeManagerThread(" + node.name() + ")", runnable, false);
- this.thread.start();
+ /**
+ * Create a new worker.
+ *
+ * The request is processed asynchronously on this node's executor. The agent
+ * itself is contacted by the heartbeat that the request triggers.
+ *
+ * @param id The new worker id.
+ * @param spec The task specification to use with the new worker.
+ */
+ public void createWorker(String id, TaskSpec spec) {
+ executor.submit(new CreateWorker(id, spec));
 }
- private boolean sendFault(long now, Fault fault) {
- try {
- client.putFault(new CreateAgentFaultRequest(fault.id(), fault.spec()));
- } catch (Exception e) {
- log.warn("{}: error sending fault to {}.", node.name(), client.target(), e);
- return false;
- }
- lock.lock();
- try {
- lastContactMs = now;
- } finally {
- lock.unlock();
- }
- SendingState state = (SendingState) fault.state();
- if (state.completeSend(node.name())) {
- fault.setState(new DoneState(now, ""));
- }
- return true;
- }
+ /**
+ * Starts a worker.
+ *
+ * This registers a worker that should run, then triggers an immediate heartbeat,
+ * which sends the createWorker request. If a worker with the same ID already
+ * exists, it only logs an error.
+ */
+ class CreateWorker implements Callable<Void> {
+ private final String id;
+ private final TaskSpec spec;
- private void sendHeartbeat(long now) {
- AgentStatusResponse status = null;
- try {
- status = client.getStatus();
- } catch (Exception e) {
- log.warn("{}: error sending heartbeat to {}.", node.name(), client.target(), e);
- return;
+ CreateWorker(String id, TaskSpec spec) {
+ this.id = id;
+ this.spec = spec;
 }
- lock.lock();
- try {
- lastContactMs = now;
- } finally {
- lock.unlock();
- }
- log.debug("{}: got heartbeat status {}.", node.name(), status);
- }
- public void beginShutdown() {
- lock.lock();
- try {
- if (shutdown)
- return;
- log.trace("{}: beginning shutdown.", node.name());
- shutdown = true;
- cond.signalAll();
- } finally {
- lock.unlock();
- }
- }
-
- public void waitForShutdown() {
- log.trace("waiting for NodeManager({}) shutdown.", node.name());
- try {
- thread.join();
- } catch (InterruptedException e) {
- log.error("{}: Interrupted while waiting for thread shutdown", node.name(), e);
- Thread.currentThread().interrupt();
+ @Override
+ public Void call() throws Exception {
+ ManagedWorker worker = workers.get(id);
+ if (worker != null) {
+ log.error("{}: there is already a worker for task {}.", node.name(), id);
+ return null;
+ }
+ log.info("{}: scheduling worker {} to start.", node.name(), id);
+ workers.put(id, new ManagedWorker(id, spec, true, new WorkerReceiving(spec)));
+ // Heartbeat now rather than waiting for the next period.
+ rescheduleNextHeartbeat(0);
+ return null;
 }
 }
 /**
- * Get the current status of this node.
+ * Stop a worker.
+ * The request is processed asynchronously on this node's executor.
 *
- * @return The node status.
+ * @param id The id of the worker to stop.
 */
- public NodeStatus status() {
- lock.lock();
- try {
- return new NodeStatus(node.name(), lastContactMs);
- } finally {
- lock.unlock();
- }
+ public void stopWorker(String id) {
+ executor.submit(new StopWorker(id));
 }
 /**
- * Enqueue a new fault.
- *
- * @param fault The fault to enqueue.
+ * Stops a worker.
+ *
+ * This marks the worker as one that should not run, then triggers an immediate
+ * heartbeat, which sends the stopWorker request. It only logs an error if the
+ * worker is unknown or already scheduled to stop.
 */
- public void enqueueFault(Fault fault) {
- lock.lock();
- try {
- log.trace("{}: added {} to fault queue.", node.name(), fault);
- faultQueue.add(fault);
- cond.signalAll();
- } finally {
- lock.unlock();
+ class StopWorker implements Callable<Void> {
+ private final String id;
+
+ StopWorker(String id) {
+ this.id = id;
+ }
+
+ @Override
+ public Void call() throws Exception {
+ ManagedWorker worker = workers.get(id);
+ if (worker == null) {
+ log.error("{}: can't stop non-existent worker {}.", node.name(), id);
+ return null;
+ }
+ if (!worker.shouldRun) {
+ log.error("{}: The worker for task {} is already scheduled to stop.",
+ node.name(), id);
+ return null;
+ }
+ log.info("{}: scheduling worker {} to stop.", node.name(), id);
+ worker.shouldRun = false;
+ // Heartbeat now rather than waiting for the next period.
+ rescheduleNextHeartbeat(0);
+ return null;
 }
 }
+
+ /**
+ * Begin shutting down this NodeManager.
+ *
+ * This calls shutdownNow on the executor, which discards queued requests and
+ * attempts to stop a running one. If requested, it also asks the agent to shut
+ * down. A failure to reach the agent is logged, not thrown.
+ *
+ * @param stopNode True to also send a shutdown request to the agent.
+ */
+ public void beginShutdown(boolean stopNode) {
+ executor.shutdownNow();
+ if (stopNode) {
+ try {
+ client.invokeShutdown();
+ } catch (Exception e) {
+ log.error("{}: Failed to send shutdown request", node.name(), e);
+ }
+ }
+ }
+
+ /**
+ * Wait, for at most one day, for this node's executor to terminate.
+ *
+ * @throws InterruptedException If interrupted while waiting.
+ */
+ public void waitForShutdown() throws InterruptedException {
+ executor.awaitTermination(1, TimeUnit.DAYS);
+ }
};
http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java
new file mode 100644
index 0000000..547c9da
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java
@@ -0,0 +1,535 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.coordinator;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.utils.Scheduler;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.trogdor.common.Node;
+import org.apache.kafka.trogdor.common.Platform;
+import org.apache.kafka.trogdor.common.ThreadUtils;
+import org.apache.kafka.trogdor.rest.TaskDone;
+import org.apache.kafka.trogdor.rest.TaskPending;
+import org.apache.kafka.trogdor.rest.TaskRunning;
+import org.apache.kafka.trogdor.rest.TaskState;
+import org.apache.kafka.trogdor.rest.TaskStopping;
+import org.apache.kafka.trogdor.rest.TasksResponse;
+import org.apache.kafka.trogdor.task.TaskController;
+import org.apache.kafka.trogdor.task.TaskSpec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * The TaskManager is responsible for managing tasks inside the Trogdor coordinator.
+ *
+ * The task manager has a single thread, managed by the executor. We start, stop,
+ * and handle state changes to tasks by adding requests to the executor queue.
+ * Because the executor is single threaded, no locks are needed when accessing
+ * TaskManager data structures.
+ *
+ * The TaskManager maintains a state machine for each task. Tasks begin in the
+ * PENDING state, waiting for their designated start time to arrive.
+ * When their time arrives, they transition to the RUNNING state. In this state,
+ * the NodeManager will start them, and monitor them.
+ *
+ * A RUNNING task moves to the STOPPING state when it is cancelled, or when any of
+ * its workers finishes. Its remaining active workers are then asked to stop. Once
+ * no active workers remain, the task is DONE.
+ *
+ * A task may also go straight to DONE in three cases:
+ * - its controller cannot be created;
+ * - its target nodes cannot be resolved;
+ * - it is cancelled while still PENDING.
+ *
+ * The TaskManager does not handle communication with the agents. This is handled
+ * by the NodeManagers. There is one NodeManager per node being managed.
+ * See {@link NodeManager} for details.
+ */
+public final class TaskManager {
+ private static final Logger log = LoggerFactory.getLogger(TaskManager.class);
+
+ /**
+ * The platform.
+ */
+ private final Platform platform;
+
+ /**
+ * The scheduler to use for this coordinator.
+ */
+ private final Scheduler scheduler;
+
+ /**
+ * The clock to use for this coordinator.
+ */
+ private final Time time;
+
+ /**
+ * A map of task IDs to ManagedTask objects.
+ * Only accessed from requests running on the executor thread.
+ */
+ private final Map<String, ManagedTask> tasks;
+
+ /**
+ * The executor used for handling Task state changes.
+ */
+ private final ScheduledExecutorService executor;
+
+ /**
+ * Maps node names to node managers. Only nodes with a positive Trogdor agent
+ * port have a NodeManager.
+ */
+ private final Map<String, NodeManager> nodeManagers;
+
+ /**
+ * True once beginShutdown has been called. This ensures that only the first
+ * call submits a Shutdown request.
+ */
+ private final AtomicBoolean shutdown = new AtomicBoolean(false);
+
+ /**
+ * Create a TaskManager. This also creates a NodeManager for every node in the
+ * platform's topology that has a positive Trogdor agent port.
+ *
+ * @param platform The platform, which supplies the topology.
+ * @param scheduler The scheduler used to delay task starts. Its clock is
+ * also used for task timestamps.
+ */
+ TaskManager(Platform platform, Scheduler scheduler) {
+ this.platform = platform;
+ this.scheduler = scheduler;
+ this.time = scheduler.time();
+ this.tasks = new HashMap<>();
+ this.executor = Executors.newSingleThreadScheduledExecutor(
+ ThreadUtils.createThreadFactory("TaskManagerStateThread", false));
+ this.nodeManagers = new HashMap<>();
+ for (Node node : platform.topology().nodes().values()) {
+ if (Node.Util.getTrogdorAgentPort(node) > 0) {
+ this.nodeManagers.put(node.name(), new NodeManager(node, this));
+ }
+ }
+ }
+
+ /**
+ * The coordinator-side lifecycle states of a task.
+ */
+ enum ManagedTaskState {
+ /** Waiting for the task's start time. */
+ PENDING,
+ /** Worker creation has been requested on the target nodes. */
+ RUNNING,
+ /** Waiting for the remaining active workers to stop. */
+ STOPPING,
+ /** Finished, failed, or cancelled. */
+ DONE;
+ }
+
+ /**
+ * The TaskManager's bookkeeping for a single task.
+ */
+ class ManagedTask {
+ /**
+ * The task id.
+ */
+ private final String id;
+
+ /**
+ * The task specification.
+ */
+ private final TaskSpec spec;
+
+ /**
+ * The task controller, or null if the controller could not be created.
+ */
+ private final TaskController controller;
+
+ /**
+ * The task state.
+ */
+ private ManagedTaskState state;
+
+ /**
+ * The time when the task was started, or -1 if the task has not been started.
+ */
+ private long startedMs = -1;
+
+ /**
+ * The time when the task was finished, or -1 if the task has not been finished.
+ */
+ private long doneMs = -1;
+
+ /**
+ * True if the task was cancelled by a stop request.
+ */
+ boolean cancelled = false;
+
+ /**
+ * If there is a task start scheduled, this is a future which can
+ * be used to cancel it.
+ */
+ private Future<?> startFuture = null;
+
+ /**
+ * The names of the worker nodes involved with this task.
+ * Null if the task has never started running.
+ */
+ private Set<String> workers = null;
+
+ /**
+ * The names of the worker nodes which are still running this task.
+ * Null if the task has never started running.
+ */
+ private Set<String> activeWorkers = null;
+
+ /**
+ * If this is non-empty, a message describing how this task failed.
+ */
+ private String error = "";
+
+ ManagedTask(String id, TaskSpec spec, TaskController controller, ManagedTaskState state) {
+ this.id = id;
+ this.spec = spec;
+ this.controller = controller;
+ this.state = state;
+ }
+
+ /**
+ * Cancel the scheduled start, if any, and forget it. A start that is already
+ * running is not interrupted.
+ */
+ void clearStartFuture() {
+ if (startFuture != null) {
+ startFuture.cancel(false);
+ startFuture = null;
+ }
+ }
+
+ /**
+ * Compute how long to wait before starting this task.
+ *
+ * @param now The current time, from the TaskManager's clock.
+ * @return The time remaining until spec.startMs(), or 0 if it has passed.
+ */
+ long startDelayMs(long now) {
+ if (now > spec.startMs()) {
+ return 0;
+ }
+ return spec.startMs() - now;
+ }
+
+ /**
+ * Find the nodes this task should run on, as chosen by its controller.
+ *
+ * @return The target node names, in sorted order.
+ * @throws KafkaException If any target node has no NodeManager, or if no
+ * target nodes were returned.
+ */
+ TreeSet<String> findNodeNames() {
+ Set<String> nodeNames = controller.targetNodes(platform.topology());
+ TreeSet<String> validNodeNames = new TreeSet<>();
+ TreeSet<String> nonExistentNodeNames = new TreeSet<>();
+ for (String nodeName : nodeNames) {
+ if (nodeManagers.containsKey(nodeName)) {
+ validNodeNames.add(nodeName);
+ } else {
+ nonExistentNodeNames.add(nodeName);
+ }
+ }
+ if (!nonExistentNodeNames.isEmpty()) {
+ throw new KafkaException("Unknown node names: " +
+ Utils.join(nonExistentNodeNames, ", "));
+ }
+ if (validNodeNames.isEmpty()) {
+ throw new KafkaException("No node names specified.");
+ }
+ return validNodeNames;
+ }
+
+ /**
+ * Record an error message, unless one was already recorded.
+ * The first error reported for a task is the one that is kept.
+ *
+ * @param newError The error message. An empty string records nothing.
+ */
+ void maybeSetError(String newError) {
+ if (error.isEmpty()) {
+ error = newError;
+ }
+ }
+
+ /**
+ * Build the externally visible TaskState for this task's current state.
+ */
+ TaskState taskState() {
+ switch (state) {
+ case PENDING:
+ return new TaskPending(spec);
+ case RUNNING:
+ return new TaskRunning(spec, startedMs);
+ case STOPPING:
+ return new TaskStopping(spec, startedMs);
+ case DONE:
+ return new TaskDone(spec, startedMs, doneMs, error, cancelled);
+ }
+ throw new RuntimeException("unreachable");
+ }
+ }
+
+ /**
+ * Create a task.
+ *
+ * @param id The ID of the task to create.
+ * @param spec The specification of the task to create.
+ *
+ * @return The specification of the task with the given ID.
+ * Note that if there was already a task with the given ID,
+ * this may be different from the specification that was
+ * requested.
+ * @throws ExecutionException If the CreateTask request failed on the executor.
+ * @throws InterruptedException If interrupted while waiting for the request.
+ */
+ public TaskSpec createTask(final String id, TaskSpec spec)
+ throws ExecutionException, InterruptedException {
+ final TaskSpec existingSpec = executor.submit(new CreateTask(id, spec)).get();
+ if (existingSpec != null) {
+ log.info("Ignoring request to create task {}, because there is already " +
+ "a task with that id.", id);
+ return existingSpec;
+ }
+ return spec;
+ }
+
+ /**
+ * Handles a request to create a new task. Processed by the state change thread.
+ *
+ * It returns the existing spec if the ID is taken, and null otherwise. If the
+ * controller cannot be created, the task is recorded directly as DONE with an
+ * error. Otherwise the task is recorded as PENDING and its start is scheduled.
+ */
+ class CreateTask implements Callable<TaskSpec> {
+ private final String id;
+ private final TaskSpec spec;
+
+ CreateTask(String id, TaskSpec spec) {
+ this.id = id;
+ this.spec = spec;
+ }
+
+ @Override
+ public TaskSpec call() throws Exception {
+ ManagedTask task = tasks.get(id);
+ if (task != null) {
+ log.info("Task ID {} is already in use.", id);
+ return task.spec;
+ }
+ TaskController controller = null;
+ String failure = null;
+ try {
+ controller = spec.newController(id);
+ } catch (Throwable t) {
+ failure = "Failed to create TaskController: " + t.getMessage();
+ }
+ if (failure != null) {
+ log.info("Failed to create a new task {} with spec {}: {}",
+ id, spec, failure);
+ task = new ManagedTask(id, spec, null, ManagedTaskState.DONE);
+ task.doneMs = time.milliseconds();
+ task.maybeSetError(failure);
+ tasks.put(id, task);
+ return null;
+ }
+ task = new ManagedTask(id, spec, controller, ManagedTaskState.PENDING);
+ tasks.put(id, task);
+ long delayMs = task.startDelayMs(time.milliseconds());
+ task.startFuture = scheduler.schedule(executor, new RunTask(task), delayMs);
+ log.info("Created a new task {} with spec {}, scheduled to start {} ms from now.",
+ id, spec, delayMs);
+ return null;
+ }
+ }
+
+ /**
+ * Handles starting a task. Processed by the state change thread.
+ *
+ * It resolves the target nodes, moves the task to RUNNING, and asks each
+ * target node's NodeManager to create a worker. If node resolution fails,
+ * the task moves to DONE with an error.
+ */
+ class RunTask implements Callable<Void> {
+ private final ManagedTask task;
+
+ RunTask(ManagedTask task) {
+ this.task = task;
+ }
+
+ @Override
+ public Void call() throws Exception {
+ task.clearStartFuture();
+ if (task.state != ManagedTaskState.PENDING) {
+ log.info("Can't start task {}, because it is already in state {}.",
+ task.id, task.state);
+ return null;
+ }
+ TreeSet<String> nodeNames;
+ try {
+ nodeNames = task.findNodeNames();
+ } catch (Exception e) {
+ log.error("Unable to find nodes for task {}", task.id, e);
+ task.doneMs = time.milliseconds();
+ task.state = ManagedTaskState.DONE;
+ task.maybeSetError("Unable to find nodes for task: " + e.getMessage());
+ return null;
+ }
+ log.info("Running task {} on node(s): {}", task.id, Utils.join(nodeNames, ", "));
+ task.state = ManagedTaskState.RUNNING;
+ task.startedMs = time.milliseconds();
+ task.workers = nodeNames;
+ task.activeWorkers = new HashSet<>();
+ for (String workerName : task.workers) {
+ task.activeWorkers.add(workerName);
+ nodeManagers.get(workerName).createWorker(task.id, task.spec);
+ }
+ return null;
+ }
+ }
+
+ /**
+ * Stop a task.
+ *
+ * @param id The ID of the task to stop.
+ * @return The specification of the task with the given ID (also returned if
+ * the task was already stopping or done), or null if there was no task
+ * found with the given ID.
+ * @throws ExecutionException If the CancelTask request failed on the executor.
+ * @throws InterruptedException If interrupted while waiting for the request.
+ */
+ public TaskSpec stopTask(final String id) throws ExecutionException, InterruptedException {
+ final TaskSpec spec = executor.submit(new CancelTask(id)).get();
+ return spec;
+ }
+
+ /**
+ * Handles cancelling a task. Processed by the state change thread.
+ */
+ class CancelTask implements Callable<TaskSpec> {
+ private final String id;
+
+ CancelTask(String id) {
+ this.id = id;
+ }
+
+ @Override
+ public TaskSpec call() throws Exception {
+ ManagedTask task = tasks.get(id);
+ if (task == null) {
+ log.info("Can't cancel non-existent task {}.", id);
+ return null;
+ }
+ switch (task.state) {
+ case PENDING:
+ task.cancelled = true;
+ task.clearStartFuture();
+ task.doneMs = time.milliseconds();
+ task.state = ManagedTaskState.DONE;
+ log.info("Stopped pending task {}.", id);
+ break;
+ case RUNNING:
+ task.cancelled = true;
+ if (task.activeWorkers.isEmpty()) {
+ log.info("Task {} is now complete with error: {}", id, task.error);
+ task.doneMs = time.milliseconds();
+ task.state = ManagedTaskState.DONE;
+ } else {
+ for (String workerName : task.activeWorkers) {
+ nodeManagers.get(workerName).stopWorker(id);
+ }
+ log.info("Cancelling task {} on worker(s): {}", id, Utils.join(task.activeWorkers, ", "));
+ task.state = ManagedTaskState.STOPPING;
+ }
+ break;
+ case STOPPING:
+ log.info("Can't cancel task {} because it is already stopping.", id);
+ break;
+ case DONE:
+ log.info("Can't cancel task {} because it is already done.", id);
+ break;
+ }
+ return task.spec;
+ }
+ }
+
+ /**
+ * A callback NodeManager makes to indicate that a worker has completed.
+ * The task will transition to DONE once all workers are done.
+ *
+ * @param nodeName The node name.
+ * @param id The worker name.
+ * @param error An empty string if there is no error, or an error string.
+ */
+ public void handleWorkerCompletion(String nodeName, String id, String error) {
+ executor.submit(new HandleWorkerCompletion(nodeName, id, error));
+ }
+
+ /**
+ * Handles a worker completion reported by a NodeManager. Processed by the
+ * state change thread.
+ */
+ class HandleWorkerCompletion implements Callable<Void> {
+ private final String nodeName;
+ private final String id;
+ private final String error;
+
+ HandleWorkerCompletion(String nodeName, String id, String error) {
+ this.nodeName = nodeName;
+ this.id = id;
+ this.error = error;
+ }
+
+ @Override
+ public Void call() throws Exception {
+ ManagedTask task = tasks.get(id);
+ if (task == null) {
+ log.error("Can't handle completion of unknown worker {} on node {}",
+ id, nodeName);
+ return null;
+ }
+ if ((task.state == ManagedTaskState.PENDING) || (task.state == ManagedTaskState.DONE)) {
+ log.error("Task {} got unexpected worker completion from {} while " +
+ "in {} state.", id, nodeName, task.state);
+ return null;
+ }
+ // The first worker to finish a RUNNING task causes the task to be
+ // stopped on all of its other active workers.
+ boolean broadcastStop = false;
+ if (task.state == ManagedTaskState.RUNNING) {
+ task.state = ManagedTaskState.STOPPING;
+ broadcastStop = true;
+ }
+ task.maybeSetError(error);
+ task.activeWorkers.remove(nodeName);
+ if (task.activeWorkers.isEmpty()) {
+ task.doneMs = time.milliseconds();
+ task.state = ManagedTaskState.DONE;
+ log.info("Task {} is now complete on {} with error: {}", id,
+ Utils.join(task.workers, ", "),
+ task.error.isEmpty() ? "(none)" : task.error);
+ } else if (broadcastStop) {
+ log.info("Node {} stopped. Stopping task {} on worker(s): {}",
+ nodeName, id, Utils.join(task.activeWorkers, ", "));
+ for (String workerName : task.activeWorkers) {
+ nodeManagers.get(workerName).stopWorker(id);
+ }
+ }
+ return null;
+ }
+ }
+
+ /**
+ * Get information about the tasks being managed.
+ */
+ public TasksResponse tasks() throws ExecutionException, InterruptedException {
+ return executor.submit(new GetTasksResponse()).get();
+ }
+
+ /**
+ * Builds a snapshot of every task's state, sorted by task ID. Processed by
+ * the state change thread.
+ */
+ class GetTasksResponse implements Callable<TasksResponse> {
+ @Override
+ public TasksResponse call() throws Exception {
+ TreeMap<String, TaskState> states = new TreeMap<>();
+ for (ManagedTask task : tasks.values()) {
+ states.put(task.id, task.taskState());
+ }
+ return new TasksResponse(states);
+ }
+ }
+
+ /**
+ * Initiate shutdown, but do not wait for it to complete.
+ * Only the first call has any effect.
+ *
+ * @param stopAgents True to also ask every agent to shut down.
+ */
+ public void beginShutdown(boolean stopAgents) throws ExecutionException, InterruptedException {
+ if (shutdown.compareAndSet(false, true)) {
+ executor.submit(new Shutdown(stopAgents));
+ }
+ }
+
+ /**
+ * Wait for shutdown to complete. May be called prior to beginShutdown.
+ * This blocks until the executor has terminated.
+ */
+ public void waitForShutdown() throws ExecutionException, InterruptedException {
+ while (!executor.awaitTermination(1, TimeUnit.DAYS)) { }
+ }
+
+ /**
+ * Shuts down every NodeManager (optionally stopping their agents) and waits
+ * for them. It then shuts down this TaskManager's executor. Processed by the
+ * state change thread.
+ */
+ class Shutdown implements Callable<Void> {
+ private final boolean stopAgents;
+
+ Shutdown(boolean stopAgents) {
+ this.stopAgents = stopAgents;
+ }
+
+ @Override
+ public Void call() throws Exception {
+ log.info("Shutting down TaskManager{}.", stopAgents ? " and agents" : "");
+ for (NodeManager nodeManager : nodeManagers.values()) {
+ nodeManager.beginShutdown(stopAgents);
+ }
+ for (NodeManager nodeManager : nodeManagers.values()) {
+ nodeManager.waitForShutdown();
+ }
+ executor.shutdown();
+ return null;
+ }
+ }
+};
http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/fault/AbstractFault.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/fault/AbstractFault.java b/tools/src/main/java/org/apache/kafka/trogdor/fault/AbstractFault.java
deleted file mode 100644
index 2d63b82..0000000
--- a/tools/src/main/java/org/apache/kafka/trogdor/fault/AbstractFault.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.trogdor.fault;
-
-import org.apache.kafka.trogdor.common.JsonUtil;
-import org.apache.kafka.trogdor.common.Platform;
-import org.apache.kafka.trogdor.common.Topology;
-
-import java.util.Objects;
-import java.util.Set;
-
-/**
- * Base class for faults. It holds the fault's id, spec, and current state.
- * It implements activate/deactivate on top of the handleActivation and
- * handleDeactivation hooks, updating the state as it goes.
- */
-public abstract class AbstractFault implements Fault {
- private final String id;
- private final FaultSpec spec;
- // Read and written through the synchronized state()/setState() accessors.
- private FaultState state;
-
- // New faults start in PendingState.
- public AbstractFault(String id, FaultSpec spec) {
- this.id = id;
- this.spec = spec;
- this.state = new PendingState();
- }
-
- @Override
- public final String id() {
- return id;
- }
-
- @Override
- public final FaultSpec spec() {
- return spec;
- }
-
- @Override
- public synchronized FaultState state() {
- return state;
- }
-
- @Override
- public synchronized void setState(FaultState state) {
- this.state = state;
- }
-
- /**
- * Run handleActivation. On success, move to RunningState(now). On failure,
- * move to DoneState carrying the exception message, then rethrow.
- */
- @Override
- public final void activate(long now, Platform platform) throws Exception {
- try {
- handleActivation(now, platform);
- setState(new RunningState(now));
- } catch (Exception e) {
- setState(new DoneState(now, e.getMessage()));
- throw e;
- }
- }
-
- protected abstract void handleActivation(long now, Platform platform) throws Exception;
-
- /**
- * Run handleDeactivation. On success, move to DoneState with an empty error.
- * On failure, move to DoneState carrying the exception message, then rethrow.
- */
- @Override
- public final void deactivate(long now, Platform platform) throws Exception {
- try {
- handleDeactivation(now, platform);
- setState(new DoneState(now, ""));
- } catch (Exception e) {
- setState(new DoneState(now, e.getMessage()));
- throw e;
- }
- }
-
- protected abstract void handleDeactivation(long now, Platform platform) throws Exception;
-
- @Override
- public abstract Set<String> targetNodes(Topology topology);
-
- // NOTE(review): equals() and hashCode() are based on toString(), which
- // includes the mutable state. The hash code therefore changes whenever
- // setState() is called. Keeping an AbstractFault in a hash-based collection
- // across state changes would be unreliable.
- @Override
- public final boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
- return toString().equals(o.toString());
- }
-
- @Override
- public final int hashCode() {
- return Objects.hashCode(toString());
- }
-
- // Renders the class name, id, and the JSON forms of the spec and current state.
- @Override
- public final String toString() {
- return getClass().getSimpleName() + "(id=" + id +
- ", spec=" + JsonUtil.toJsonString(spec) +
- ", state=" + JsonUtil.toJsonString(state()) +
- ")";
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/fault/AbstractFaultSpec.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/fault/AbstractFaultSpec.java b/tools/src/main/java/org/apache/kafka/trogdor/fault/AbstractFaultSpec.java
deleted file mode 100644
index 5f551b5..0000000
--- a/tools/src/main/java/org/apache/kafka/trogdor/fault/AbstractFaultSpec.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.trogdor.fault;
-
-import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.kafka.trogdor.common.JsonUtil;
-
-
-/**
- * Convenience base class for FaultSpec implementations.
- *
- * Stores the two timing properties common to every spec (startMs and
- * durationMs) and renders the spec as JSON in toString().
- */
-public abstract class AbstractFaultSpec implements FaultSpec {
- private final long startMs;
- private final long durationMs;
-
- /**
- * @param start The value reported by startMs(), bound to the JSON property "startMs".
- * @param duration The value reported by durationMs(), bound to the JSON property "durationMs".
- */
- protected AbstractFaultSpec(@JsonProperty("startMs") long start,
- @JsonProperty("durationMs") long duration) {
- this.durationMs = duration;
- this.startMs = start;
- }
-
- /**
- * @return The start time supplied at construction.
- */
- @Override
- @JsonProperty
- public long startMs() {
- return this.startMs;
- }
-
- /**
- * @return The duration supplied at construction.
- */
- @Override
- @JsonProperty
- public long durationMs() {
- return this.durationMs;
- }
-
- /**
- * @return This spec serialized as a JSON string.
- */
- @Override
- public String toString() {
- String json = JsonUtil.toJsonString(this);
- return json;
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/fault/DoneState.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/fault/DoneState.java b/tools/src/main/java/org/apache/kafka/trogdor/fault/DoneState.java
deleted file mode 100644
index 222caf0..0000000
--- a/tools/src/main/java/org/apache/kafka/trogdor/fault/DoneState.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.trogdor.fault;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-/**
- * The state a fault is in on the agent or controller when it is completed,
- * either normally or with an error.
- */
-public class DoneState extends FaultState {
- private final long doneMs;
- private final String errorStr;
-
- @JsonCreator
- public DoneState(@JsonProperty("doneMs") long doneMs,
- @JsonProperty("errorStr") String errorStr) {
- this.doneMs = doneMs;
- this.errorStr = errorStr;
- }
-
- @JsonProperty
- public long doneMs() {
- return doneMs;
- }
-
- @JsonProperty
- public String errorStr() {
- return errorStr;
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/fault/Fault.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/fault/Fault.java b/tools/src/main/java/org/apache/kafka/trogdor/fault/Fault.java
deleted file mode 100644
index e44d56a..0000000
--- a/tools/src/main/java/org/apache/kafka/trogdor/fault/Fault.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.trogdor.fault;
-
-import org.apache.kafka.trogdor.common.Platform;
-import org.apache.kafka.trogdor.common.Topology;
-
-import java.util.Set;
-
-/**
- * A fault tracked by Trogdor. Each fault has an ID, a FaultSpec and a
- * FaultState, can be activated and deactivated on a Platform, and reports
- * which nodes of a Topology it targets.
- */
-public interface Fault {
- /**
- * Get the ID of this fault.
- */
- String id();
-
- /**
- * Get the specification for this Fault.
- */
- FaultSpec spec();
-
- /**
- * Get the current fault state. Thread-safe.
- */
- FaultState state();
-
- /**
- * Set the current fault state. Thread-safe.
- */
- void setState(FaultState state);
-
- /**
- * Activate the fault. Will transition into RunningState or DoneState.
- *
- * @param now The current time in ms.
- * @param platform The platform to use.
- *
- * @throws Exception If activation fails.
- */
- void activate(long now, Platform platform) throws Exception;
-
- /**
- * Deactivate the fault. Will transition into DoneState.
- *
- * @param now The current time in ms.
- * @param platform The platform to use.
- *
- * @throws Exception If deactivation fails.
- */
- void deactivate(long now, Platform platform) throws Exception;
-
- /**
- * Get the nodes which this fault is targeting.
- *
- * @param topology The topology to use.
- *
- * @return A set of target node names.
- */
- Set<String> targetNodes(Topology topology);
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/fault/FaultSet.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/fault/FaultSet.java b/tools/src/main/java/org/apache/kafka/trogdor/fault/FaultSet.java
deleted file mode 100644
index 63e5ff4..0000000
--- a/tools/src/main/java/org/apache/kafka/trogdor/fault/FaultSet.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.trogdor.fault;
-
-import java.util.Iterator;
-import java.util.Map;
-import java.util.NoSuchElementException;
-import java.util.TreeMap;
-
-/**
- * A collection of faults indexed both by start time and by end time.
- *
- * Not internally synchronized; callers must provide their own locking if the
- * set is shared between threads.
- */
-public class FaultSet {
- private final static long NS_PER_MS = 1000000L;
-
- /**
- * Maps fault start times in nanoseconds to faults.
- */
- private final TreeMap<Long, Fault> byStart = new TreeMap<>();
-
- /**
- * Maps fault end times in nanoseconds to faults.
- */
- private final TreeMap<Long, Fault> byEnd = new TreeMap<>();
-
- /**
- * Return an iterator that iterates over the fault set in start time order.
- */
- public FaultSetIterator iterateByStart() {
- return new FaultSetIterator(byStart);
- }
-
- /**
- * Return an iterator that iterates over the fault set in end time order.
- */
- public FaultSetIterator iterateByEnd() {
- return new FaultSetIterator(byEnd);
- }
-
- /**
- * Compute the base byStart key for a fault: its start time in nanoseconds.
- * Keys are kept at nanosecond granularity, presumably so that insertUnique
- * can separate faults sharing a millisecond by bumping the key by one.
- */
- private static long startKey(Fault fault) {
- return fault.spec().startMs() * NS_PER_MS;
- }
-
- /**
- * Compute the base byEnd key for a fault: (startMs + durationMs) in nanoseconds.
- */
- private static long endKey(Fault fault) {
- long endMs = fault.spec().startMs() + fault.spec().durationMs();
- return endMs * NS_PER_MS;
- }
-
- /**
- * Add a new fault to the FaultSet.
- */
- public void add(Fault fault) {
- insertUnique(byStart, startKey(fault), fault);
- insertUnique(byEnd, endKey(fault), fault);
- }
-
- /**
- * Insert a fault into a TreeMap.
- *
- * If the given key is already taken by a different fault, the fault is
- * stored under the next free key. If this exact fault instance is found
- * while probing, nothing is inserted.
- */
- private void insertUnique(TreeMap<Long, Fault> map, long key, Fault fault) {
- while (true) {
- Fault existing = map.get(key);
- if (existing == null) {
- map.put(key, fault);
- return;
- } else if (existing == fault) {
- return;
- } else {
- key++;
- }
- }
- }
-
- /**
- * Remove a fault from the FaultSet (from both the start-time and end-time
- * indexes). The fault is matched by reference identity (==), not by equals().
- *
- * @throws NoSuchElementException If the fault is not present.
- */
- public void remove(Fault fault) {
- removeUnique(byStart, startKey(fault), fault);
- removeUnique(byEnd, endKey(fault), fault);
- }
-
- /**
- * Helper function to remove a fault from a map. Because insertUnique may
- * have shifted the fault to a higher key, every entry whose key is equal to
- * or higher than the given key is searched.
- */
- private void removeUnique(TreeMap<Long, Fault> map, long key, Fault fault) {
- while (true) {
- Map.Entry<Long, Fault> existing = map.ceilingEntry(key);
- if (existing == null) {
- throw new NoSuchElementException("No such element as " + fault);
- } else if (existing.getValue() == fault) {
- map.remove(existing.getKey());
- return;
- } else {
- key = existing.getKey() + 1;
- }
- }
- }
-
- /**
- * An iterator over the FaultSet.
- *
- * Instead of holding a TreeMap iterator, it re-looks up the entry after the
- * last key it returned, so it keeps working after remove() or other changes
- * to the underlying map.
- */
- class FaultSetIterator implements Iterator<Fault> {
- private final TreeMap<Long, Fault> map;
- private Fault cur = null;
- // Key of the last entry returned by next(), or null before the first call.
- // A null sentinel (rather than -1) ensures entries with negative keys are
- // not skipped.
- private Long prevKey = null;
-
- FaultSetIterator(TreeMap<Long, Fault> map) {
- this.map = map;
- }
-
- /**
- * Find the entry that the next call to next() would return, or null if none.
- */
- private Map.Entry<Long, Fault> peekEntry() {
- return (prevKey == null) ? map.firstEntry() : map.higherEntry(prevKey);
- }
-
- @Override
- public boolean hasNext() {
- return peekEntry() != null;
- }
-
- @Override
- public Fault next() {
- Map.Entry<Long, Fault> entry = peekEntry();
- if (entry == null) {
- throw new NoSuchElementException();
- }
- prevKey = entry.getKey();
- cur = entry.getValue();
- return cur;
- }
-
- /**
- * Remove the fault most recently returned by next() from the whole
- * FaultSet (both indexes).
- *
- * @throws IllegalStateException If next() has not been called, or the
- * current fault was already removed.
- */
- @Override
- public void remove() {
- if (cur == null) {
- throw new IllegalStateException();
- }
- FaultSet.this.remove(cur);
- cur = null;
- }
- }
-}