You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2018/11/27 00:48:02 UTC
[kafka] branch trunk updated: Trogdor: Add Task State filter to
/coordinator/tasks endpoint (#5907)
This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 7fadf0a Trogdor: Add Task State filter to /coordinator/tasks endpoint (#5907)
7fadf0a is described below
commit 7fadf0a11ddb2451adc66e1b179b84b050ad3f4f
Author: Stanislav Kozlovski <st...@outlook.com>
AuthorDate: Mon Nov 26 16:07:15 2018 -0800
Trogdor: Add Task State filter to /coordinator/tasks endpoint (#5907)
Reviewers: Colin McCabe <cm...@apache.org>
---
checkstyle/suppressions.xml | 2 +-
.../trogdor/coordinator/CoordinatorClient.java | 4 +-
.../coordinator/CoordinatorRestResource.java | 29 +++++++---
.../kafka/trogdor/coordinator/TaskManager.java | 38 ++++++-------
.../org/apache/kafka/trogdor/rest/TaskState.java | 8 +--
.../apache/kafka/trogdor/rest/TaskStateType.java | 42 +++++++++++++++
.../apache/kafka/trogdor/rest/TasksRequest.java | 23 +++++++-
.../apache/kafka/trogdor/common/ExpectedTasks.java | 3 +-
.../kafka/trogdor/coordinator/CoordinatorTest.java | 62 ++++++++++++----------
9 files changed, 146 insertions(+), 65 deletions(-)
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index f3ab7ec..75ad799 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -57,7 +57,7 @@
files="AbstractRequest.java|KerberosLogin.java|WorkerSinkTaskTest.java|TransactionManagerTest.java"/>
<suppress checks="NPathComplexity"
- files="(BufferPool|Fetcher|MetricName|Node|ConfigDef|RecordBatch|SslFactory|SslTransportLayer|MetadataResponse|KerberosLogin|Selector|Sender|Serdes|TokenInformation|Agent|Values|PluginUtils|MiniTrogdorCluster).java"/>
+ files="(BufferPool|Fetcher|MetricName|Node|ConfigDef|RecordBatch|SslFactory|SslTransportLayer|MetadataResponse|KerberosLogin|Selector|Sender|Serdes|TokenInformation|Agent|Values|PluginUtils|MiniTrogdorCluster|TasksRequest).java"/>
<!-- clients tests -->
<suppress checks="ClassDataAbstractionCoupling"
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorClient.java b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorClient.java
index 80937a8..4670d2c 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorClient.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorClient.java
@@ -42,6 +42,8 @@ import org.slf4j.LoggerFactory;
import javax.ws.rs.NotFoundException;
import javax.ws.rs.core.UriBuilder;
+import java.util.Optional;
+
import static net.sourceforge.argparse4j.impl.Arguments.store;
import static net.sourceforge.argparse4j.impl.Arguments.storeTrue;
@@ -244,7 +246,7 @@ public class CoordinatorClient {
} else if (res.getBoolean("show_tasks")) {
System.out.println("Got coordinator tasks: " +
JsonUtil.toPrettyJsonString(client.tasks(
- new TasksRequest(null, 0, 0, 0, 0))));
+ new TasksRequest(null, 0, 0, 0, 0, Optional.empty()))));
} else if (res.getString("show_task") != null) {
String taskId = res.getString("show_task");
TaskRequest req = new TaskRequest(res.getString("show_task"));
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorRestResource.java b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorRestResource.java
index c0e7fc9..91f731e 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorRestResource.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorRestResource.java
@@ -23,8 +23,9 @@ import org.apache.kafka.trogdor.rest.DestroyTaskRequest;
import org.apache.kafka.trogdor.rest.Empty;
import org.apache.kafka.trogdor.rest.StopTaskRequest;
import org.apache.kafka.trogdor.rest.TaskRequest;
-import org.apache.kafka.trogdor.rest.TasksRequest;
import org.apache.kafka.trogdor.rest.TaskState;
+import org.apache.kafka.trogdor.rest.TaskStateType;
+import org.apache.kafka.trogdor.rest.TasksRequest;
import org.apache.kafka.trogdor.rest.TasksResponse;
import javax.servlet.ServletContext;
@@ -32,15 +33,17 @@ import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
+import javax.ws.rs.NotFoundException;
import javax.ws.rs.POST;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
-import javax.ws.rs.PathParam;
-import javax.ws.rs.NotFoundException;
import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
import java.util.List;
+import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
/**
@@ -96,13 +99,25 @@ public class CoordinatorRestResource {
}
@GET
- @Path("/tasks")
- public TasksResponse tasks(@QueryParam("taskId") List<String> taskId,
+ @Path("/tasks/")
+ public Response tasks(@QueryParam("taskId") List<String> taskId,
@DefaultValue("0") @QueryParam("firstStartMs") long firstStartMs,
@DefaultValue("0") @QueryParam("lastStartMs") long lastStartMs,
@DefaultValue("0") @QueryParam("firstEndMs") long firstEndMs,
- @DefaultValue("0") @QueryParam("lastEndMs") long lastEndMs) throws Throwable {
- return coordinator().tasks(new TasksRequest(taskId, firstStartMs, lastStartMs, firstEndMs, lastEndMs));
+ @DefaultValue("0") @QueryParam("lastEndMs") long lastEndMs,
+ @DefaultValue("") @QueryParam("state") String state) throws Throwable {
+ boolean isEmptyState = state.equals("");
+ if (!isEmptyState && !TaskStateType.Constants.VALUES.contains(state)) {
+ return Response.status(400).entity(
+ String.format("State %s is invalid. Must be one of %s",
+ state, TaskStateType.Constants.VALUES)
+ ).build();
+ }
+
+ Optional<TaskStateType> givenState = Optional.ofNullable(isEmptyState ? null : TaskStateType.valueOf(state));
+ TasksResponse resp = coordinator().tasks(new TasksRequest(taskId, firstStartMs, lastStartMs, firstEndMs, lastEndMs, givenState));
+
+ return Response.status(200).entity(resp).build();
}
@GET
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java
index 934acd3..18ff9cb 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/TaskManager.java
@@ -32,11 +32,12 @@ import org.apache.kafka.trogdor.common.ThreadUtils;
import org.apache.kafka.trogdor.rest.RequestConflictException;
import org.apache.kafka.trogdor.rest.TaskDone;
import org.apache.kafka.trogdor.rest.TaskPending;
+import org.apache.kafka.trogdor.rest.TaskRequest;
import org.apache.kafka.trogdor.rest.TaskRunning;
import org.apache.kafka.trogdor.rest.TaskState;
+import org.apache.kafka.trogdor.rest.TaskStateType;
import org.apache.kafka.trogdor.rest.TaskStopping;
import org.apache.kafka.trogdor.rest.TasksRequest;
-import org.apache.kafka.trogdor.rest.TaskRequest;
import org.apache.kafka.trogdor.rest.TasksResponse;
import org.apache.kafka.trogdor.rest.WorkerDone;
import org.apache.kafka.trogdor.rest.WorkerReceiving;
@@ -142,13 +143,6 @@ public final class TaskManager {
Utils.join(nodeManagers.keySet(), ", "));
}
- enum ManagedTaskState {
- PENDING,
- RUNNING,
- STOPPING,
- DONE;
- }
-
class ManagedTask {
/**
* The task id.
@@ -168,7 +162,7 @@ public final class TaskManager {
/**
* The task state.
*/
- private ManagedTaskState state;
+ private TaskStateType state;
/**
* The time when the task was started, or -1 if the task has not been started.
@@ -201,7 +195,7 @@ public final class TaskManager {
*/
private String error = "";
- ManagedTask(String id, TaskSpec spec, TaskController controller, ManagedTaskState state) {
+ ManagedTask(String id, TaskSpec spec, TaskController controller, TaskStateType state) {
this.id = id;
this.spec = spec;
this.controller = controller;
@@ -345,13 +339,13 @@ public final class TaskManager {
if (failure != null) {
log.info("Failed to create a new task {} with spec {}: {}",
id, spec, failure);
- task = new ManagedTask(id, spec, null, ManagedTaskState.DONE);
+ task = new ManagedTask(id, spec, null, TaskStateType.DONE);
task.doneMs = time.milliseconds();
task.maybeSetError(failure);
tasks.put(id, task);
return null;
}
- task = new ManagedTask(id, spec, controller, ManagedTaskState.PENDING);
+ task = new ManagedTask(id, spec, controller, TaskStateType.PENDING);
tasks.put(id, task);
long delayMs = task.startDelayMs(time.milliseconds());
task.startFuture = scheduler.schedule(executor, new RunTask(task), delayMs);
@@ -374,7 +368,7 @@ public final class TaskManager {
@Override
public Void call() throws Exception {
task.clearStartFuture();
- if (task.state != ManagedTaskState.PENDING) {
+ if (task.state != TaskStateType.PENDING) {
log.info("Can't start task {}, because it is already in state {}.",
task.id, task.state);
return null;
@@ -385,12 +379,12 @@ public final class TaskManager {
} catch (Exception e) {
log.error("Unable to find nodes for task {}", task.id, e);
task.doneMs = time.milliseconds();
- task.state = ManagedTaskState.DONE;
+ task.state = TaskStateType.DONE;
task.maybeSetError("Unable to find nodes for task: " + e.getMessage());
return null;
}
log.info("Running task {} on node(s): {}", task.id, Utils.join(nodeNames, ", "));
- task.state = ManagedTaskState.RUNNING;
+ task.state = TaskStateType.RUNNING;
task.startedMs = time.milliseconds();
for (String workerName : nodeNames) {
long workerId = nextWorkerId++;
@@ -441,7 +435,7 @@ public final class TaskManager {
task.cancelled = true;
task.clearStartFuture();
task.doneMs = time.milliseconds();
- task.state = ManagedTaskState.DONE;
+ task.state = TaskStateType.DONE;
log.info("Stopped pending task {}.", id);
break;
case RUNNING:
@@ -454,14 +448,14 @@ public final class TaskManager {
log.info("Task {} is now complete with error: {}", id, task.error);
}
task.doneMs = time.milliseconds();
- task.state = ManagedTaskState.DONE;
+ task.state = TaskStateType.DONE;
} else {
for (Map.Entry<String, Long> entry : activeWorkerIds.entrySet()) {
nodeManagers.get(entry.getKey()).stopWorker(entry.getValue());
}
log.info("Cancelling task {} with worker(s) {}",
id, Utils.mkString(activeWorkerIds, "", "", " = ", ", "));
- task.state = ManagedTaskState.STOPPING;
+ task.state = TaskStateType.STOPPING;
}
break;
case STOPPING:
@@ -586,14 +580,14 @@ public final class TaskManager {
TreeMap<String, Long> activeWorkerIds = task.activeWorkerIds();
if (activeWorkerIds.isEmpty()) {
task.doneMs = time.milliseconds();
- task.state = ManagedTaskState.DONE;
+ task.state = TaskStateType.DONE;
log.info("{}: Task {} is now complete on {} with error: {}",
nodeName, task.id, Utils.join(task.workerIds.keySet(), ", "),
task.error.isEmpty() ? "(none)" : task.error);
- } else if ((task.state == ManagedTaskState.RUNNING) && (!task.error.isEmpty())) {
+ } else if ((task.state == TaskStateType.RUNNING) && (!task.error.isEmpty())) {
log.info("{}: task {} stopped with error {}. Stopping worker(s): {}",
nodeName, task.id, task.error, Utils.mkString(activeWorkerIds, "{", "}", ": ", ", "));
- task.state = ManagedTaskState.STOPPING;
+ task.state = TaskStateType.STOPPING;
for (Map.Entry<String, Long> entry : activeWorkerIds.entrySet()) {
nodeManagers.get(entry.getKey()).stopWorker(entry.getValue());
}
@@ -621,7 +615,7 @@ public final class TaskManager {
public TasksResponse call() throws Exception {
TreeMap<String, TaskState> states = new TreeMap<>();
for (ManagedTask task : tasks.values()) {
- if (request.matches(task.id, task.startedMs, task.doneMs)) {
+ if (request.matches(task.id, task.startedMs, task.doneMs, task.state)) {
states.put(task.id, task.taskState());
}
}
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskState.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskState.java
index 0764e14..2428893 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskState.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskState.java
@@ -31,10 +31,10 @@ import org.apache.kafka.trogdor.task.TaskSpec;
include = JsonTypeInfo.As.PROPERTY,
property = "state")
@JsonSubTypes({
- @JsonSubTypes.Type(value = TaskPending.class, name = "PENDING"),
- @JsonSubTypes.Type(value = TaskRunning.class, name = "RUNNING"),
- @JsonSubTypes.Type(value = TaskStopping.class, name = "STOPPING"),
- @JsonSubTypes.Type(value = TaskDone.class, name = "DONE")
+ @JsonSubTypes.Type(value = TaskPending.class, name = TaskStateType.Constants.PENDING_VALUE),
+ @JsonSubTypes.Type(value = TaskRunning.class, name = TaskStateType.Constants.RUNNING_VALUE),
+ @JsonSubTypes.Type(value = TaskStopping.class, name = TaskStateType.Constants.STOPPING_VALUE),
+ @JsonSubTypes.Type(value = TaskDone.class, name = TaskStateType.Constants.DONE_VALUE)
})
public abstract class TaskState extends Message {
private final TaskSpec spec;
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskStateType.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskStateType.java
new file mode 100644
index 0000000..c8ade06
--- /dev/null
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/TaskStateType.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.trogdor.rest;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * The types of states a single Task can be in
+ */
+public enum TaskStateType {
+ PENDING(Constants.PENDING_VALUE),
+ RUNNING(Constants.RUNNING_VALUE),
+ STOPPING(Constants.STOPPING_VALUE),
+ DONE(Constants.DONE_VALUE);
+
+ TaskStateType(String stateType) {}
+
+ public static class Constants {
+ static final String PENDING_VALUE = "PENDING";
+ static final String RUNNING_VALUE = "RUNNING";
+ static final String STOPPING_VALUE = "STOPPING";
+ static final String DONE_VALUE = "DONE";
+ public static final List<String> VALUES = Collections.unmodifiableList(
+ Arrays.asList(PENDING_VALUE, RUNNING_VALUE, STOPPING_VALUE, DONE_VALUE));
+ }
+}
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/TasksRequest.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/TasksRequest.java
index 24b438a..150a362 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/rest/TasksRequest.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/TasksRequest.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
+import java.util.Optional;
import java.util.Set;
/**
@@ -55,18 +56,26 @@ public class TasksRequest extends Message {
*/
private final long lastEndMs;
+ /**
+ * The desired state of the tasks.
+ * An empty Optional will match all states.
+ */
+ private final Optional<TaskStateType> state;
+
@JsonCreator
public TasksRequest(@JsonProperty("taskIds") Collection<String> taskIds,
@JsonProperty("firstStartMs") long firstStartMs,
@JsonProperty("lastStartMs") long lastStartMs,
@JsonProperty("firstEndMs") long firstEndMs,
- @JsonProperty("lastEndMs") long lastEndMs) {
+ @JsonProperty("lastEndMs") long lastEndMs,
+ @JsonProperty("state") Optional<TaskStateType> state) {
this.taskIds = Collections.unmodifiableSet((taskIds == null) ?
new HashSet<String>() : new HashSet<>(taskIds));
this.firstStartMs = Math.max(0, firstStartMs);
this.lastStartMs = Math.max(0, lastStartMs);
this.firstEndMs = Math.max(0, firstEndMs);
this.lastEndMs = Math.max(0, lastEndMs);
+ this.state = state == null ? Optional.empty() : state;
}
@JsonProperty
@@ -94,6 +103,11 @@ public class TasksRequest extends Message {
return lastEndMs;
}
+ @JsonProperty
+ public Optional<TaskStateType> state() {
+ return state;
+ }
+
/**
* Determine if this TaskRequest should return a particular task.
*
@@ -102,7 +116,7 @@ public class TasksRequest extends Message {
* @param endMs The task end time, or -1 if the task hasn't ended.
* @return True if information about the task should be returned.
*/
- public boolean matches(String taskId, long startMs, long endMs) {
+ public boolean matches(String taskId, long startMs, long endMs, TaskStateType state) {
if ((!taskIds.isEmpty()) && (!taskIds.contains(taskId))) {
return false;
}
@@ -118,6 +132,11 @@ public class TasksRequest extends Message {
if ((lastEndMs > 0) && ((endMs < 0) || (endMs > lastEndMs))) {
return false;
}
+
+ if (this.state.isPresent() && !this.state.get().equals(state)) {
+ return false;
+ }
+
return true;
}
}
diff --git a/tools/src/test/java/org/apache/kafka/trogdor/common/ExpectedTasks.java b/tools/src/test/java/org/apache/kafka/trogdor/common/ExpectedTasks.java
index b0e30a0..c092c92 100644
--- a/tools/src/test/java/org/apache/kafka/trogdor/common/ExpectedTasks.java
+++ b/tools/src/test/java/org/apache/kafka/trogdor/common/ExpectedTasks.java
@@ -34,6 +34,7 @@ import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
+import java.util.Optional;
import java.util.TreeMap;
public class ExpectedTasks {
@@ -146,7 +147,7 @@ public class ExpectedTasks {
public boolean conditionMet() {
TasksResponse tasks = null;
try {
- tasks = client.tasks(new TasksRequest(null, 0, 0, 0, 0));
+ tasks = client.tasks(new TasksRequest(null, 0, 0, 0, 0, Optional.empty()));
} catch (Exception e) {
log.info("Unable to get coordinator tasks", e);
throw new RuntimeException(e);
diff --git a/tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java b/tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java
index f22130e..0207104 100644
--- a/tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java
+++ b/tools/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java
@@ -42,6 +42,7 @@ import org.apache.kafka.trogdor.rest.TaskDone;
import org.apache.kafka.trogdor.rest.TaskPending;
import org.apache.kafka.trogdor.rest.TaskRunning;
import org.apache.kafka.trogdor.rest.TaskRequest;
+import org.apache.kafka.trogdor.rest.TaskStateType;
import org.apache.kafka.trogdor.rest.TasksRequest;
import org.apache.kafka.trogdor.rest.TaskState;
import org.apache.kafka.trogdor.rest.TasksResponse;
@@ -60,6 +61,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
+import java.util.Optional;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -407,34 +409,40 @@ public class CoordinatorTest {
@Test
public void testTasksRequestMatches() throws Exception {
- TasksRequest req1 = new TasksRequest(null, 0, 0, 0, 0);
- assertTrue(req1.matches("foo1", -1, -1));
- assertTrue(req1.matches("bar1", 100, 200));
- assertTrue(req1.matches("baz1", 100, -1));
-
- TasksRequest req2 = new TasksRequest(null, 100, 0, 0, 0);
- assertFalse(req2.matches("foo1", -1, -1));
- assertTrue(req2.matches("bar1", 100, 200));
- assertFalse(req2.matches("bar1", 99, 200));
- assertFalse(req2.matches("baz1", 99, -1));
-
- TasksRequest req3 = new TasksRequest(null, 200, 900, 200, 900);
- assertFalse(req3.matches("foo1", -1, -1));
- assertFalse(req3.matches("bar1", 100, 200));
- assertFalse(req3.matches("bar1", 200, 1000));
- assertTrue(req3.matches("bar1", 200, 700));
- assertFalse(req3.matches("baz1", 101, -1));
+ TasksRequest req1 = new TasksRequest(null, 0, 0, 0, 0, Optional.empty());
+ assertTrue(req1.matches("foo1", -1, -1, TaskStateType.PENDING));
+ assertTrue(req1.matches("bar1", 100, 200, TaskStateType.DONE));
+ assertTrue(req1.matches("baz1", 100, -1, TaskStateType.RUNNING));
+
+ TasksRequest req2 = new TasksRequest(null, 100, 0, 0, 0, Optional.empty());
+ assertFalse(req2.matches("foo1", -1, -1, TaskStateType.PENDING));
+ assertTrue(req2.matches("bar1", 100, 200, TaskStateType.DONE));
+ assertFalse(req2.matches("bar1", 99, 200, TaskStateType.DONE));
+ assertFalse(req2.matches("baz1", 99, -1, TaskStateType.RUNNING));
+
+ TasksRequest req3 = new TasksRequest(null, 200, 900, 200, 900, Optional.empty());
+ assertFalse(req3.matches("foo1", -1, -1, TaskStateType.PENDING));
+ assertFalse(req3.matches("bar1", 100, 200, TaskStateType.DONE));
+ assertFalse(req3.matches("bar1", 200, 1000, TaskStateType.DONE));
+ assertTrue(req3.matches("bar1", 200, 700, TaskStateType.DONE));
+ assertFalse(req3.matches("baz1", 101, -1, TaskStateType.RUNNING));
List<String> taskIds = new ArrayList<>();
taskIds.add("foo1");
taskIds.add("bar1");
taskIds.add("baz1");
- TasksRequest req4 = new TasksRequest(taskIds, 1000, -1, -1, -1);
- assertFalse(req4.matches("foo1", -1, -1));
- assertTrue(req4.matches("foo1", 1000, -1));
- assertFalse(req4.matches("foo1", 900, -1));
- assertFalse(req4.matches("baz2", 2000, -1));
- assertFalse(req4.matches("baz2", -1, -1));
+ TasksRequest req4 = new TasksRequest(taskIds, 1000, -1, -1, -1, Optional.empty());
+ assertFalse(req4.matches("foo1", -1, -1, TaskStateType.PENDING));
+ assertTrue(req4.matches("foo1", 1000, -1, TaskStateType.RUNNING));
+ assertFalse(req4.matches("foo1", 900, -1, TaskStateType.RUNNING));
+ assertFalse(req4.matches("baz2", 2000, -1, TaskStateType.RUNNING));
+ assertFalse(req4.matches("baz2", -1, -1, TaskStateType.PENDING));
+
+ TasksRequest req5 = new TasksRequest(null, 0, 0, 0, 0, Optional.of(TaskStateType.RUNNING));
+ assertTrue(req5.matches("foo1", -1, -1, TaskStateType.RUNNING));
+ assertFalse(req5.matches("bar1", -1, -1, TaskStateType.DONE));
+ assertFalse(req5.matches("baz1", -1, -1, TaskStateType.STOPPING));
+ assertFalse(req5.matches("baz1", -1, -1, TaskStateType.PENDING));
}
@Test
@@ -463,9 +471,9 @@ public class CoordinatorTest {
waitFor(coordinatorClient);
assertEquals(0, coordinatorClient.tasks(
- new TasksRequest(null, 10, 0, 10, 0)).tasks().size());
+ new TasksRequest(null, 10, 0, 10, 0, Optional.empty())).tasks().size());
TasksResponse resp1 = coordinatorClient.tasks(
- new TasksRequest(Arrays.asList(new String[] {"foo", "baz" }), 0, 0, 0, 0));
+ new TasksRequest(Arrays.asList(new String[] {"foo", "baz" }), 0, 0, 0, 0, Optional.empty()));
assertTrue(resp1.tasks().containsKey("foo"));
assertFalse(resp1.tasks().containsKey("bar"));
assertEquals(1, resp1.tasks().size());
@@ -483,13 +491,13 @@ public class CoordinatorTest {
waitFor(cluster.agentClient("node02"));
TasksResponse resp2 = coordinatorClient.tasks(
- new TasksRequest(null, 1, 0, 0, 0));
+ new TasksRequest(null, 1, 0, 0, 0, Optional.empty()));
assertTrue(resp2.tasks().containsKey("foo"));
assertFalse(resp2.tasks().containsKey("bar"));
assertEquals(1, resp2.tasks().size());
assertEquals(0, coordinatorClient.tasks(
- new TasksRequest(null, 3, 0, 0, 0)).tasks().size());
+ new TasksRequest(null, 3, 0, 0, 0, Optional.empty())).tasks().size());
}
}