You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2018/04/05 12:35:24 UTC
[kafka] branch trunk updated: KAFKA-6694: The Trogdor Coordinator
should support filtering task responses (#4741)
This is an automated email from the ASF dual-hosted git repository.
rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 63642d6 KAFKA-6694: The Trogdor Coordinator should support filtering task responses (#4741)
63642d6 is described below
commit 63642d6051015d84aa8c084380bcab174a5f3303
Author: Colin Patrick McCabe <co...@cmccabe.xyz>
AuthorDate: Thu Apr 5 05:35:20 2018 -0700
KAFKA-6694: The Trogdor Coordinator should support filtering task responses (#4741)
---
.../kafka/trogdor/coordinator/Coordinator.java | 5 +-
.../trogdor/coordinator/CoordinatorClient.java | 16 ++-
.../coordinator/CoordinatorRestResource.java | 12 +-
.../kafka/trogdor/coordinator/TaskManager.java | 15 ++-
.../apache/kafka/trogdor/rest/TasksRequest.java | 123 +++++++++++++++++++++
.../apache/kafka/trogdor/common/ExpectedTasks.java | 3 +-
.../kafka/trogdor/coordinator/CoordinatorTest.java | 92 +++++++++++++++
7 files changed, 255 insertions(+), 11 deletions(-)
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/Coordinator.java b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/Coordinator.java
index b3418dd..717d7c7 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/Coordinator.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/Coordinator.java
@@ -31,6 +31,7 @@ import org.apache.kafka.trogdor.rest.CreateTaskResponse;
import org.apache.kafka.trogdor.rest.JsonRestServer;
import org.apache.kafka.trogdor.rest.StopTaskRequest;
import org.apache.kafka.trogdor.rest.StopTaskResponse;
+import org.apache.kafka.trogdor.rest.TasksRequest;
import org.apache.kafka.trogdor.rest.TasksResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -94,8 +95,8 @@ public final class Coordinator {
return new StopTaskResponse(taskManager.stopTask(request.id()));
}
- public TasksResponse tasks() throws Exception {
- return taskManager.tasks();
+ public TasksResponse tasks(TasksRequest request) throws Exception {
+ return taskManager.tasks(request);
}
public void beginShutdown(boolean stopAgents) throws Exception {
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorClient.java b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorClient.java
index 870b64e..0677296 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorClient.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorClient.java
@@ -33,10 +33,13 @@ import org.apache.kafka.trogdor.rest.JsonRestServer;
import org.apache.kafka.trogdor.rest.JsonRestServer.HttpResponse;
import org.apache.kafka.trogdor.rest.StopTaskRequest;
import org.apache.kafka.trogdor.rest.StopTaskResponse;
+import org.apache.kafka.trogdor.rest.TasksRequest;
import org.apache.kafka.trogdor.rest.TasksResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.ws.rs.core.UriBuilder;
+
import static net.sourceforge.argparse4j.impl.Arguments.store;
import static net.sourceforge.argparse4j.impl.Arguments.storeTrue;
@@ -127,9 +130,15 @@ public class CoordinatorClient {
return resp.body();
}
- public TasksResponse tasks() throws Exception {
+ public TasksResponse tasks(TasksRequest request) throws Exception {
+ UriBuilder uriBuilder = UriBuilder.fromPath(url("/coordinator/tasks"));
+ uriBuilder.queryParam("taskId", request.taskIds().toArray(new String[0]));
+ uriBuilder.queryParam("firstStartMs", request.firstStartMs());
+ uriBuilder.queryParam("lastStartMs", request.lastStartMs());
+ uriBuilder.queryParam("firstEndMs", request.firstEndMs());
+ uriBuilder.queryParam("lastEndMs", request.lastEndMs());
HttpResponse<TasksResponse> resp =
- JsonRestServer.<TasksResponse>httpRequest(log, url("/coordinator/tasks"), "GET",
+ JsonRestServer.<TasksResponse>httpRequest(log, uriBuilder.build().toString(), "GET",
null, new TypeReference<TasksResponse>() { }, maxTries);
return resp.body();
}
@@ -204,7 +213,8 @@ public class CoordinatorClient {
JsonUtil.toPrettyJsonString(client.status()));
} else if (res.getBoolean("show_tasks")) {
System.out.println("Got coordinator tasks: " +
- JsonUtil.toPrettyJsonString(client.tasks()));
+ JsonUtil.toPrettyJsonString(client.tasks(
+ new TasksRequest(null, 0, 0, 0, 0))));
} else if (res.getString("create_task") != null) {
client.createTask(JsonUtil.JSON_SERDE.readValue(res.getString("create_task"),
CreateTaskRequest.class));
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorRestResource.java b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorRestResource.java
index 7775dd0..b8663ec 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorRestResource.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorRestResource.java
@@ -23,16 +23,20 @@ import org.apache.kafka.trogdor.rest.CreateTaskResponse;
import org.apache.kafka.trogdor.rest.Empty;
import org.apache.kafka.trogdor.rest.StopTaskRequest;
import org.apache.kafka.trogdor.rest.StopTaskResponse;
+import org.apache.kafka.trogdor.rest.TasksRequest;
import org.apache.kafka.trogdor.rest.TasksResponse;
import javax.servlet.ServletContext;
import javax.ws.rs.Consumes;
+import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
import javax.ws.rs.core.MediaType;
+import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
@@ -69,8 +73,12 @@ public class CoordinatorRestResource {
@GET
@Path("/tasks")
- public TasksResponse tasks() throws Throwable {
- return coordinator().tasks();
+ public TasksResponse tasks(@QueryParam("taskId") List<String> taskId,
+ @DefaultValue("0") @QueryParam("firstStartMs") int firstStartMs,
+ @DefaultValue("0") @QueryParam("lastStartMs") int lastStartMs,
+ @DefaultValue("0") @QueryParam("firstEndMs") int firstEndMs,
+ @DefaultValue("0") @QueryParam("lastEndMs") int lastEndMs) throws Throwable {
+ return coordinator().tasks(new TasksRequest(taskId, firstStartMs, lastStartMs, firstEndMs, lastEndMs));
}
@PUT
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java
index 286f904..d88e1d5 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java
@@ -29,6 +29,7 @@ import org.apache.kafka.trogdor.rest.TaskPending;
import org.apache.kafka.trogdor.rest.TaskRunning;
import org.apache.kafka.trogdor.rest.TaskState;
import org.apache.kafka.trogdor.rest.TaskStopping;
+import org.apache.kafka.trogdor.rest.TasksRequest;
import org.apache.kafka.trogdor.rest.TasksResponse;
import org.apache.kafka.trogdor.task.TaskController;
import org.apache.kafka.trogdor.task.TaskSpec;
@@ -483,16 +484,24 @@ public final class TaskManager {
/**
* Get information about the tasks being managed.
*/
- public TasksResponse tasks() throws ExecutionException, InterruptedException {
- return executor.submit(new GetTasksResponse()).get();
+ public TasksResponse tasks(TasksRequest request) throws ExecutionException, InterruptedException {
+ return executor.submit(new GetTasksResponse(request)).get();
}
class GetTasksResponse implements Callable<TasksResponse> {
+ private final TasksRequest request;
+
+ GetTasksResponse(TasksRequest request) {
+ this.request = request;
+ }
+
@Override
public TasksResponse call() throws Exception {
TreeMap<String, TaskState> states = new TreeMap<>();
for (ManagedTask task : tasks.values()) {
- states.put(task.id, task.taskState());
+ if (request.matches(task.id, task.startedMs, task.doneMs)) {
+ states.put(task.id, task.taskState());
+ }
}
return new TasksResponse(states);
}
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/TasksRequest.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/TasksRequest.java
new file mode 100644
index 0000000..24b438a
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/TasksRequest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.rest;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * The request to /coordinator/tasks
+ */
+public class TasksRequest extends Message {
+ /**
+ * The task IDs to list.
+ * An empty set of task IDs indicates that we should list all task IDs.
+ */
+ private final Set<String> taskIds;
+
+ /**
+ * If this is non-zero, only tasks with a startMs at or after this time will be listed.
+ */
+ private final long firstStartMs;
+
+ /**
+ * If this is non-zero, only tasks with a startMs at or before this time will be listed.
+ */
+ private final long lastStartMs;
+
+ /**
+ * If this is non-zero, only tasks with an endMs at or after this time will be listed.
+ */
+ private final long firstEndMs;
+
+ /**
+ * If this is non-zero, only tasks with an endMs at or before this time will be listed.
+ */
+ private final long lastEndMs;
+
+ @JsonCreator
+ public TasksRequest(@JsonProperty("taskIds") Collection<String> taskIds,
+ @JsonProperty("firstStartMs") long firstStartMs,
+ @JsonProperty("lastStartMs") long lastStartMs,
+ @JsonProperty("firstEndMs") long firstEndMs,
+ @JsonProperty("lastEndMs") long lastEndMs) {
+ this.taskIds = Collections.unmodifiableSet((taskIds == null) ?
+ new HashSet<String>() : new HashSet<>(taskIds));
+ this.firstStartMs = Math.max(0, firstStartMs);
+ this.lastStartMs = Math.max(0, lastStartMs);
+ this.firstEndMs = Math.max(0, firstEndMs);
+ this.lastEndMs = Math.max(0, lastEndMs);
+ }
+
+ @JsonProperty
+ public Collection<String> taskIds() {
+ return taskIds;
+ }
+
+ @JsonProperty
+ public long firstStartMs() {
+ return firstStartMs;
+ }
+
+ @JsonProperty
+ public long lastStartMs() {
+ return lastStartMs;
+ }
+
+ @JsonProperty
+ public long firstEndMs() {
+ return firstEndMs;
+ }
+
+ @JsonProperty
+ public long lastEndMs() {
+ return lastEndMs;
+ }
+
+ /**
+ * Determine if this TaskRequest should return a particular task.
+ *
+ * @param taskId The task ID.
+ * @param startMs The task start time, or -1 if the task hasn't started.
+ * @param endMs The task end time, or -1 if the task hasn't ended.
+ * @return True if information about the task should be returned.
+ */
+ public boolean matches(String taskId, long startMs, long endMs) {
+ if ((!taskIds.isEmpty()) && (!taskIds.contains(taskId))) {
+ return false;
+ }
+ if ((firstStartMs > 0) && (startMs < firstStartMs)) {
+ return false;
+ }
+ if ((lastStartMs > 0) && ((startMs < 0) || (startMs > lastStartMs))) {
+ return false;
+ }
+ if ((firstEndMs > 0) && (endMs < firstEndMs)) {
+ return false;
+ }
+ if ((lastEndMs > 0) && ((endMs < 0) || (endMs > lastEndMs))) {
+ return false;
+ }
+ return true;
+ }
+}
diff --git a/tools/src/test/java/org/apache/kafka/trogdor/common/ExpectedTasks.java b/tools/src/test/java/org/apache/kafka/trogdor/common/ExpectedTasks.java
index f72779f..617bf34 100644
--- a/tools/src/test/java/org/apache/kafka/trogdor/common/ExpectedTasks.java
+++ b/tools/src/test/java/org/apache/kafka/trogdor/common/ExpectedTasks.java
@@ -25,6 +25,7 @@ import org.apache.kafka.trogdor.agent.AgentClient;
import org.apache.kafka.trogdor.coordinator.CoordinatorClient;
import org.apache.kafka.trogdor.rest.AgentStatusResponse;
import org.apache.kafka.trogdor.rest.TaskState;
+import org.apache.kafka.trogdor.rest.TasksRequest;
import org.apache.kafka.trogdor.rest.TasksResponse;
import org.apache.kafka.trogdor.rest.WorkerState;
import org.apache.kafka.trogdor.task.TaskSpec;
@@ -144,7 +145,7 @@ public class ExpectedTasks {
public boolean conditionMet() {
TasksResponse tasks = null;
try {
- tasks = client.tasks();
+ tasks = client.tasks(new TasksRequest(null, 0, 0, 0, 0));
} catch (Exception e) {
log.info("Unable to get coordinator tasks", e);
throw new RuntimeException(e);
diff --git a/tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java b/tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java
index 4973823..004702f 100644
--- a/tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java
+++ b/tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java
@@ -36,6 +36,8 @@ import org.apache.kafka.trogdor.rest.StopTaskRequest;
import org.apache.kafka.trogdor.rest.TaskDone;
import org.apache.kafka.trogdor.rest.TaskPending;
import org.apache.kafka.trogdor.rest.TaskRunning;
+import org.apache.kafka.trogdor.rest.TasksRequest;
+import org.apache.kafka.trogdor.rest.TasksResponse;
import org.apache.kafka.trogdor.rest.WorkerDone;
import org.apache.kafka.trogdor.rest.WorkerRunning;
import org.apache.kafka.trogdor.task.NoOpTaskSpec;
@@ -50,6 +52,8 @@ import java.util.Arrays;
import java.util.List;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
public class CoordinatorTest {
private static final Logger log = LoggerFactory.getLogger(CoordinatorTest.class);
@@ -302,4 +306,92 @@ public class CoordinatorTest {
"-m comment --comment node02").
waitFor("node03", runner);
}
+
+ @Test
+ public void testTasksRequestMatches() throws Exception {
+ TasksRequest req1 = new TasksRequest(null, 0, 0, 0, 0);
+ assertTrue(req1.matches("foo1", -1, -1));
+ assertTrue(req1.matches("bar1", 100, 200));
+ assertTrue(req1.matches("baz1", 100, -1));
+
+ TasksRequest req2 = new TasksRequest(null, 100, 0, 0, 0);
+ assertFalse(req2.matches("foo1", -1, -1));
+ assertTrue(req2.matches("bar1", 100, 200));
+ assertFalse(req2.matches("bar1", 99, 200));
+ assertFalse(req2.matches("baz1", 99, -1));
+
+ TasksRequest req3 = new TasksRequest(null, 200, 900, 200, 900);
+ assertFalse(req3.matches("foo1", -1, -1));
+ assertFalse(req3.matches("bar1", 100, 200));
+ assertFalse(req3.matches("bar1", 200, 1000));
+ assertTrue(req3.matches("bar1", 200, 700));
+ assertFalse(req3.matches("baz1", 101, -1));
+
+ List<String> taskIds = new ArrayList<>();
+ taskIds.add("foo1");
+ taskIds.add("bar1");
+ taskIds.add("baz1");
+ TasksRequest req4 = new TasksRequest(taskIds, 1000, -1, -1, -1);
+ assertFalse(req4.matches("foo1", -1, -1));
+ assertTrue(req4.matches("foo1", 1000, -1));
+ assertFalse(req4.matches("foo1", 900, -1));
+ assertFalse(req4.matches("baz2", 2000, -1));
+ assertFalse(req4.matches("baz2", -1, -1));
+ }
+
+ @Test
+ public void testTasksRequest() throws Exception {
+ MockTime time = new MockTime(0, 0, 0);
+ Scheduler scheduler = new MockScheduler(time);
+ try (MiniTrogdorCluster cluster = new MiniTrogdorCluster.Builder().
+ addCoordinator("node01").
+ addAgent("node02").
+ scheduler(scheduler).
+ build()) {
+ CoordinatorClient coordinatorClient = cluster.coordinatorClient();
+ new ExpectedTasks().waitFor(coordinatorClient);
+
+ NoOpTaskSpec fooSpec = new NoOpTaskSpec(1, 10);
+ NoOpTaskSpec barSpec = new NoOpTaskSpec(3, 1);
+ coordinatorClient.createTask(new CreateTaskRequest("foo", fooSpec));
+ coordinatorClient.createTask(new CreateTaskRequest("bar", barSpec));
+ new ExpectedTasks().
+ addTask(new ExpectedTaskBuilder("foo").
+ taskState(new TaskPending(fooSpec)).
+ build()).
+ addTask(new ExpectedTaskBuilder("bar").
+ taskState(new TaskPending(barSpec)).
+ build()).
+ waitFor(coordinatorClient);
+
+ assertEquals(0, coordinatorClient.tasks(
+ new TasksRequest(null, 10, 0, 10, 0)).tasks().size());
+ TasksResponse resp1 = coordinatorClient.tasks(
+ new TasksRequest(Arrays.asList(new String[] {"foo", "baz" }), 0, 0, 0, 0));
+ assertTrue(resp1.tasks().containsKey("foo"));
+ assertFalse(resp1.tasks().containsKey("bar"));
+ assertEquals(1, resp1.tasks().size());
+
+ time.sleep(2);
+ new ExpectedTasks().
+ addTask(new ExpectedTaskBuilder("foo").
+ taskState(new TaskRunning(fooSpec, 2)).
+ workerState(new WorkerRunning(fooSpec, 2, "")).
+ build()).
+ addTask(new ExpectedTaskBuilder("bar").
+ taskState(new TaskPending(barSpec)).
+ build()).
+ waitFor(coordinatorClient).
+ waitFor(cluster.agentClient("node02"));
+
+ TasksResponse resp2 = coordinatorClient.tasks(
+ new TasksRequest(null, 1, 0, 0, 0));
+ assertTrue(resp2.tasks().containsKey("foo"));
+ assertFalse(resp2.tasks().containsKey("bar"));
+ assertEquals(1, resp2.tasks().size());
+
+ assertEquals(0, coordinatorClient.tasks(
+ new TasksRequest(null, 3, 0, 0, 0)).tasks().size());
+ }
+ }
};
--
To stop receiving notification emails like this one, please contact
rsivaram@apache.org.