You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2018/04/05 12:35:24 UTC

[kafka] branch trunk updated: KAFKA-6694: The Trogdor Coordinator should support filtering task responses (#4741)

This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 63642d6  KAFKA-6694: The Trogdor Coordinator should support filtering task responses (#4741)
63642d6 is described below

commit 63642d6051015d84aa8c084380bcab174a5f3303
Author: Colin Patrick McCabe <co...@cmccabe.xyz>
AuthorDate: Thu Apr 5 05:35:20 2018 -0700

    KAFKA-6694: The Trogdor Coordinator should support filtering task responses (#4741)
---
 .../kafka/trogdor/coordinator/Coordinator.java     |   5 +-
 .../trogdor/coordinator/CoordinatorClient.java     |  16 ++-
 .../coordinator/CoordinatorRestResource.java       |  12 +-
 .../kafka/trogdor/coordinator/TaskManager.java     |  15 ++-
 .../apache/kafka/trogdor/rest/TasksRequest.java    | 123 +++++++++++++++++++++
 .../apache/kafka/trogdor/common/ExpectedTasks.java |   3 +-
 .../kafka/trogdor/coordinator/CoordinatorTest.java |  92 +++++++++++++++
 7 files changed, 255 insertions(+), 11 deletions(-)

diff --git a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/Coordinator.java b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/Coordinator.java
index b3418dd..717d7c7 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/Coordinator.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/Coordinator.java
@@ -31,6 +31,7 @@ import org.apache.kafka.trogdor.rest.CreateTaskResponse;
 import org.apache.kafka.trogdor.rest.JsonRestServer;
 import org.apache.kafka.trogdor.rest.StopTaskRequest;
 import org.apache.kafka.trogdor.rest.StopTaskResponse;
+import org.apache.kafka.trogdor.rest.TasksRequest;
 import org.apache.kafka.trogdor.rest.TasksResponse;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -94,8 +95,8 @@ public final class Coordinator {
         return new StopTaskResponse(taskManager.stopTask(request.id()));
     }
 
-    public TasksResponse tasks() throws Exception {
-        return taskManager.tasks();
+    public TasksResponse tasks(TasksRequest request) throws Exception {
+        return taskManager.tasks(request);
     }
 
     public void beginShutdown(boolean stopAgents) throws Exception {
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorClient.java b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorClient.java
index 870b64e..0677296 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorClient.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorClient.java
@@ -33,10 +33,13 @@ import org.apache.kafka.trogdor.rest.JsonRestServer;
 import org.apache.kafka.trogdor.rest.JsonRestServer.HttpResponse;
 import org.apache.kafka.trogdor.rest.StopTaskRequest;
 import org.apache.kafka.trogdor.rest.StopTaskResponse;
+import org.apache.kafka.trogdor.rest.TasksRequest;
 import org.apache.kafka.trogdor.rest.TasksResponse;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.ws.rs.core.UriBuilder;
+
 import static net.sourceforge.argparse4j.impl.Arguments.store;
 import static net.sourceforge.argparse4j.impl.Arguments.storeTrue;
 
@@ -127,9 +130,15 @@ public class CoordinatorClient {
         return resp.body();
     }
 
-    public TasksResponse tasks() throws Exception {
+    public TasksResponse tasks(TasksRequest request) throws Exception {
+        UriBuilder uriBuilder = UriBuilder.fromPath(url("/coordinator/tasks"));
+        uriBuilder.queryParam("taskId", request.taskIds().toArray(new String[0]));
+        uriBuilder.queryParam("firstStartMs", request.firstStartMs());
+        uriBuilder.queryParam("lastStartMs", request.lastStartMs());
+        uriBuilder.queryParam("firstEndMs", request.firstEndMs());
+        uriBuilder.queryParam("lastEndMs", request.lastEndMs());
         HttpResponse<TasksResponse> resp =
-            JsonRestServer.<TasksResponse>httpRequest(log, url("/coordinator/tasks"), "GET",
+            JsonRestServer.<TasksResponse>httpRequest(log, uriBuilder.build().toString(), "GET",
                 null, new TypeReference<TasksResponse>() { }, maxTries);
         return resp.body();
     }
@@ -204,7 +213,8 @@ public class CoordinatorClient {
                 JsonUtil.toPrettyJsonString(client.status()));
         } else if (res.getBoolean("show_tasks")) {
             System.out.println("Got coordinator tasks: " +
-                JsonUtil.toPrettyJsonString(client.tasks()));
+                JsonUtil.toPrettyJsonString(client.tasks(
+                    new TasksRequest(null, 0, 0, 0, 0))));
         } else if (res.getString("create_task") != null) {
             client.createTask(JsonUtil.JSON_SERDE.readValue(res.getString("create_task"),
                 CreateTaskRequest.class));
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorRestResource.java b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorRestResource.java
index 7775dd0..b8663ec 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorRestResource.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorRestResource.java
@@ -23,16 +23,20 @@ import org.apache.kafka.trogdor.rest.CreateTaskResponse;
 import org.apache.kafka.trogdor.rest.Empty;
 import org.apache.kafka.trogdor.rest.StopTaskRequest;
 import org.apache.kafka.trogdor.rest.StopTaskResponse;
+import org.apache.kafka.trogdor.rest.TasksRequest;
 import org.apache.kafka.trogdor.rest.TasksResponse;
 
 import javax.servlet.ServletContext;
 import javax.ws.rs.Consumes;
+import javax.ws.rs.DefaultValue;
 import javax.ws.rs.GET;
 import javax.ws.rs.POST;
 import javax.ws.rs.PUT;
 import javax.ws.rs.Path;
 import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
 import javax.ws.rs.core.MediaType;
+import java.util.List;
 import java.util.concurrent.atomic.AtomicReference;
 
 
@@ -69,8 +73,12 @@ public class CoordinatorRestResource {
 
     @GET
     @Path("/tasks")
-    public TasksResponse tasks() throws Throwable {
-        return coordinator().tasks();
+    public TasksResponse tasks(@QueryParam("taskId") List<String> taskId,
+            @DefaultValue("0") @QueryParam("firstStartMs") int firstStartMs,
+            @DefaultValue("0") @QueryParam("lastStartMs") int lastStartMs,
+            @DefaultValue("0") @QueryParam("firstEndMs") int firstEndMs,
+            @DefaultValue("0") @QueryParam("lastEndMs") int lastEndMs) throws Throwable {
+        return coordinator().tasks(new TasksRequest(taskId, firstStartMs, lastStartMs, firstEndMs, lastEndMs));
     }
 
     @PUT
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java
index 286f904..d88e1d5 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java
@@ -29,6 +29,7 @@ import org.apache.kafka.trogdor.rest.TaskPending;
 import org.apache.kafka.trogdor.rest.TaskRunning;
 import org.apache.kafka.trogdor.rest.TaskState;
 import org.apache.kafka.trogdor.rest.TaskStopping;
+import org.apache.kafka.trogdor.rest.TasksRequest;
 import org.apache.kafka.trogdor.rest.TasksResponse;
 import org.apache.kafka.trogdor.task.TaskController;
 import org.apache.kafka.trogdor.task.TaskSpec;
@@ -483,16 +484,24 @@ public final class TaskManager {
     /**
      * Get information about the tasks being managed.
      */
-    public TasksResponse tasks() throws ExecutionException, InterruptedException {
-        return executor.submit(new GetTasksResponse()).get();
+    public TasksResponse tasks(TasksRequest request) throws ExecutionException, InterruptedException {
+        return executor.submit(new GetTasksResponse(request)).get();
     }
 
     class GetTasksResponse implements Callable<TasksResponse> {
+        private final TasksRequest request;
+
+        GetTasksResponse(TasksRequest request) {
+            this.request = request;
+        }
+
         @Override
         public TasksResponse call() throws Exception {
             TreeMap<String, TaskState> states = new TreeMap<>();
             for (ManagedTask task : tasks.values()) {
-                states.put(task.id, task.taskState());
+                if (request.matches(task.id, task.startedMs, task.doneMs)) {
+                    states.put(task.id, task.taskState());
+                }
             }
             return new TasksResponse(states);
         }
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/TasksRequest.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/TasksRequest.java
new file mode 100644
index 0000000..24b438a
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/TasksRequest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.rest;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * The request to /coordinator/tasks
+ */
+public class TasksRequest extends Message {
+    /**
+     * The task IDs to list.
+     * An empty set of task IDs indicates that we should list all task IDs.
+     */
+    private final Set<String> taskIds;
+
+    /**
+     * If this is non-zero, only tasks with a startMs at or after this time will be listed.
+     */
+    private final long firstStartMs;
+
+    /**
+     * If this is non-zero, only tasks with a startMs at or before this time will be listed.
+     */
+    private final long lastStartMs;
+
+    /**
+     * If this is non-zero, only tasks with an endMs at or after this time will be listed.
+     */
+    private final long firstEndMs;
+
+    /**
+     * If this is non-zero, only tasks with an endMs at or before this time will be listed.
+     */
+    private final long lastEndMs;
+
+    @JsonCreator
+    public TasksRequest(@JsonProperty("taskIds") Collection<String> taskIds,
+            @JsonProperty("firstStartMs") long firstStartMs,
+            @JsonProperty("lastStartMs") long lastStartMs,
+            @JsonProperty("firstEndMs") long firstEndMs,
+            @JsonProperty("lastEndMs") long lastEndMs) {
+        this.taskIds = Collections.unmodifiableSet((taskIds == null) ?
+            new HashSet<String>() : new HashSet<>(taskIds));
+        this.firstStartMs = Math.max(0, firstStartMs);
+        this.lastStartMs = Math.max(0, lastStartMs);
+        this.firstEndMs = Math.max(0, firstEndMs);
+        this.lastEndMs = Math.max(0, lastEndMs);
+    }
+
+    @JsonProperty
+    public Collection<String> taskIds() {
+        return taskIds;
+    }
+
+    @JsonProperty
+    public long firstStartMs() {
+        return firstStartMs;
+    }
+
+    @JsonProperty
+    public long lastStartMs() {
+        return lastStartMs;
+    }
+
+    @JsonProperty
+    public long firstEndMs() {
+        return firstEndMs;
+    }
+
+    @JsonProperty
+    public long lastEndMs() {
+        return lastEndMs;
+    }
+
+    /**
+     * Determine if this TaskRequest should return a particular task.
+     *
+     * @param taskId    The task ID.
+     * @param startMs   The task start time, or -1 if the task hasn't started.
+     * @param endMs     The task end time, or -1 if the task hasn't ended.
+     * @return          True if information about the task should be returned.
+     */
+    public boolean matches(String taskId, long startMs, long endMs) {
+        if ((!taskIds.isEmpty()) && (!taskIds.contains(taskId))) {
+            return false;
+        }
+        if ((firstStartMs > 0) && (startMs < firstStartMs)) {
+            return false;
+        }
+        if ((lastStartMs > 0) && ((startMs < 0) || (startMs > lastStartMs))) {
+            return false;
+        }
+        if ((firstEndMs > 0) && (endMs < firstEndMs)) {
+            return false;
+        }
+        if ((lastEndMs > 0) && ((endMs < 0) || (endMs > lastEndMs))) {
+            return false;
+        }
+        return true;
+    }
+}
diff --git a/tools/src/test/java/org/apache/kafka/trogdor/common/ExpectedTasks.java b/tools/src/test/java/org/apache/kafka/trogdor/common/ExpectedTasks.java
index f72779f..617bf34 100644
--- a/tools/src/test/java/org/apache/kafka/trogdor/common/ExpectedTasks.java
+++ b/tools/src/test/java/org/apache/kafka/trogdor/common/ExpectedTasks.java
@@ -25,6 +25,7 @@ import org.apache.kafka.trogdor.agent.AgentClient;
 import org.apache.kafka.trogdor.coordinator.CoordinatorClient;
 import org.apache.kafka.trogdor.rest.AgentStatusResponse;
 import org.apache.kafka.trogdor.rest.TaskState;
+import org.apache.kafka.trogdor.rest.TasksRequest;
 import org.apache.kafka.trogdor.rest.TasksResponse;
 import org.apache.kafka.trogdor.rest.WorkerState;
 import org.apache.kafka.trogdor.task.TaskSpec;
@@ -144,7 +145,7 @@ public class ExpectedTasks {
             public boolean conditionMet() {
                 TasksResponse tasks = null;
                 try {
-                    tasks = client.tasks();
+                    tasks = client.tasks(new TasksRequest(null, 0, 0, 0, 0));
                 } catch (Exception e) {
                     log.info("Unable to get coordinator tasks", e);
                     throw new RuntimeException(e);
diff --git a/tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java b/tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java
index 4973823..004702f 100644
--- a/tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java
+++ b/tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java
@@ -36,6 +36,8 @@ import org.apache.kafka.trogdor.rest.StopTaskRequest;
 import org.apache.kafka.trogdor.rest.TaskDone;
 import org.apache.kafka.trogdor.rest.TaskPending;
 import org.apache.kafka.trogdor.rest.TaskRunning;
+import org.apache.kafka.trogdor.rest.TasksRequest;
+import org.apache.kafka.trogdor.rest.TasksResponse;
 import org.apache.kafka.trogdor.rest.WorkerDone;
 import org.apache.kafka.trogdor.rest.WorkerRunning;
 import org.apache.kafka.trogdor.task.NoOpTaskSpec;
@@ -50,6 +52,8 @@ import java.util.Arrays;
 import java.util.List;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 public class CoordinatorTest {
     private static final Logger log = LoggerFactory.getLogger(CoordinatorTest.class);
@@ -302,4 +306,92 @@ public class CoordinatorTest {
                 "-m comment --comment node02").
             waitFor("node03", runner);
     }
+
+    @Test
+    public void testTasksRequestMatches() throws Exception {
+        TasksRequest req1 = new TasksRequest(null, 0, 0, 0, 0);
+        assertTrue(req1.matches("foo1", -1, -1));
+        assertTrue(req1.matches("bar1", 100, 200));
+        assertTrue(req1.matches("baz1", 100, -1));
+
+        TasksRequest req2 = new TasksRequest(null, 100, 0, 0, 0);
+        assertFalse(req2.matches("foo1", -1, -1));
+        assertTrue(req2.matches("bar1", 100, 200));
+        assertFalse(req2.matches("bar1", 99, 200));
+        assertFalse(req2.matches("baz1", 99, -1));
+
+        TasksRequest req3 = new TasksRequest(null, 200, 900, 200, 900);
+        assertFalse(req3.matches("foo1", -1, -1));
+        assertFalse(req3.matches("bar1", 100, 200));
+        assertFalse(req3.matches("bar1", 200, 1000));
+        assertTrue(req3.matches("bar1", 200, 700));
+        assertFalse(req3.matches("baz1", 101, -1));
+
+        List<String> taskIds = new ArrayList<>();
+        taskIds.add("foo1");
+        taskIds.add("bar1");
+        taskIds.add("baz1");
+        TasksRequest req4 = new TasksRequest(taskIds, 1000, -1, -1, -1);
+        assertFalse(req4.matches("foo1", -1, -1));
+        assertTrue(req4.matches("foo1", 1000, -1));
+        assertFalse(req4.matches("foo1", 900, -1));
+        assertFalse(req4.matches("baz2", 2000, -1));
+        assertFalse(req4.matches("baz2", -1, -1));
+    }
+
+    @Test
+    public void testTasksRequest() throws Exception {
+        MockTime time = new MockTime(0, 0, 0);
+        Scheduler scheduler = new MockScheduler(time);
+        try (MiniTrogdorCluster cluster = new MiniTrogdorCluster.Builder().
+            addCoordinator("node01").
+            addAgent("node02").
+            scheduler(scheduler).
+            build()) {
+            CoordinatorClient coordinatorClient = cluster.coordinatorClient();
+            new ExpectedTasks().waitFor(coordinatorClient);
+
+            NoOpTaskSpec fooSpec = new NoOpTaskSpec(1, 10);
+            NoOpTaskSpec barSpec = new NoOpTaskSpec(3, 1);
+            coordinatorClient.createTask(new CreateTaskRequest("foo", fooSpec));
+            coordinatorClient.createTask(new CreateTaskRequest("bar", barSpec));
+            new ExpectedTasks().
+                addTask(new ExpectedTaskBuilder("foo").
+                    taskState(new TaskPending(fooSpec)).
+                    build()).
+                addTask(new ExpectedTaskBuilder("bar").
+                    taskState(new TaskPending(barSpec)).
+                    build()).
+                waitFor(coordinatorClient);
+
+            assertEquals(0, coordinatorClient.tasks(
+                new TasksRequest(null, 10, 0, 10, 0)).tasks().size());
+            TasksResponse resp1 = coordinatorClient.tasks(
+                new TasksRequest(Arrays.asList(new String[] {"foo", "baz" }), 0, 0, 0, 0));
+            assertTrue(resp1.tasks().containsKey("foo"));
+            assertFalse(resp1.tasks().containsKey("bar"));
+            assertEquals(1, resp1.tasks().size());
+
+            time.sleep(2);
+            new ExpectedTasks().
+                addTask(new ExpectedTaskBuilder("foo").
+                    taskState(new TaskRunning(fooSpec, 2)).
+                    workerState(new WorkerRunning(fooSpec, 2, "")).
+                    build()).
+                addTask(new ExpectedTaskBuilder("bar").
+                    taskState(new TaskPending(barSpec)).
+                    build()).
+                waitFor(coordinatorClient).
+                waitFor(cluster.agentClient("node02"));
+
+            TasksResponse resp2 = coordinatorClient.tasks(
+                new TasksRequest(null, 1, 0, 0, 0));
+            assertTrue(resp2.tasks().containsKey("foo"));
+            assertFalse(resp2.tasks().containsKey("bar"));
+            assertEquals(1, resp2.tasks().size());
+
+            assertEquals(0, coordinatorClient.tasks(
+                new TasksRequest(null, 3, 0, 0, 0)).tasks().size());
+        }
+    }
 };

-- 
To stop receiving notification emails like this one, please contact
rsivaram@apache.org.