You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2018/11/27 00:48:02 UTC

[kafka] branch trunk updated: Trogdor: Add Task State filter to /coordinator/tasks endpoint (#5907)

This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7fadf0a  Trogdor: Add Task State filter to /coordinator/tasks endpoint (#5907)
7fadf0a is described below

commit 7fadf0a11ddb2451adc66e1b179b84b050ad3f4f
Author: Stanislav Kozlovski <st...@outlook.com>
AuthorDate: Mon Nov 26 16:07:15 2018 -0800

    Trogdor: Add Task State filter to /coordinator/tasks endpoint (#5907)
    
    Reviewers: Colin McCabe <cm...@apache.org>
---
 checkstyle/suppressions.xml                        |  2 +-
 .../trogdor/coordinator/CoordinatorClient.java     |  4 +-
 .../coordinator/CoordinatorRestResource.java       | 29 +++++++---
 .../kafka/trogdor/coordinator/TaskManager.java     | 38 ++++++-------
 .../org/apache/kafka/trogdor/rest/TaskState.java   |  8 +--
 .../apache/kafka/trogdor/rest/TaskStateType.java   | 42 +++++++++++++++
 .../apache/kafka/trogdor/rest/TasksRequest.java    | 23 +++++++-
 .../apache/kafka/trogdor/common/ExpectedTasks.java |  3 +-
 .../kafka/trogdor/coordinator/CoordinatorTest.java | 62 ++++++++++++----------
 9 files changed, 146 insertions(+), 65 deletions(-)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index f3ab7ec..75ad799 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -57,7 +57,7 @@
               files="AbstractRequest.java|KerberosLogin.java|WorkerSinkTaskTest.java|TransactionManagerTest.java"/>
 
     <suppress checks="NPathComplexity"
-              files="(BufferPool|Fetcher|MetricName|Node|ConfigDef|RecordBatch|SslFactory|SslTransportLayer|MetadataResponse|KerberosLogin|Selector|Sender|Serdes|TokenInformation|Agent|Values|PluginUtils|MiniTrogdorCluster).java"/>
+              files="(BufferPool|Fetcher|MetricName|Node|ConfigDef|RecordBatch|SslFactory|SslTransportLayer|MetadataResponse|KerberosLogin|Selector|Sender|Serdes|TokenInformation|Agent|Values|PluginUtils|MiniTrogdorCluster|TasksRequest).java"/>
 
     <!-- clients tests -->
     <suppress checks="ClassDataAbstractionCoupling"
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorClient.java b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorClient.java
index 80937a8..4670d2c 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorClient.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorClient.java
@@ -42,6 +42,8 @@ import org.slf4j.LoggerFactory;
 import javax.ws.rs.NotFoundException;
 import javax.ws.rs.core.UriBuilder;
 
+import java.util.Optional;
+
 import static net.sourceforge.argparse4j.impl.Arguments.store;
 import static net.sourceforge.argparse4j.impl.Arguments.storeTrue;
 
@@ -244,7 +246,7 @@ public class CoordinatorClient {
         } else if (res.getBoolean("show_tasks")) {
             System.out.println("Got coordinator tasks: " +
                 JsonUtil.toPrettyJsonString(client.tasks(
-                    new TasksRequest(null, 0, 0, 0, 0))));
+                    new TasksRequest(null, 0, 0, 0, 0, Optional.empty()))));
         } else if (res.getString("show_task") != null) {
             String taskId = res.getString("show_task");
             TaskRequest req = new TaskRequest(res.getString("show_task"));
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorRestResource.java b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorRestResource.java
index c0e7fc9..91f731e 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorRestResource.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorRestResource.java
@@ -23,8 +23,9 @@ import org.apache.kafka.trogdor.rest.DestroyTaskRequest;
 import org.apache.kafka.trogdor.rest.Empty;
 import org.apache.kafka.trogdor.rest.StopTaskRequest;
 import org.apache.kafka.trogdor.rest.TaskRequest;
-import org.apache.kafka.trogdor.rest.TasksRequest;
 import org.apache.kafka.trogdor.rest.TaskState;
+import org.apache.kafka.trogdor.rest.TaskStateType;
+import org.apache.kafka.trogdor.rest.TasksRequest;
 import org.apache.kafka.trogdor.rest.TasksResponse;
 
 import javax.servlet.ServletContext;
@@ -32,15 +33,17 @@ import javax.ws.rs.Consumes;
 import javax.ws.rs.DELETE;
 import javax.ws.rs.DefaultValue;
 import javax.ws.rs.GET;
+import javax.ws.rs.NotFoundException;
 import javax.ws.rs.POST;
 import javax.ws.rs.PUT;
 import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
 import javax.ws.rs.Produces;
 import javax.ws.rs.QueryParam;
-import javax.ws.rs.PathParam;
-import javax.ws.rs.NotFoundException;
 import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.atomic.AtomicReference;
 
 /**
@@ -96,13 +99,25 @@ public class CoordinatorRestResource {
     }
 
     @GET
-    @Path("/tasks")
-    public TasksResponse tasks(@QueryParam("taskId") List<String> taskId,
+    @Path("/tasks/")
+    public Response tasks(@QueryParam("taskId") List<String> taskId,
             @DefaultValue("0") @QueryParam("firstStartMs") long firstStartMs,
             @DefaultValue("0") @QueryParam("lastStartMs") long lastStartMs,
             @DefaultValue("0") @QueryParam("firstEndMs") long firstEndMs,
-            @DefaultValue("0") @QueryParam("lastEndMs") long lastEndMs) throws Throwable {
-        return coordinator().tasks(new TasksRequest(taskId, firstStartMs, lastStartMs, firstEndMs, lastEndMs));
+            @DefaultValue("0") @QueryParam("lastEndMs") long lastEndMs,
+            @DefaultValue("") @QueryParam("state") String state) throws Throwable {
+        boolean isEmptyState = state.equals("");
+        if (!isEmptyState && !TaskStateType.Constants.VALUES.contains(state)) {
+            return Response.status(400).entity(
+                String.format("State %s is invalid. Must be one of %s",
+                    state, TaskStateType.Constants.VALUES)
+            ).build();
+        }
+
+        Optional<TaskStateType> givenState = Optional.ofNullable(isEmptyState ? null : TaskStateType.valueOf(state));
+        TasksResponse resp = coordinator().tasks(new TasksRequest(taskId, firstStartMs, lastStartMs, firstEndMs, lastEndMs, givenState));
+
+        return Response.status(200).entity(resp).build();
     }
 
     @GET
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java
index 934acd3..18ff9cb 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java
@@ -32,11 +32,12 @@ import org.apache.kafka.trogdor.common.ThreadUtils;
 import org.apache.kafka.trogdor.rest.RequestConflictException;
 import org.apache.kafka.trogdor.rest.TaskDone;
 import org.apache.kafka.trogdor.rest.TaskPending;
+import org.apache.kafka.trogdor.rest.TaskRequest;
 import org.apache.kafka.trogdor.rest.TaskRunning;
 import org.apache.kafka.trogdor.rest.TaskState;
+import org.apache.kafka.trogdor.rest.TaskStateType;
 import org.apache.kafka.trogdor.rest.TaskStopping;
 import org.apache.kafka.trogdor.rest.TasksRequest;
-import org.apache.kafka.trogdor.rest.TaskRequest;
 import org.apache.kafka.trogdor.rest.TasksResponse;
 import org.apache.kafka.trogdor.rest.WorkerDone;
 import org.apache.kafka.trogdor.rest.WorkerReceiving;
@@ -142,13 +143,6 @@ public final class TaskManager {
             Utils.join(nodeManagers.keySet(), ", "));
     }
 
-    enum ManagedTaskState {
-        PENDING,
-        RUNNING,
-        STOPPING,
-        DONE;
-    }
-
     class ManagedTask {
         /**
          * The task id.
@@ -168,7 +162,7 @@ public final class TaskManager {
         /**
          * The task state.
          */
-        private ManagedTaskState state;
+        private TaskStateType state;
 
         /**
          * The time when the task was started, or -1 if the task has not been started.
@@ -201,7 +195,7 @@ public final class TaskManager {
          */
         private String error = "";
 
-        ManagedTask(String id, TaskSpec spec, TaskController controller, ManagedTaskState state) {
+        ManagedTask(String id, TaskSpec spec, TaskController controller, TaskStateType state) {
             this.id = id;
             this.spec = spec;
             this.controller = controller;
@@ -345,13 +339,13 @@ public final class TaskManager {
             if (failure != null) {
                 log.info("Failed to create a new task {} with spec {}: {}",
                     id, spec, failure);
-                task = new ManagedTask(id, spec, null, ManagedTaskState.DONE);
+                task = new ManagedTask(id, spec, null, TaskStateType.DONE);
                 task.doneMs = time.milliseconds();
                 task.maybeSetError(failure);
                 tasks.put(id, task);
                 return null;
             }
-            task = new ManagedTask(id, spec, controller, ManagedTaskState.PENDING);
+            task = new ManagedTask(id, spec, controller, TaskStateType.PENDING);
             tasks.put(id, task);
             long delayMs = task.startDelayMs(time.milliseconds());
             task.startFuture = scheduler.schedule(executor, new RunTask(task), delayMs);
@@ -374,7 +368,7 @@ public final class TaskManager {
         @Override
         public Void call() throws Exception {
             task.clearStartFuture();
-            if (task.state != ManagedTaskState.PENDING) {
+            if (task.state != TaskStateType.PENDING) {
                 log.info("Can't start task {}, because it is already in state {}.",
                     task.id, task.state);
                 return null;
@@ -385,12 +379,12 @@ public final class TaskManager {
             } catch (Exception e) {
                 log.error("Unable to find nodes for task {}", task.id, e);
                 task.doneMs = time.milliseconds();
-                task.state = ManagedTaskState.DONE;
+                task.state = TaskStateType.DONE;
                 task.maybeSetError("Unable to find nodes for task: " + e.getMessage());
                 return null;
             }
             log.info("Running task {} on node(s): {}", task.id, Utils.join(nodeNames, ", "));
-            task.state = ManagedTaskState.RUNNING;
+            task.state = TaskStateType.RUNNING;
             task.startedMs = time.milliseconds();
             for (String workerName : nodeNames) {
                 long workerId = nextWorkerId++;
@@ -441,7 +435,7 @@ public final class TaskManager {
                     task.cancelled = true;
                     task.clearStartFuture();
                     task.doneMs = time.milliseconds();
-                    task.state = ManagedTaskState.DONE;
+                    task.state = TaskStateType.DONE;
                     log.info("Stopped pending task {}.", id);
                     break;
                 case RUNNING:
@@ -454,14 +448,14 @@ public final class TaskManager {
                             log.info("Task {} is now complete with error: {}", id, task.error);
                         }
                         task.doneMs = time.milliseconds();
-                        task.state = ManagedTaskState.DONE;
+                        task.state = TaskStateType.DONE;
                     } else {
                         for (Map.Entry<String, Long> entry : activeWorkerIds.entrySet()) {
                             nodeManagers.get(entry.getKey()).stopWorker(entry.getValue());
                         }
                         log.info("Cancelling task {} with worker(s) {}",
                             id, Utils.mkString(activeWorkerIds, "", "", " = ", ", "));
-                        task.state = ManagedTaskState.STOPPING;
+                        task.state = TaskStateType.STOPPING;
                     }
                     break;
                 case STOPPING:
@@ -586,14 +580,14 @@ public final class TaskManager {
         TreeMap<String, Long> activeWorkerIds = task.activeWorkerIds();
         if (activeWorkerIds.isEmpty()) {
             task.doneMs = time.milliseconds();
-            task.state = ManagedTaskState.DONE;
+            task.state = TaskStateType.DONE;
             log.info("{}: Task {} is now complete on {} with error: {}",
                 nodeName, task.id, Utils.join(task.workerIds.keySet(), ", "),
                 task.error.isEmpty() ? "(none)" : task.error);
-        } else if ((task.state == ManagedTaskState.RUNNING) && (!task.error.isEmpty())) {
+        } else if ((task.state == TaskStateType.RUNNING) && (!task.error.isEmpty())) {
             log.info("{}: task {} stopped with error {}.  Stopping worker(s): {}",
                 nodeName, task.id, task.error, Utils.mkString(activeWorkerIds, "{", "}", ": ", ", "));
-            task.state = ManagedTaskState.STOPPING;
+            task.state = TaskStateType.STOPPING;
             for (Map.Entry<String, Long> entry : activeWorkerIds.entrySet()) {
                 nodeManagers.get(entry.getKey()).stopWorker(entry.getValue());
             }
@@ -621,7 +615,7 @@ public final class TaskManager {
         public TasksResponse call() throws Exception {
             TreeMap<String, TaskState> states = new TreeMap<>();
             for (ManagedTask task : tasks.values()) {
-                if (request.matches(task.id, task.startedMs, task.doneMs)) {
+                if (request.matches(task.id, task.startedMs, task.doneMs, task.state)) {
                     states.put(task.id, task.taskState());
                 }
             }
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskState.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskState.java
index 0764e14..2428893 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskState.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskState.java
@@ -31,10 +31,10 @@ import org.apache.kafka.trogdor.task.TaskSpec;
     include = JsonTypeInfo.As.PROPERTY,
     property = "state")
 @JsonSubTypes({
-        @JsonSubTypes.Type(value = TaskPending.class, name = "PENDING"),
-        @JsonSubTypes.Type(value = TaskRunning.class, name = "RUNNING"),
-        @JsonSubTypes.Type(value = TaskStopping.class, name = "STOPPING"),
-        @JsonSubTypes.Type(value = TaskDone.class, name = "DONE")
+        @JsonSubTypes.Type(value = TaskPending.class, name = TaskStateType.Constants.PENDING_VALUE),
+        @JsonSubTypes.Type(value = TaskRunning.class, name = TaskStateType.Constants.RUNNING_VALUE),
+        @JsonSubTypes.Type(value = TaskStopping.class, name = TaskStateType.Constants.STOPPING_VALUE),
+        @JsonSubTypes.Type(value = TaskDone.class, name = TaskStateType.Constants.DONE_VALUE)
     })
 public abstract class TaskState extends Message {
     private final TaskSpec spec;
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskStateType.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskStateType.java
new file mode 100644
index 0000000..c8ade06
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskStateType.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.trogdor.rest;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * The types of states a single Task can be in
+ */
+public enum TaskStateType {
+    PENDING(Constants.PENDING_VALUE),
+    RUNNING(Constants.RUNNING_VALUE),
+    STOPPING(Constants.STOPPING_VALUE),
+    DONE(Constants.DONE_VALUE);
+
+    TaskStateType(String stateType) {}
+
+    public static class Constants {
+        static final String PENDING_VALUE = "PENDING";
+        static final String RUNNING_VALUE = "RUNNING";
+        static final String STOPPING_VALUE = "STOPPING";
+        static final String DONE_VALUE = "DONE";
+        public static final List<String> VALUES = Collections.unmodifiableList(
+            Arrays.asList(PENDING_VALUE, RUNNING_VALUE, STOPPING_VALUE, DONE_VALUE));
+    }
+}
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/TasksRequest.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/TasksRequest.java
index 24b438a..150a362 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/rest/TasksRequest.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/TasksRequest.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.Optional;
 import java.util.Set;
 
 /**
@@ -55,18 +56,26 @@ public class TasksRequest extends Message {
      */
     private final long lastEndMs;
 
+    /**
+     * The desired state of the tasks.
+     * An empty Optional will match all states.
+     */
+    private final Optional<TaskStateType> state;
+
     @JsonCreator
     public TasksRequest(@JsonProperty("taskIds") Collection<String> taskIds,
             @JsonProperty("firstStartMs") long firstStartMs,
             @JsonProperty("lastStartMs") long lastStartMs,
             @JsonProperty("firstEndMs") long firstEndMs,
-            @JsonProperty("lastEndMs") long lastEndMs) {
+            @JsonProperty("lastEndMs") long lastEndMs,
+            @JsonProperty("state") Optional<TaskStateType> state) {
         this.taskIds = Collections.unmodifiableSet((taskIds == null) ?
             new HashSet<String>() : new HashSet<>(taskIds));
         this.firstStartMs = Math.max(0, firstStartMs);
         this.lastStartMs = Math.max(0, lastStartMs);
         this.firstEndMs = Math.max(0, firstEndMs);
         this.lastEndMs = Math.max(0, lastEndMs);
+        this.state = state == null ? Optional.empty() : state;
     }
 
     @JsonProperty
@@ -94,6 +103,11 @@ public class TasksRequest extends Message {
         return lastEndMs;
     }
 
+    @JsonProperty
+    public Optional<TaskStateType> state() {
+        return state;
+    }
+
     /**
      * Determine if this TaskRequest should return a particular task.
      *
@@ -102,7 +116,7 @@ public class TasksRequest extends Message {
      * @param endMs     The task end time, or -1 if the task hasn't ended.
      * @return          True if information about the task should be returned.
      */
-    public boolean matches(String taskId, long startMs, long endMs) {
+    public boolean matches(String taskId, long startMs, long endMs, TaskStateType state) {
         if ((!taskIds.isEmpty()) && (!taskIds.contains(taskId))) {
             return false;
         }
@@ -118,6 +132,11 @@ public class TasksRequest extends Message {
         if ((lastEndMs > 0) && ((endMs < 0) || (endMs > lastEndMs))) {
             return false;
         }
+
+        if (this.state.isPresent() && !this.state.get().equals(state)) {
+            return false;
+        }
+
         return true;
     }
 }
diff --git a/tools/src/test/java/org/apache/kafka/trogdor/common/ExpectedTasks.java b/tools/src/test/java/org/apache/kafka/trogdor/common/ExpectedTasks.java
index b0e30a0..c092c92 100644
--- a/tools/src/test/java/org/apache/kafka/trogdor/common/ExpectedTasks.java
+++ b/tools/src/test/java/org/apache/kafka/trogdor/common/ExpectedTasks.java
@@ -34,6 +34,7 @@ import org.slf4j.LoggerFactory;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
 import java.util.TreeMap;
 
 public class ExpectedTasks {
@@ -146,7 +147,7 @@ public class ExpectedTasks {
             public boolean conditionMet() {
                 TasksResponse tasks = null;
                 try {
-                    tasks = client.tasks(new TasksRequest(null, 0, 0, 0, 0));
+                    tasks = client.tasks(new TasksRequest(null, 0, 0, 0, 0, Optional.empty()));
                 } catch (Exception e) {
                     log.info("Unable to get coordinator tasks", e);
                     throw new RuntimeException(e);
diff --git a/tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java b/tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java
index f22130e..0207104 100644
--- a/tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java
+++ b/tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java
@@ -42,6 +42,7 @@ import org.apache.kafka.trogdor.rest.TaskDone;
 import org.apache.kafka.trogdor.rest.TaskPending;
 import org.apache.kafka.trogdor.rest.TaskRunning;
 import org.apache.kafka.trogdor.rest.TaskRequest;
+import org.apache.kafka.trogdor.rest.TaskStateType;
 import org.apache.kafka.trogdor.rest.TasksRequest;
 import org.apache.kafka.trogdor.rest.TaskState;
 import org.apache.kafka.trogdor.rest.TasksResponse;
@@ -60,6 +61,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Optional;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -407,34 +409,40 @@ public class CoordinatorTest {
 
     @Test
     public void testTasksRequestMatches() throws Exception {
-        TasksRequest req1 = new TasksRequest(null, 0, 0, 0, 0);
-        assertTrue(req1.matches("foo1", -1, -1));
-        assertTrue(req1.matches("bar1", 100, 200));
-        assertTrue(req1.matches("baz1", 100, -1));
-
-        TasksRequest req2 = new TasksRequest(null, 100, 0, 0, 0);
-        assertFalse(req2.matches("foo1", -1, -1));
-        assertTrue(req2.matches("bar1", 100, 200));
-        assertFalse(req2.matches("bar1", 99, 200));
-        assertFalse(req2.matches("baz1", 99, -1));
-
-        TasksRequest req3 = new TasksRequest(null, 200, 900, 200, 900);
-        assertFalse(req3.matches("foo1", -1, -1));
-        assertFalse(req3.matches("bar1", 100, 200));
-        assertFalse(req3.matches("bar1", 200, 1000));
-        assertTrue(req3.matches("bar1", 200, 700));
-        assertFalse(req3.matches("baz1", 101, -1));
+        TasksRequest req1 = new TasksRequest(null, 0, 0, 0, 0, Optional.empty());
+        assertTrue(req1.matches("foo1", -1, -1, TaskStateType.PENDING));
+        assertTrue(req1.matches("bar1", 100, 200, TaskStateType.DONE));
+        assertTrue(req1.matches("baz1", 100, -1, TaskStateType.RUNNING));
+
+        TasksRequest req2 = new TasksRequest(null, 100, 0, 0, 0, Optional.empty());
+        assertFalse(req2.matches("foo1", -1, -1, TaskStateType.PENDING));
+        assertTrue(req2.matches("bar1", 100, 200, TaskStateType.DONE));
+        assertFalse(req2.matches("bar1", 99, 200, TaskStateType.DONE));
+        assertFalse(req2.matches("baz1", 99, -1, TaskStateType.RUNNING));
+
+        TasksRequest req3 = new TasksRequest(null, 200, 900, 200, 900, Optional.empty());
+        assertFalse(req3.matches("foo1", -1, -1, TaskStateType.PENDING));
+        assertFalse(req3.matches("bar1", 100, 200, TaskStateType.DONE));
+        assertFalse(req3.matches("bar1", 200, 1000, TaskStateType.DONE));
+        assertTrue(req3.matches("bar1", 200, 700, TaskStateType.DONE));
+        assertFalse(req3.matches("baz1", 101, -1, TaskStateType.RUNNING));
 
         List<String> taskIds = new ArrayList<>();
         taskIds.add("foo1");
         taskIds.add("bar1");
         taskIds.add("baz1");
-        TasksRequest req4 = new TasksRequest(taskIds, 1000, -1, -1, -1);
-        assertFalse(req4.matches("foo1", -1, -1));
-        assertTrue(req4.matches("foo1", 1000, -1));
-        assertFalse(req4.matches("foo1", 900, -1));
-        assertFalse(req4.matches("baz2", 2000, -1));
-        assertFalse(req4.matches("baz2", -1, -1));
+        TasksRequest req4 = new TasksRequest(taskIds, 1000, -1, -1, -1, Optional.empty());
+        assertFalse(req4.matches("foo1", -1, -1, TaskStateType.PENDING));
+        assertTrue(req4.matches("foo1", 1000, -1, TaskStateType.RUNNING));
+        assertFalse(req4.matches("foo1", 900, -1, TaskStateType.RUNNING));
+        assertFalse(req4.matches("baz2", 2000, -1, TaskStateType.RUNNING));
+        assertFalse(req4.matches("baz2", -1, -1, TaskStateType.PENDING));
+
+        TasksRequest req5 = new TasksRequest(null, 0, 0, 0, 0, Optional.of(TaskStateType.RUNNING));
+        assertTrue(req5.matches("foo1", -1, -1, TaskStateType.RUNNING));
+        assertFalse(req5.matches("bar1", -1, -1, TaskStateType.DONE));
+        assertFalse(req5.matches("baz1", -1, -1, TaskStateType.STOPPING));
+        assertFalse(req5.matches("baz1", -1, -1, TaskStateType.PENDING));
     }
 
     @Test
@@ -463,9 +471,9 @@ public class CoordinatorTest {
                 waitFor(coordinatorClient);
 
             assertEquals(0, coordinatorClient.tasks(
-                new TasksRequest(null, 10, 0, 10, 0)).tasks().size());
+                new TasksRequest(null, 10, 0, 10, 0, Optional.empty())).tasks().size());
             TasksResponse resp1 = coordinatorClient.tasks(
-                new TasksRequest(Arrays.asList(new String[] {"foo", "baz" }), 0, 0, 0, 0));
+                new TasksRequest(Arrays.asList(new String[] {"foo", "baz" }), 0, 0, 0, 0, Optional.empty()));
             assertTrue(resp1.tasks().containsKey("foo"));
             assertFalse(resp1.tasks().containsKey("bar"));
             assertEquals(1, resp1.tasks().size());
@@ -483,13 +491,13 @@ public class CoordinatorTest {
                 waitFor(cluster.agentClient("node02"));
 
             TasksResponse resp2 = coordinatorClient.tasks(
-                new TasksRequest(null, 1, 0, 0, 0));
+                new TasksRequest(null, 1, 0, 0, 0, Optional.empty()));
             assertTrue(resp2.tasks().containsKey("foo"));
             assertFalse(resp2.tasks().containsKey("bar"));
             assertEquals(1, resp2.tasks().size());
 
             assertEquals(0, coordinatorClient.tasks(
-                new TasksRequest(null, 3, 0, 0, 0)).tasks().size());
+                new TasksRequest(null, 3, 0, 0, 0, Optional.empty())).tasks().size());
         }
     }