You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2017/11/03 09:38:05 UTC
[2/4] kafka git commit: KAFKA-6060;
Add workload generation capabilities to Trogdor
http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/fault/FaultSpec.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/fault/FaultSpec.java b/tools/src/main/java/org/apache/kafka/trogdor/fault/FaultSpec.java
deleted file mode 100644
index e15c4e9..0000000
--- a/tools/src/main/java/org/apache/kafka/trogdor/fault/FaultSpec.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.trogdor.fault;
-
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonTypeInfo;
-import org.apache.kafka.common.utils.Utils;
-
-
-/**
- * The specification for a fault.
- */
-@JsonTypeInfo(use = JsonTypeInfo.Id.CLASS,
- include = JsonTypeInfo.As.PROPERTY,
- property = "class")
-public interface FaultSpec {
- class Util {
- private static final String SPEC_STRING = "Spec";
-
- public static Fault createFault(String faultId, FaultSpec faultSpec) throws ClassNotFoundException {
- String faultSpecClassName = faultSpec.getClass().getName();
- if (!faultSpecClassName.endsWith(SPEC_STRING)) {
- throw new RuntimeException("FaultSpec class name must end with " + SPEC_STRING);
- }
- String faultClassName = faultSpecClassName.substring(0,
- faultSpecClassName.length() - SPEC_STRING.length());
- return Utils.newParameterizedInstance(faultClassName,
- String.class, faultId,
- FaultSpec.class, faultSpec);
- }
- }
-
- /**
- * Get the start time of this fault in ms.
- */
- @JsonProperty
- long startMs();
-
- /**
- * Get the duration of this fault in ms.
- */
- @JsonProperty
- long durationMs();
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/fault/FaultState.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/fault/FaultState.java b/tools/src/main/java/org/apache/kafka/trogdor/fault/FaultState.java
deleted file mode 100644
index cba8419..0000000
--- a/tools/src/main/java/org/apache/kafka/trogdor/fault/FaultState.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.trogdor.fault;
-
-import com.fasterxml.jackson.annotation.JsonSubTypes;
-import com.fasterxml.jackson.annotation.JsonTypeInfo;
-import org.apache.kafka.trogdor.common.JsonUtil;
-import java.util.Objects;
-
-@JsonTypeInfo(use = JsonTypeInfo.Id.NAME,
- include = JsonTypeInfo.As.PROPERTY,
- property = "stateName")
-@JsonSubTypes({
- @JsonSubTypes.Type(value = DoneState.class, name = "done"),
- @JsonSubTypes.Type(value = PendingState.class, name = "pending"),
- @JsonSubTypes.Type(value = RunningState.class, name = "running"),
- @JsonSubTypes.Type(value = SendingState.class, name = "sending")
- })
-public abstract class FaultState {
- @Override
- public final boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
- return toString().equals(o.toString());
- }
-
- @Override
- public final int hashCode() {
- return Objects.hashCode(toString());
- }
-
- @Override
- public final String toString() {
- return JsonUtil.toJsonString(this);
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/fault/NetworkPartitionFault.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/fault/NetworkPartitionFault.java b/tools/src/main/java/org/apache/kafka/trogdor/fault/NetworkPartitionFault.java
deleted file mode 100644
index cf3270a..0000000
--- a/tools/src/main/java/org/apache/kafka/trogdor/fault/NetworkPartitionFault.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.trogdor.fault;
-
-import org.apache.kafka.trogdor.common.Node;
-import org.apache.kafka.trogdor.common.Platform;
-import org.apache.kafka.trogdor.common.Topology;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.net.InetAddress;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.TreeSet;
-
-public class NetworkPartitionFault extends AbstractFault {
- private static final Logger log = LoggerFactory.getLogger(NetworkPartitionFault.class);
-
- private final List<Set<String>> partitions;
-
- public NetworkPartitionFault(String id, FaultSpec spec) {
- super(id, spec);
- NetworkPartitionFaultSpec faultSpec = (NetworkPartitionFaultSpec) spec;
- this.partitions = new ArrayList<>();
- HashSet<String> prevNodes = new HashSet<>();
- for (List<String> partition : faultSpec.partitions()) {
- for (String nodeName : partition) {
- if (prevNodes.contains(nodeName)) {
- throw new RuntimeException("Node " + nodeName +
- " appears in more than one partition.");
- }
- prevNodes.add(nodeName);
- this.partitions.add(new HashSet<String>(partition));
- }
- }
- }
-
- @Override
- protected void handleActivation(long now, Platform platform) throws Exception {
- log.info("Activating NetworkPartitionFault...");
- runIptablesCommands(platform, "-A");
- }
-
- @Override
- protected void handleDeactivation(long now, Platform platform) throws Exception {
- log.info("Deactivating NetworkPartitionFault...");
- runIptablesCommands(platform, "-D");
- }
-
- private void runIptablesCommands(Platform platform, String iptablesAction) throws Exception {
- Node curNode = platform.curNode();
- Topology topology = platform.topology();
- TreeSet<String> toBlock = new TreeSet<>();
- for (Set<String> partition : partitions) {
- if (!partition.contains(curNode.name())) {
- for (String nodeName : partition) {
- toBlock.add(nodeName);
- }
- }
- }
- for (String nodeName : toBlock) {
- Node node = topology.node(nodeName);
- InetAddress addr = InetAddress.getByName(node.hostname());
- platform.runCommand(new String[] {
- "sudo", "iptables", iptablesAction, "INPUT", "-p", "tcp", "-s",
- addr.getHostAddress(), "-j", "DROP", "-m", "comment", "--comment", nodeName
- });
- }
- }
-
- @Override
- public Set<String> targetNodes(Topology topology) {
- Set<String> targetNodes = new HashSet<>();
- for (Set<String> partition : partitions) {
- targetNodes.addAll(partition);
- }
- return targetNodes;
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/fault/NetworkPartitionFaultController.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/fault/NetworkPartitionFaultController.java b/tools/src/main/java/org/apache/kafka/trogdor/fault/NetworkPartitionFaultController.java
new file mode 100644
index 0000000..d90534f
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/fault/NetworkPartitionFaultController.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.fault;
+
+import org.apache.kafka.trogdor.common.Topology;
+import org.apache.kafka.trogdor.task.TaskController;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+public class NetworkPartitionFaultController implements TaskController {
+ private final List<Set<String>> partitionSets;
+
+ public NetworkPartitionFaultController(List<Set<String>> partitionSets) {
+ this.partitionSets = partitionSets;
+ }
+
+ @Override
+ public Set<String> targetNodes(Topology topology) {
+ Set<String> targetNodes = new HashSet<>();
+ for (Set<String> partitionSet : partitionSets) {
+ targetNodes.addAll(partitionSet);
+ }
+ return targetNodes;
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/fault/NetworkPartitionFaultSpec.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/fault/NetworkPartitionFaultSpec.java b/tools/src/main/java/org/apache/kafka/trogdor/fault/NetworkPartitionFaultSpec.java
index d734dce..7b9ccc4 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/fault/NetworkPartitionFaultSpec.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/fault/NetworkPartitionFaultSpec.java
@@ -19,15 +19,19 @@ package org.apache.kafka.trogdor.fault;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.kafka.trogdor.common.JsonUtil;
+import org.apache.kafka.trogdor.task.TaskController;
+import org.apache.kafka.trogdor.task.TaskSpec;
+import org.apache.kafka.trogdor.task.TaskWorker;
+import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
-import java.util.Objects;
+import java.util.Set;
/**
* The specification for a fault that creates a network partition.
*/
-public class NetworkPartitionFaultSpec extends AbstractFaultSpec {
+public class NetworkPartitionFaultSpec extends TaskSpec {
private final List<List<String>> partitions;
@JsonCreator
@@ -44,22 +48,28 @@ public class NetworkPartitionFaultSpec extends AbstractFaultSpec {
}
@Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
- NetworkPartitionFaultSpec that = (NetworkPartitionFaultSpec) o;
- return Objects.equals(startMs(), that.startMs()) &&
- Objects.equals(durationMs(), that.durationMs()) &&
- Objects.equals(partitions, that.partitions);
+ public TaskController newController(String id) {
+ return new NetworkPartitionFaultController(partitionSets());
}
@Override
- public int hashCode() {
- return Objects.hash(startMs(), durationMs(), partitions);
+ public TaskWorker newTaskWorker(String id) {
+ return new NetworkPartitionFaultWorker(id, partitionSets());
}
- @Override
- public String toString() {
- return JsonUtil.toJsonString(this);
+ private List<Set<String>> partitionSets() {
+ List<Set<String>> partitionSets = new ArrayList<>();
+ HashSet<String> prevNodes = new HashSet<>();
+ for (List<String> partition : this.partitions()) {
+ for (String nodeName : partition) {
+ if (prevNodes.contains(nodeName)) {
+ throw new RuntimeException("Node " + nodeName +
+ " appears in more than one partition.");
+ }
+ prevNodes.add(nodeName);
+ partitionSets.add(new HashSet<>(partition));
+ }
+ }
+ return partitionSets;
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/fault/NetworkPartitionFaultWorker.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/fault/NetworkPartitionFaultWorker.java b/tools/src/main/java/org/apache/kafka/trogdor/fault/NetworkPartitionFaultWorker.java
new file mode 100644
index 0000000..787c5e0
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/fault/NetworkPartitionFaultWorker.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.fault;
+
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.trogdor.common.Node;
+import org.apache.kafka.trogdor.common.Platform;
+import org.apache.kafka.trogdor.common.Topology;
+import org.apache.kafka.trogdor.task.TaskWorker;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class NetworkPartitionFaultWorker implements TaskWorker {
+ private static final Logger log = LoggerFactory.getLogger(NetworkPartitionFaultWorker.class);
+
+ private final String id;
+
+ private final List<Set<String>> partitionSets;
+
+ public NetworkPartitionFaultWorker(String id, List<Set<String>> partitionSets) {
+ this.id = id;
+ this.partitionSets = partitionSets;
+ }
+
+ @Override
+ public void start(Platform platform, AtomicReference<String> status,
+ KafkaFutureImpl<String> errorFuture) throws Exception {
+ log.info("Activating NetworkPartitionFault {}.", id);
+ runIptablesCommands(platform, "-A");
+ }
+
+ @Override
+ public void stop(Platform platform) throws Exception {
+ log.info("Deactivating NetworkPartitionFault {}.", id);
+ runIptablesCommands(platform, "-D");
+ }
+
+ private void runIptablesCommands(Platform platform, String iptablesAction) throws Exception {
+ Node curNode = platform.curNode();
+ Topology topology = platform.topology();
+ TreeSet<String> toBlock = new TreeSet<>();
+ for (Set<String> partitionSet : partitionSets) {
+ if (!partitionSet.contains(curNode.name())) {
+ for (String nodeName : partitionSet) {
+ toBlock.add(nodeName);
+ }
+ }
+ }
+ for (String nodeName : toBlock) {
+ Node node = topology.node(nodeName);
+ InetAddress addr = InetAddress.getByName(node.hostname());
+ platform.runCommand(new String[] {
+ "sudo", "iptables", iptablesAction, "INPUT", "-p", "tcp", "-s",
+ addr.getHostAddress(), "-j", "DROP", "-m", "comment", "--comment", nodeName
+ });
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/fault/NoOpFault.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/fault/NoOpFault.java b/tools/src/main/java/org/apache/kafka/trogdor/fault/NoOpFault.java
deleted file mode 100644
index 70b4965..0000000
--- a/tools/src/main/java/org/apache/kafka/trogdor/fault/NoOpFault.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.trogdor.fault;
-
-import org.apache.kafka.trogdor.common.Node;
-import org.apache.kafka.trogdor.common.Platform;
-import org.apache.kafka.trogdor.common.Topology;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-public class NoOpFault extends AbstractFault {
- private static final Logger log = LoggerFactory.getLogger(NoOpFault.class);
-
- public NoOpFault(String id, FaultSpec spec) {
- super(id, spec);
- }
-
- @Override
- protected void handleActivation(long now, Platform platform) throws Exception {
- log.info("Activating NoOpFault...");
- }
-
- @Override
- protected void handleDeactivation(long now, Platform platform) throws Exception {
- log.info("Deactivating NoOpFault...");
- }
-
- @Override
- public Set<String> targetNodes(Topology topology) {
- Set<String> set = new HashSet<>();
- for (Map.Entry<String, Node> entry : topology.nodes().entrySet()) {
- if (Node.Util.getTrogdorAgentPort(entry.getValue()) > 0) {
- set.add(entry.getKey());
- }
- }
- return set;
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/fault/NoOpFaultSpec.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/fault/NoOpFaultSpec.java b/tools/src/main/java/org/apache/kafka/trogdor/fault/NoOpFaultSpec.java
deleted file mode 100644
index 1d4b94d..0000000
--- a/tools/src/main/java/org/apache/kafka/trogdor/fault/NoOpFaultSpec.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.trogdor.fault;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-import java.util.Objects;
-
-/**
- * The specification for a fault that does nothing.
- *
- * This fault type exists mainly to test the fault injection system.
- */
-public class NoOpFaultSpec extends AbstractFaultSpec {
- @JsonCreator
- public NoOpFaultSpec(@JsonProperty("startMs") long startMs,
- @JsonProperty("durationMs") long durationMs) {
- super(startMs, durationMs);
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
- NoOpFaultSpec that = (NoOpFaultSpec) o;
- return Objects.equals(startMs(), that.startMs()) &&
- Objects.equals(durationMs(), that.durationMs());
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(startMs(), durationMs());
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/fault/PendingState.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/fault/PendingState.java b/tools/src/main/java/org/apache/kafka/trogdor/fault/PendingState.java
deleted file mode 100644
index 57c8e88..0000000
--- a/tools/src/main/java/org/apache/kafka/trogdor/fault/PendingState.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.trogdor.fault;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-
-/**
- * The state a fault is in on the agent or controller when we haven't yet done
- * anything with it.
- */
-public class PendingState extends FaultState {
- @JsonCreator
- public PendingState() {
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/fault/RunningState.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/fault/RunningState.java b/tools/src/main/java/org/apache/kafka/trogdor/fault/RunningState.java
deleted file mode 100644
index 1b81bf5..0000000
--- a/tools/src/main/java/org/apache/kafka/trogdor/fault/RunningState.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.trogdor.fault;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-/**
- * The state a fault is in on the agent when it is running.
- */
-public class RunningState extends FaultState {
- private final long startedMs;
-
- @JsonCreator
- public RunningState(@JsonProperty("startedMs") long startedMs) {
- this.startedMs = startedMs;
- }
-
- @JsonProperty
- public long startedMs() {
- return startedMs;
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/fault/SendingState.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/fault/SendingState.java b/tools/src/main/java/org/apache/kafka/trogdor/fault/SendingState.java
deleted file mode 100644
index edfbed2..0000000
--- a/tools/src/main/java/org/apache/kafka/trogdor/fault/SendingState.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.trogdor.fault;
-
-import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.kafka.common.utils.Utils;
-
-import java.util.TreeMap;
-import java.util.Set;
-
-/**
- * The state a fault is in on the controller when it is scheduled to be sent to several agents.
- */
-public class SendingState extends FaultState {
- private final TreeMap<String, Boolean> nodes;
- private int remainingNodes;
-
- public SendingState(@JsonProperty("nodeNames") Set<String> nodeNames) {
- this.nodes = new TreeMap<>();
- for (String nodeName : nodeNames) {
- nodes.put(nodeName, false);
- }
- remainingNodes = nodeNames.size();
- }
-
- @JsonProperty
- public synchronized Set<String> nodeNames() {
- return nodes.keySet();
- }
-
- /**
- * Complete a send operation.
- *
- * @param nodeName The name of the node we sent to.
- * @return True if there are no more send operations left.
- */
- public synchronized boolean completeSend(String nodeName) {
- if (!nodes.containsKey(nodeName)) {
- throw new RuntimeException("Node " + nodeName + " was not to supposed to " +
- "receive this fault. The fault was scheduled on nodes: " +
- Utils.join(nodes.keySet(), ", "));
- }
- if (nodes.put(nodeName, true)) {
- throw new RuntimeException("Node " + nodeName + " already received this fault.");
- }
- remainingNodes--;
- return remainingNodes == 0;
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/rest/AgentFaultsResponse.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/AgentFaultsResponse.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/AgentFaultsResponse.java
deleted file mode 100644
index a1b5246..0000000
--- a/tools/src/main/java/org/apache/kafka/trogdor/rest/AgentFaultsResponse.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.trogdor.rest;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.kafka.trogdor.common.JsonUtil;
-
-import java.util.Map;
-
-/**
- * Response to GET /faults
- */
-public class AgentFaultsResponse extends FaultDataMap {
- @JsonCreator
- public AgentFaultsResponse(@JsonProperty("faults") Map<String, FaultData> faults) {
- super(faults);
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
- AgentFaultsResponse that = (AgentFaultsResponse) o;
- return super.equals(that);
- }
-
- @Override
- public int hashCode() {
- return super.hashCode();
- }
-
- @Override
- public String toString() {
- return JsonUtil.toJsonString(this);
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/rest/AgentStatusResponse.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/AgentStatusResponse.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/AgentStatusResponse.java
index 8e32f87..77b4bfb 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/rest/AgentStatusResponse.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/AgentStatusResponse.java
@@ -19,41 +19,30 @@ package org.apache.kafka.trogdor.rest;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.kafka.trogdor.common.JsonUtil;
-import java.util.Objects;
+import java.util.TreeMap;
/**
- * The status of the Trogdor agent.
+ * A response from the Trogdor agent about the worker states and specifications.
*/
-public class AgentStatusResponse {
- private final long startTimeMs;
+public class AgentStatusResponse extends Message {
+ private final long serverStartMs;
+ private final TreeMap<String, WorkerState> workers;
@JsonCreator
- public AgentStatusResponse(@JsonProperty("startTimeMs") long startTimeMs) {
- this.startTimeMs = startTimeMs;
+ public AgentStatusResponse(@JsonProperty("serverStartMs") long serverStartMs,
+ @JsonProperty("workers") TreeMap<String, WorkerState> workers) {
+ this.serverStartMs = serverStartMs;
+ this.workers = workers;
}
@JsonProperty
- public long startTimeMs() {
- return startTimeMs;
+ public long serverStartMs() {
+ return serverStartMs;
}
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
- AgentStatusResponse that = (AgentStatusResponse) o;
- return Objects.equals(startTimeMs, that.startTimeMs);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(startTimeMs);
- }
-
- @Override
- public String toString() {
- return JsonUtil.toJsonString(this);
+ @JsonProperty
+ public TreeMap<String, WorkerState> workers() {
+ return workers;
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/rest/CoordinatorFaultsResponse.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/CoordinatorFaultsResponse.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/CoordinatorFaultsResponse.java
deleted file mode 100644
index df26274..0000000
--- a/tools/src/main/java/org/apache/kafka/trogdor/rest/CoordinatorFaultsResponse.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.trogdor.rest;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.kafka.trogdor.common.JsonUtil;
-
-import java.util.Map;
-
-/**
- * Response to GET /faults
- */
-public class CoordinatorFaultsResponse extends FaultDataMap {
- @JsonCreator
- public CoordinatorFaultsResponse(@JsonProperty("faults") Map<String, FaultData> faults) {
- super(faults);
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
- CoordinatorFaultsResponse that = (CoordinatorFaultsResponse) o;
- return super.equals(that);
- }
-
- @Override
- public int hashCode() {
- return super.hashCode();
- }
-
- @Override
- public String toString() {
- return JsonUtil.toJsonString(this);
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/rest/CoordinatorShutdownRequest.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/CoordinatorShutdownRequest.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/CoordinatorShutdownRequest.java
new file mode 100644
index 0000000..1aacaaf
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/CoordinatorShutdownRequest.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.rest;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * A request to the Trogdor coordinator to shut down.
+ */
+public class CoordinatorShutdownRequest extends Message {
+ private final boolean stopAgents;
+
+ @JsonCreator
+ public CoordinatorShutdownRequest(@JsonProperty("stopAgents") boolean stopAgents) {
+ this.stopAgents = stopAgents;
+ }
+
+ @JsonProperty
+ public boolean stopAgents() {
+ return stopAgents;
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/rest/CoordinatorStatusResponse.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/CoordinatorStatusResponse.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/CoordinatorStatusResponse.java
index 348e310..8840d29 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/rest/CoordinatorStatusResponse.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/CoordinatorStatusResponse.java
@@ -19,41 +19,20 @@ package org.apache.kafka.trogdor.rest;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.kafka.trogdor.common.JsonUtil;
-
-import java.util.Objects;
/**
- * The status of the Trogdor coordinator.
+ * A status response from the Trogdor coordinator.
*/
-public class CoordinatorStatusResponse {
- private final long startTimeMs;
+public class CoordinatorStatusResponse extends Message {
+ private final long serverStartMs;
@JsonCreator
- public CoordinatorStatusResponse(@JsonProperty("startTimeMs") long startTimeMs) {
- this.startTimeMs = startTimeMs;
+ public CoordinatorStatusResponse(@JsonProperty("serverStartMs") long serverStartMs) {
+ this.serverStartMs = serverStartMs;
}
@JsonProperty
- public long startTimeMs() {
- return startTimeMs;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
- CoordinatorStatusResponse that = (CoordinatorStatusResponse) o;
- return Objects.equals(startTimeMs, that.startTimeMs);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(startTimeMs);
- }
-
- @Override
- public String toString() {
- return JsonUtil.toJsonString(this);
+ public long serverStartMs() {
+ return serverStartMs;
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/rest/CreateAgentFaultRequest.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/CreateAgentFaultRequest.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/CreateAgentFaultRequest.java
deleted file mode 100644
index 6e772d9..0000000
--- a/tools/src/main/java/org/apache/kafka/trogdor/rest/CreateAgentFaultRequest.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.trogdor.rest;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.kafka.trogdor.common.JsonUtil;
-import org.apache.kafka.trogdor.fault.FaultSpec;
-
-import java.util.Objects;
-
-/**
- * A request to the Trogdor agent to create a fault.
- */
-public class CreateAgentFaultRequest {
- private final String id;
- private final FaultSpec spec;
-
- @JsonCreator
- public CreateAgentFaultRequest(@JsonProperty("id") String id,
- @JsonProperty("spec") FaultSpec spec) {
- this.id = id;
- this.spec = spec;
- }
-
- @JsonProperty
- public String id() {
- return id;
- }
-
- @JsonProperty
- public FaultSpec spec() {
- return spec;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
- CreateAgentFaultRequest that = (CreateAgentFaultRequest) o;
- return Objects.equals(id, that.id) &&
- Objects.equals(spec, that.spec);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(id, spec);
- }
-
- @Override
- public String toString() {
- return JsonUtil.toJsonString(this);
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/rest/CreateCoordinatorFaultRequest.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/CreateCoordinatorFaultRequest.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/CreateCoordinatorFaultRequest.java
deleted file mode 100644
index ec00cf3..0000000
--- a/tools/src/main/java/org/apache/kafka/trogdor/rest/CreateCoordinatorFaultRequest.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.trogdor.rest;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.kafka.trogdor.common.JsonUtil;
-import org.apache.kafka.trogdor.fault.FaultSpec;
-
-import java.util.Objects;
-
-/**
- * A request to the Trogdor coordinator to create a fault.
- */
-public class CreateCoordinatorFaultRequest {
- private final String id;
- private final FaultSpec spec;
-
- @JsonCreator
- public CreateCoordinatorFaultRequest(@JsonProperty("id") String id,
- @JsonProperty("spec") FaultSpec spec) {
- this.id = id;
- this.spec = spec;
- }
-
- @JsonProperty
- public String id() {
- return id;
- }
-
- @JsonProperty
- public FaultSpec spec() {
- return spec;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
- CreateCoordinatorFaultRequest that = (CreateCoordinatorFaultRequest) o;
- return Objects.equals(id, that.id) &&
- Objects.equals(spec, that.spec);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(id, spec);
- }
-
- @Override
- public String toString() {
- return JsonUtil.toJsonString(this);
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/rest/CreateTaskRequest.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/CreateTaskRequest.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/CreateTaskRequest.java
new file mode 100644
index 0000000..d463e36
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/CreateTaskRequest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.rest;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.kafka.trogdor.task.TaskSpec;
+
+/**
+ * A request to the Trogdor coorinator to create a task.
+ */
+public class CreateTaskRequest extends Message {
+ private final String id;
+ private final TaskSpec spec;
+
+ @JsonCreator
+ public CreateTaskRequest(@JsonProperty("id") String id,
+ @JsonProperty("spec") TaskSpec spec) {
+ this.id = id;
+ this.spec = spec;
+ }
+
+ @JsonProperty
+ public String id() {
+ return id;
+ }
+
+ @JsonProperty
+ public TaskSpec spec() {
+ return spec;
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/rest/CreateTaskResponse.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/CreateTaskResponse.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/CreateTaskResponse.java
new file mode 100644
index 0000000..54ea0f2
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/CreateTaskResponse.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.rest;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.kafka.trogdor.task.TaskSpec;
+
+/**
+ * A response from the Trogdor coordinator about creating a task.
+ */
+public class CreateTaskResponse extends Message {
+ private final TaskSpec spec;
+
+ @JsonCreator
+ public CreateTaskResponse(@JsonProperty("spec") TaskSpec spec) {
+ this.spec = spec;
+ }
+
+ @JsonProperty
+ public TaskSpec spec() {
+ return spec;
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/rest/CreateWorkerRequest.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/CreateWorkerRequest.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/CreateWorkerRequest.java
new file mode 100644
index 0000000..9f6e8dc
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/CreateWorkerRequest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.rest;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.kafka.trogdor.task.TaskSpec;
+
+/**
+ * A request to the Trogdor agent to create a worker.
+ */
+public class CreateWorkerRequest extends Message {
+ private final String id;
+ private final TaskSpec spec;
+
+ @JsonCreator
+ public CreateWorkerRequest(@JsonProperty("id") String id,
+ @JsonProperty("spec") TaskSpec spec) {
+ this.id = id;
+ this.spec = spec;
+ }
+
+ @JsonProperty
+ public String id() {
+ return id;
+ }
+
+ @JsonProperty
+ public TaskSpec spec() {
+ return spec;
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/rest/CreateWorkerResponse.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/CreateWorkerResponse.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/CreateWorkerResponse.java
new file mode 100644
index 0000000..9e068ec
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/CreateWorkerResponse.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.rest;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.kafka.trogdor.task.TaskSpec;
+
+/**
+ * A response from the Trogdor agent about creating a worker.
+ */
+public class CreateWorkerResponse extends Message {
+ private final TaskSpec spec;
+
+ @JsonCreator
+ public CreateWorkerResponse(@JsonProperty("spec") TaskSpec spec) {
+ this.spec = spec;
+ }
+
+ @JsonProperty
+ public TaskSpec spec() {
+ return spec;
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/rest/FaultDataMap.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/FaultDataMap.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/FaultDataMap.java
deleted file mode 100644
index b2f7c91..0000000
--- a/tools/src/main/java/org/apache/kafka/trogdor/rest/FaultDataMap.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.trogdor.rest;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.kafka.trogdor.common.JsonUtil;
-import org.apache.kafka.trogdor.fault.FaultSpec;
-import org.apache.kafka.trogdor.fault.FaultState;
-
-import java.util.Map;
-import java.util.Objects;
-
-/**
- * Response to GET /faults
- */
-public class FaultDataMap {
- private final Map<String, FaultData> faults;
-
- public static class FaultData {
- private final FaultSpec spec;
- private final FaultState state;
-
- @JsonCreator
- public FaultData(@JsonProperty("spec") FaultSpec spec,
- @JsonProperty("state") FaultState state) {
- this.spec = spec;
- this.state = state;
- }
-
- @JsonProperty
- public FaultSpec spec() {
- return spec;
- }
-
- @JsonProperty
- public FaultState state() {
- return state;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
- FaultData that = (FaultData) o;
- return Objects.equals(spec, that.spec) &&
- Objects.equals(state, that.state);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(spec, state);
- }
- }
-
- @JsonCreator
- public FaultDataMap(@JsonProperty("faults") Map<String, FaultData> faults) {
- this.faults = faults;
- }
-
- @JsonProperty
- public Map<String, FaultData> faults() {
- return faults;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
- FaultDataMap that = (FaultDataMap) o;
- return Objects.equals(faults, that.faults);
- }
-
- @Override
- public int hashCode() {
- return Objects.hashCode(faults);
- }
-
- @Override
- public String toString() {
- return JsonUtil.toJsonString(this);
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/rest/JsonRestServer.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/JsonRestServer.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/JsonRestServer.java
index 1b23a9e..e61b7fe 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/rest/JsonRestServer.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/JsonRestServer.java
@@ -21,6 +21,7 @@ import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
import org.apache.kafka.trogdor.common.JsonUtil;
+import org.apache.kafka.trogdor.common.ThreadUtils;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Server;
@@ -43,6 +44,10 @@ import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
/**
* Embedded server for the REST API that provides the control plane for Trogdor.
@@ -50,7 +55,9 @@ import java.nio.charset.StandardCharsets;
public class JsonRestServer {
private static final Logger log = LoggerFactory.getLogger(JsonRestServer.class);
- private static final long GRACEFUL_SHUTDOWN_TIMEOUT_MS = 2 * 1000;
+ private static final long GRACEFUL_SHUTDOWN_TIMEOUT_MS = 100;
+
+ private final ScheduledExecutorService shutdownExecutor;
private final Server jettyServer;
@@ -63,6 +70,8 @@ public class JsonRestServer {
* 0 to use a random port.
*/
public JsonRestServer(int port) {
+ this.shutdownExecutor = Executors.newSingleThreadScheduledExecutor(
+ ThreadUtils.createThreadFactory("JsonRestServerCleanupExecutor", false));
this.jettyServer = new Server();
this.connector = new ServerConnector(jettyServer);
if (port > 0) {
@@ -78,7 +87,6 @@ public class JsonRestServer {
*/
public void start(Object... resources) {
log.info("Starting REST server");
-
ResourceConfig resourceConfig = new ResourceConfig();
resourceConfig.register(new JacksonJsonProvider(JsonUtil.JSON_SERDE));
for (Object resource : resources) {
@@ -119,17 +127,37 @@ public class JsonRestServer {
return connector.getLocalPort();
}
- public void stop() {
- log.info("Stopping REST server");
+ /**
+ * Initiate shutdown, but do not wait for it to complete.
+ */
+ public void beginShutdown() {
+ if (!shutdownExecutor.isShutdown()) {
+ shutdownExecutor.submit(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ try {
+ log.info("Stopping REST server");
+ jettyServer.stop();
+ jettyServer.join();
+ log.info("REST server stopped");
+ } catch (Exception e) {
+ log.error("Unable to stop REST server", e);
+ } finally {
+ jettyServer.destroy();
+ }
+ shutdownExecutor.shutdown();
+ return null;
+ }
+ });
+ }
+ }
- try {
- jettyServer.stop();
- jettyServer.join();
- log.info("REST server stopped");
- } catch (Exception e) {
- log.error("Unable to stop REST server", e);
- } finally {
- jettyServer.destroy();
+ /**
+ * Wait for shutdown to complete. May be called prior to beginShutdown.
+ */
+ public void waitForShutdown() throws InterruptedException {
+ while (!shutdownExecutor.isShutdown()) {
+ shutdownExecutor.awaitTermination(1, TimeUnit.DAYS);
}
}
@@ -197,6 +225,24 @@ public class JsonRestServer {
}
}
+ public static <T> HttpResponse<T> httpRequest(String url, String method,
+ Object requestBodyData, TypeReference<T> responseFormat, int maxTries)
+ throws IOException, InterruptedException {
+ IOException exc = null;
+ for (int tries = 0; tries < maxTries; tries++) {
+ if (tries > 0) {
+ Thread.sleep(tries > 1 ? 10 : 2);
+ }
+ try {
+ return httpRequest(url, method, requestBodyData, responseFormat);
+ } catch (IOException e) {
+ log.info("{} {}: error: {}", method, url, e.getMessage());
+ exc = e;
+ }
+ }
+ throw exc;
+ }
+
public static class HttpResponse<T> {
private final T body;
private final ErrorResponse error;
http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/rest/Message.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/Message.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/Message.java
new file mode 100644
index 0000000..c2ee840
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/Message.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.rest;
+
+import org.apache.kafka.trogdor.common.JsonUtil;
+
+import java.util.Objects;
+
+public abstract class Message {
+ @Override
+ public final boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ return Objects.equals(toString(), o.toString());
+ }
+
+ @Override
+ public final int hashCode() {
+ return toString().hashCode();
+ }
+
+ @Override
+ public final String toString() {
+ return JsonUtil.toJsonString(this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/rest/StopTaskRequest.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/StopTaskRequest.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/StopTaskRequest.java
new file mode 100644
index 0000000..3287801
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/StopTaskRequest.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.rest;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * A request to the Trogdor agent to stop a task.
+ */
+public class StopTaskRequest extends Message {
+ private final String id;
+
+ @JsonCreator
+ public StopTaskRequest(@JsonProperty("id") String id) {
+ this.id = id;
+ }
+
+ @JsonProperty
+ public String id() {
+ return id;
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/rest/StopTaskResponse.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/StopTaskResponse.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/StopTaskResponse.java
new file mode 100644
index 0000000..f344dc9
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/StopTaskResponse.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.rest;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.kafka.trogdor.task.TaskSpec;
+
+/**
+ * A response from the Trogdor coordinator about stopping a task.
+ */
+public class StopTaskResponse extends Message {
+ private final TaskSpec spec;
+
+ @JsonCreator
+ public StopTaskResponse(@JsonProperty("spec") TaskSpec spec) {
+ this.spec = spec;
+ }
+
+ @JsonProperty
+ public TaskSpec spec() {
+ return spec;
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/rest/StopWorkerRequest.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/StopWorkerRequest.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/StopWorkerRequest.java
new file mode 100644
index 0000000..54c689a
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/StopWorkerRequest.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.rest;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * A request to the Trogdor agent to stop a worker.
+ */
+public class StopWorkerRequest extends Message {
+ private final String id;
+
+ @JsonCreator
+ public StopWorkerRequest(@JsonProperty("id") String id) {
+ this.id = id;
+ }
+
+ @JsonProperty
+ public String id() {
+ return id;
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/rest/StopWorkerResponse.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/StopWorkerResponse.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/StopWorkerResponse.java
new file mode 100644
index 0000000..7d5b468
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/StopWorkerResponse.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.rest;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.kafka.trogdor.task.TaskSpec;
+
+/**
+ * A response from the Trogdor agent about stopping a worker.
+ */
+public class StopWorkerResponse extends Message {
+ private final TaskSpec spec;
+
+ @JsonCreator
+ public StopWorkerResponse(@JsonProperty("spec") TaskSpec spec) {
+ this.spec = spec;
+ }
+
+ @JsonProperty
+ public TaskSpec spec() {
+ return spec;
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskDone.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskDone.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskDone.java
new file mode 100644
index 0000000..536d3f2
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskDone.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.rest;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.kafka.trogdor.task.TaskSpec;
+
+/**
+ * The state a task is in once it's done.
+ */
+public class TaskDone extends TaskState {
+ /**
+ * The time on the coordinator when the task was started.
+ */
+ private final long startedMs;
+
+ /**
+ * The time on the coordinator when the task was completed.
+ */
+ private final long doneMs;
+
+ /**
+ * Empty if the task completed without error; the error message otherwise.
+ */
+ private final String error;
+
+ /**
+ * True if the task was manually cancelled, rather than terminating itself.
+ */
+ private final boolean cancelled;
+
+ @JsonCreator
+ public TaskDone(@JsonProperty("spec") TaskSpec spec,
+ @JsonProperty("startedMs") long startedMs,
+ @JsonProperty("doneMs") long doneMs,
+ @JsonProperty("error") String error,
+ @JsonProperty("cancelled") boolean cancelled) {
+ super(spec);
+ this.startedMs = startedMs;
+ this.doneMs = doneMs;
+ this.error = error;
+ this.cancelled = cancelled;
+ }
+
+ @JsonProperty
+ public long startedMs() {
+ return startedMs;
+ }
+
+ @JsonProperty
+ public long doneMs() {
+ return doneMs;
+ }
+
+ @JsonProperty
+ public String error() {
+ return error;
+ }
+
+ @JsonProperty
+ public boolean cancelled() {
+ return cancelled;
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskPending.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskPending.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskPending.java
new file mode 100644
index 0000000..b0162d3
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskPending.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.rest;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.kafka.trogdor.task.TaskSpec;
+
+/**
+ * The state for a task which is still pending.
+ */
+public class TaskPending extends TaskState {
+ @JsonCreator
+ public TaskPending(@JsonProperty("spec") TaskSpec spec) {
+ super(spec);
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskRunning.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskRunning.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskRunning.java
new file mode 100644
index 0000000..bff3676
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskRunning.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.rest;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.kafka.trogdor.task.TaskSpec;
+
+/**
+ * The state for a task which is being run by the agent.
+ */
+public class TaskRunning extends TaskState {
+ /**
+ * The time on the agent when the task was started.
+ */
+ private final long startedMs;
+
+ @JsonCreator
+ public TaskRunning(@JsonProperty("spec") TaskSpec spec,
+ @JsonProperty("startedMs") long startedMs) {
+ super(spec);
+ this.startedMs = startedMs;
+ }
+
+ @JsonProperty
+ public long startedMs() {
+ return startedMs;
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskState.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskState.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskState.java
new file mode 100644
index 0000000..28b6108
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskState.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.rest;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import org.apache.kafka.trogdor.task.TaskSpec;
+
+/**
+ * The state which a task is in on the Coordinator.
+ */
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME,
+ include = JsonTypeInfo.As.PROPERTY,
+ property = "state")
+@JsonSubTypes({
+ @JsonSubTypes.Type(value = TaskPending.class, name = "PENDING"),
+ @JsonSubTypes.Type(value = TaskRunning.class, name = "RUNNING"),
+ @JsonSubTypes.Type(value = TaskStopping.class, name = "STOPPING"),
+ @JsonSubTypes.Type(value = TaskDone.class, name = "DONE")
+ })
+public abstract class TaskState extends Message {
+ private final TaskSpec spec;
+
+ public TaskState(TaskSpec spec) {
+ this.spec = spec;
+ }
+
+ @JsonProperty
+ public TaskSpec spec() {
+ return spec;
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskStopping.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskStopping.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskStopping.java
new file mode 100644
index 0000000..4446b75
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskStopping.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.rest;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.kafka.trogdor.task.TaskSpec;
+
+/**
+ * The state for a task which is being stopped on the coordinator.
+ */
+public class TaskStopping extends TaskState {
+ /**
+ * The time on the agent when the task was received.
+ */
+ private final long startedMs;
+
+ @JsonCreator
+ public TaskStopping(@JsonProperty("spec") TaskSpec spec,
+ @JsonProperty("startedMs") long startedMs) {
+ super(spec);
+ this.startedMs = startedMs;
+ }
+
+ @JsonProperty
+ public long startedMs() {
+ return startedMs;
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/rest/TasksResponse.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/TasksResponse.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/TasksResponse.java
new file mode 100644
index 0000000..d3b415b
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/TasksResponse.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.rest;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.TreeMap;
+
+/**
+ * The response to /coordinator/tasks
+ */
+public class TasksResponse extends Message {
+ private final TreeMap<String, TaskState> tasks;
+
+ @JsonCreator
+ public TasksResponse(@JsonProperty("tasks") TreeMap<String, TaskState> tasks) {
+ this.tasks = tasks;
+ }
+
+ @JsonProperty
+ public TreeMap<String, TaskState> tasks() {
+ return tasks;
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerDone.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerDone.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerDone.java
new file mode 100644
index 0000000..0f46b25
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerDone.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.rest;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.kafka.trogdor.task.TaskSpec;
+
+/**
+ * The state a worker is in once it's done.
+ */
+public class WorkerDone extends WorkerState {
+ /**
+ * The time on the agent when the task was started.
+ */
+ private final long startedMs;
+
+ /**
+ * The time on the agent when the task was completed.
+ */
+ private final long doneMs;
+
+ /**
+ * The task status. The format will depend on the type of task that is
+ * being run.
+ */
+ private final String status;
+
+ /**
+ * Empty if the task completed without error; the error message otherwise.
+ */
+ private final String error;
+
+ @JsonCreator
+ public WorkerDone(@JsonProperty("spec") TaskSpec spec,
+ @JsonProperty("startedMs") long startedMs,
+ @JsonProperty("doneMs") long doneMs,
+ @JsonProperty("status") String status,
+ @JsonProperty("error") String error) {
+ super(spec);
+ this.startedMs = startedMs;
+ this.doneMs = doneMs;
+ this.status = status;
+ this.error = error;
+ }
+
+ @JsonProperty
+ @Override
+ public long startedMs() {
+ return startedMs;
+ }
+
+ @JsonProperty
+ public long doneMs() {
+ return doneMs;
+ }
+
+ @JsonProperty
+ @Override
+ public String status() {
+ return status;
+ }
+
+ @JsonProperty
+ public String error() {
+ return error;
+ }
+
+ @Override
+ public boolean done() {
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4fac83ba/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerReceiving.java
----------------------------------------------------------------------
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerReceiving.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerReceiving.java
new file mode 100644
index 0000000..d3e3565
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/WorkerReceiving.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.rest;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.kafka.trogdor.task.TaskSpec;
+
+/**
+ * When we're in the process of sending a TaskSpec to the Agent, the Worker is regarded
+ * as being in WorkerReceiving state.
+ */
+public final class WorkerReceiving extends WorkerState {
+ @JsonCreator
+ public WorkerReceiving(@JsonProperty("spec") TaskSpec spec) {
+ super(spec);
+ }
+}