You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2018/11/09 18:31:24 UTC
[kafka] branch trunk updated: KAFKA-7564: Expose single task
details in Trogdor (#5852)
This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new ecb71cf KAFKA-7564: Expose single task details in Trogdor (#5852)
ecb71cf is described below
commit ecb71cf4719e6d22d6738f8df2fd9e16dad33295
Author: Stanislav Kozlovski <st...@outlook.com>
AuthorDate: Fri Nov 9 18:31:04 2018 +0000
KAFKA-7564: Expose single task details in Trogdor (#5852)
This commit adds a new "/coordinator/tasks/{taskId}" endpoint which fetches details for a single task.
---
.../kafka/trogdor/coordinator/Coordinator.java | 6 ++++
.../trogdor/coordinator/CoordinatorClient.java | 25 ++++++++++++++
.../coordinator/CoordinatorRestResource.java | 14 ++++++++
.../kafka/trogdor/coordinator/TaskManager.java | 31 ++++++++++++++++++
.../org/apache/kafka/trogdor/rest/TaskRequest.java | 38 ++++++++++++++++++++++
.../apache/kafka/trogdor/common/ExpectedTasks.java | 2 +-
.../kafka/trogdor/coordinator/CoordinatorTest.java | 37 +++++++++++++++++++++
7 files changed, 152 insertions(+), 1 deletion(-)
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/Coordinator.java b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/Coordinator.java
index c3271c9..cd3da90 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/Coordinator.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/Coordinator.java
@@ -30,7 +30,9 @@ import org.apache.kafka.trogdor.rest.CreateTaskRequest;
import org.apache.kafka.trogdor.rest.DestroyTaskRequest;
import org.apache.kafka.trogdor.rest.JsonRestServer;
import org.apache.kafka.trogdor.rest.StopTaskRequest;
+import org.apache.kafka.trogdor.rest.TaskRequest;
import org.apache.kafka.trogdor.rest.TasksRequest;
+import org.apache.kafka.trogdor.rest.TaskState;
import org.apache.kafka.trogdor.rest.TasksResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -104,6 +106,10 @@ public final class Coordinator {
return taskManager.tasks(request);
}
+ public TaskState task(TaskRequest request) throws Exception {
+ return taskManager.task(request);
+ }
+
public void beginShutdown(boolean stopAgents) throws Exception {
restServer.beginShutdown();
taskManager.beginShutdown(stopAgents);
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorClient.java b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorClient.java
index 780ae73..80937a8 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorClient.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorClient.java
@@ -32,11 +32,14 @@ import org.apache.kafka.trogdor.rest.Empty;
import org.apache.kafka.trogdor.rest.JsonRestServer;
import org.apache.kafka.trogdor.rest.JsonRestServer.HttpResponse;
import org.apache.kafka.trogdor.rest.StopTaskRequest;
+import org.apache.kafka.trogdor.rest.TaskRequest;
import org.apache.kafka.trogdor.rest.TasksRequest;
+import org.apache.kafka.trogdor.rest.TaskState;
import org.apache.kafka.trogdor.rest.TasksResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.ws.rs.NotFoundException;
import javax.ws.rs.core.UriBuilder;
import static net.sourceforge.argparse4j.impl.Arguments.store;
@@ -151,6 +154,13 @@ public class CoordinatorClient {
return resp.body();
}
+ public TaskState task(TaskRequest request) throws Exception {
+ String uri = UriBuilder.fromPath(url("/coordinator/tasks/{taskId}")).build(request.taskId()).toString();
+ HttpResponse<TaskState> resp = JsonRestServer.httpRequest(log, uri, "GET",
+ null, new TypeReference<TaskState>() { }, maxTries);
+ return resp.body();
+ }
+
public void shutdown() throws Exception {
HttpResponse<Empty> resp =
JsonRestServer.httpRequest(log, url("/coordinator/shutdown"), "PUT",
@@ -181,6 +191,12 @@ public class CoordinatorClient {
.type(Boolean.class)
.dest("show_tasks")
.help("Show coordinator tasks.");
+ actions.addArgument("--show-task")
+ .action(store())
+ .type(String.class)
+ .dest("show_task")
+ .metavar("TASK_ID")
+ .help("Show a specific coordinator task.");
actions.addArgument("--create-task")
.action(store())
.type(String.class)
@@ -229,6 +245,15 @@ public class CoordinatorClient {
System.out.println("Got coordinator tasks: " +
JsonUtil.toPrettyJsonString(client.tasks(
new TasksRequest(null, 0, 0, 0, 0))));
+ } else if (res.getString("show_task") != null) {
+ String taskId = res.getString("show_task");
+ TaskRequest req = new TaskRequest(res.getString("show_task"));
+ try {
+ String taskOutput = String.format("Got coordinator task \"%s\": %s", taskId, JsonUtil.toPrettyJsonString(client.task(req)));
+ System.out.println(taskOutput);
+ } catch (NotFoundException e) {
+ System.out.println(e.getMessage());
+ }
} else if (res.getString("create_task") != null) {
CreateTaskRequest req = JsonUtil.JSON_SERDE.
readValue(res.getString("create_task"), CreateTaskRequest.class);
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorRestResource.java b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorRestResource.java
index cbfbddd..9163720 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorRestResource.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorRestResource.java
@@ -22,7 +22,9 @@ import org.apache.kafka.trogdor.rest.CreateTaskRequest;
import org.apache.kafka.trogdor.rest.DestroyTaskRequest;
import org.apache.kafka.trogdor.rest.Empty;
import org.apache.kafka.trogdor.rest.StopTaskRequest;
+import org.apache.kafka.trogdor.rest.TaskRequest;
import org.apache.kafka.trogdor.rest.TasksRequest;
+import org.apache.kafka.trogdor.rest.TaskState;
import org.apache.kafka.trogdor.rest.TasksResponse;
import javax.servlet.ServletContext;
@@ -35,6 +37,8 @@ import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.NotFoundException;
import javax.ws.rs.core.MediaType;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
@@ -101,6 +105,16 @@ public class CoordinatorRestResource {
return coordinator().tasks(new TasksRequest(taskId, firstStartMs, lastStartMs, firstEndMs, lastEndMs));
}
+ @GET
+ @Path("/tasks/{taskId}")
+ public TaskState tasks(@PathParam("taskId") String taskId) throws Throwable {
+ TaskState response = coordinator().task(new TaskRequest(taskId));
+ if (response == null)
+ throw new NotFoundException(String.format("No task with ID \"%s\" exists.", taskId));
+
+ return response;
+ }
+
@PUT
@Path("/shutdown")
public Empty beginShutdown(CoordinatorShutdownRequest request) throws Throwable {
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java
index 74082bd..934acd3 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java
@@ -36,6 +36,7 @@ import org.apache.kafka.trogdor.rest.TaskRunning;
import org.apache.kafka.trogdor.rest.TaskState;
import org.apache.kafka.trogdor.rest.TaskStopping;
import org.apache.kafka.trogdor.rest.TasksRequest;
+import org.apache.kafka.trogdor.rest.TaskRequest;
import org.apache.kafka.trogdor.rest.TasksResponse;
import org.apache.kafka.trogdor.rest.WorkerDone;
import org.apache.kafka.trogdor.rest.WorkerReceiving;
@@ -629,6 +630,36 @@ public final class TaskManager {
}
/**
+ * Get information about a single task being managed.
+ *
+ * Returns #{@code null} if the task does not exist
+ */
+ public TaskState task(TaskRequest request) throws ExecutionException, InterruptedException {
+ return executor.submit(new GetTaskState(request)).get();
+ }
+
+ /**
+ * Gets information about the tasks being managed. Processed by the state change thread.
+ */
+ class GetTaskState implements Callable<TaskState> {
+ private final TaskRequest request;
+
+ GetTaskState(TaskRequest request) {
+ this.request = request;
+ }
+
+ @Override
+ public TaskState call() throws Exception {
+ ManagedTask task = tasks.get(request.taskId());
+ if (task == null) {
+ return null;
+ }
+
+ return task.taskState();
+ }
+ }
+
+ /**
* Initiate shutdown, but do not wait for it to complete.
*/
public void beginShutdown(boolean stopAgents) throws ExecutionException, InterruptedException {
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskRequest.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskRequest.java
new file mode 100644
index 0000000..e42738f
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskRequest.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.rest;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * The request to /coordinator/tasks/{taskId}
+ */
+public class TaskRequest {
+ private final String taskId;
+
+ @JsonCreator
+ public TaskRequest(@JsonProperty("taskId") String taskId) {
+ this.taskId = taskId == null ? "" : taskId;
+ }
+
+ @JsonProperty
+ public String taskId() {
+ return taskId;
+ }
+}
diff --git a/tools/src/test/java/org/apache/kafka/trogdor/common/ExpectedTasks.java b/tools/src/test/java/org/apache/kafka/trogdor/common/ExpectedTasks.java
index 121281f..b0e30a0 100644
--- a/tools/src/test/java/org/apache/kafka/trogdor/common/ExpectedTasks.java
+++ b/tools/src/test/java/org/apache/kafka/trogdor/common/ExpectedTasks.java
@@ -71,7 +71,7 @@ public class ExpectedTasks {
}
}
- static class ExpectedTask {
+ public static class ExpectedTask {
private final String id;
private final TaskSpec taskSpec;
private final TaskState taskState;
diff --git a/tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java b/tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java
index e943484..f22130e 100644
--- a/tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java
+++ b/tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java
@@ -41,7 +41,9 @@ import org.apache.kafka.trogdor.rest.StopTaskRequest;
import org.apache.kafka.trogdor.rest.TaskDone;
import org.apache.kafka.trogdor.rest.TaskPending;
import org.apache.kafka.trogdor.rest.TaskRunning;
+import org.apache.kafka.trogdor.rest.TaskRequest;
import org.apache.kafka.trogdor.rest.TasksRequest;
+import org.apache.kafka.trogdor.rest.TaskState;
import org.apache.kafka.trogdor.rest.TasksResponse;
import org.apache.kafka.trogdor.rest.WorkerDone;
import org.apache.kafka.trogdor.rest.WorkerRunning;
@@ -53,6 +55,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.junit.Test;
+import javax.ws.rs.NotFoundException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@@ -491,6 +494,40 @@ public class CoordinatorTest {
}
@Test
+ public void testTaskRequest() throws Exception {
+ MockTime time = new MockTime(0, 0, 0);
+ Scheduler scheduler = new MockScheduler(time);
+ try (MiniTrogdorCluster cluster = new MiniTrogdorCluster.Builder().
+ addCoordinator("node01").
+ addAgent("node02").
+ scheduler(scheduler).
+ build()) {
+ CoordinatorClient coordinatorClient = cluster.coordinatorClient();
+
+ NoOpTaskSpec fooSpec = new NoOpTaskSpec(1, 10);
+ coordinatorClient.createTask(new CreateTaskRequest("foo", fooSpec));
+ TaskState expectedState = new ExpectedTaskBuilder("foo").taskState(new TaskPending(fooSpec)).build().taskState();
+
+ TaskState resp = coordinatorClient.task(new TaskRequest("foo"));
+ assertEquals(expectedState, resp);
+
+ time.sleep(2);
+ new ExpectedTasks().
+ addTask(new ExpectedTaskBuilder("foo").
+ taskState(new TaskRunning(fooSpec, 2, new TextNode("active"))).
+ workerState(new WorkerRunning("foo", fooSpec, 2, new TextNode("active"))).
+ build()).
+ waitFor(coordinatorClient).
+ waitFor(cluster.agentClient("node02"));
+
+ try {
+ coordinatorClient.task(new TaskRequest("non-existent-foo"));
+ fail("Non existent task request should have raised a NotFoundException");
+ } catch (NotFoundException ignored) { }
+ }
+ }
+
+ @Test
public void testWorkersExitingAtDifferentTimes() throws Exception {
MockTime time = new MockTime(0, 0, 0);
Scheduler scheduler = new MockScheduler(time);