You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by jo...@apache.org on 2019/08/15 21:57:13 UTC
[incubator-druid] branch master updated: Keep track of task location for completed tasks (#8286)
This is an automated email from the ASF dual-hosted git repository.
jonwei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/master by this push:
new ef7b960 Keep track of task location for completed tasks (#8286)
ef7b960 is described below
commit ef7b9606f2137a7a724e65c52c28375ce0dff427
Author: Jonathan Wei <jo...@users.noreply.github.com>
AuthorDate: Thu Aug 15 16:57:02 2019 -0500
Keep track of task location for completed tasks (#8286)
* Keep track of task location for completed tasks
* Add TaskLifecycleTest location checks
---
.../java/org/apache/druid/indexer/TaskStatus.java | 41 +++++++++++---
.../org/apache/druid/indexer/TaskStatusTest.java | 62 ++++++++++++++++++++++
.../druid/indexing/overlord/RemoteTaskRunner.java | 16 ++++++
.../overlord/SingleTaskBackgroundRunner.java | 6 +++
.../apache/druid/indexing/overlord/TaskQueue.java | 6 ++-
.../apache/druid/indexing/overlord/TaskRunner.java | 6 +++
.../overlord/hrtr/HttpRemoteTaskRunner.java | 11 ++++
.../indexing/overlord/http/OverlordResource.java | 4 +-
.../druid/indexing/overlord/TaskLifecycleTest.java | 29 ++++++++--
9 files changed, 165 insertions(+), 16 deletions(-)
diff --git a/core/src/main/java/org/apache/druid/indexer/TaskStatus.java b/core/src/main/java/org/apache/druid/indexer/TaskStatus.java
index 6b90fef..a0ffe85 100644
--- a/core/src/main/java/org/apache/druid/indexer/TaskStatus.java
+++ b/core/src/main/java/org/apache/druid/indexer/TaskStatus.java
@@ -25,6 +25,8 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
+import javax.annotation.Nullable;
+
/**
* Represents the status of a task from the perspective of the coordinator. The task may be ongoing
* ({@link #isComplete()} false) or it may be complete ({@link #isComplete()} true).
@@ -37,32 +39,32 @@ public class TaskStatus
public static TaskStatus running(String taskId)
{
- return new TaskStatus(taskId, TaskState.RUNNING, -1, null);
+ return new TaskStatus(taskId, TaskState.RUNNING, -1, null, null);
}
public static TaskStatus success(String taskId)
{
- return new TaskStatus(taskId, TaskState.SUCCESS, -1, null);
+ return new TaskStatus(taskId, TaskState.SUCCESS, -1, null, null);
}
public static TaskStatus success(String taskId, String errorMsg)
{
- return new TaskStatus(taskId, TaskState.SUCCESS, -1, errorMsg);
+ return new TaskStatus(taskId, TaskState.SUCCESS, -1, errorMsg, null);
}
public static TaskStatus failure(String taskId)
{
- return new TaskStatus(taskId, TaskState.FAILED, -1, null);
+ return new TaskStatus(taskId, TaskState.FAILED, -1, null, null);
}
public static TaskStatus failure(String taskId, String errorMsg)
{
- return new TaskStatus(taskId, TaskState.FAILED, -1, errorMsg);
+ return new TaskStatus(taskId, TaskState.FAILED, -1, errorMsg, null);
}
public static TaskStatus fromCode(String taskId, TaskState code)
{
- return new TaskStatus(taskId, code, -1, null);
+ return new TaskStatus(taskId, code, -1, null, null);
}
// The error message can be large, so truncate it to avoid storing large objects in zookeeper/metadata storage.
@@ -80,19 +82,22 @@ public class TaskStatus
private final TaskState status;
private final long duration;
private final String errorMsg;
+ private final TaskLocation location;
@JsonCreator
protected TaskStatus(
@JsonProperty("id") String id,
@JsonProperty("status") TaskState status,
@JsonProperty("duration") long duration,
- @JsonProperty("errorMsg") String errorMsg
+ @JsonProperty("errorMsg") String errorMsg,
+ @Nullable @JsonProperty("location") TaskLocation location
)
{
this.id = id;
this.status = status;
this.duration = duration;
this.errorMsg = truncateErrorMsg(errorMsg);
+ this.location = location == null ? TaskLocation.unknown() : location;
// Check class invariants.
Preconditions.checkNotNull(id, "id");
@@ -123,6 +128,12 @@ public class TaskStatus
return errorMsg;
}
+ @JsonProperty("location")
+ public TaskLocation getLocation()
+ {
+ return location;
+ }
+
/**
* Signals that a task is not yet complete, and is still runnable on a worker. Exactly one of isRunnable,
* isSuccess, or isFailure will be true at any one time.
@@ -172,7 +183,21 @@ public class TaskStatus
public TaskStatus withDuration(long _duration)
{
- return new TaskStatus(id, status, _duration, errorMsg);
+ return new TaskStatus(id, status, _duration, errorMsg, location);
+ }
+
+ public TaskStatus withLocation(TaskLocation location)
+ {
+ if (location == null) {
+ location = TaskLocation.unknown();
+ }
+ return new TaskStatus(
+ id,
+ status,
+ duration,
+ errorMsg,
+ location
+ );
}
@Override
diff --git a/core/src/test/java/org/apache/druid/indexer/TaskStatusTest.java b/core/src/test/java/org/apache/druid/indexer/TaskStatusTest.java
new file mode 100644
index 0000000..92651bc
--- /dev/null
+++ b/core/src/test/java/org/apache/druid/indexer/TaskStatusTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexer;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+
+public class TaskStatusTest
+{
+ @Test
+ public void testSerde() throws IOException
+ {
+ final ObjectMapper mapper = new ObjectMapper();
+
+ final TaskStatus status = new TaskStatus(
+ "testId",
+ TaskState.RUNNING,
+ 1000L,
+ "an error message",
+ TaskLocation.create("testHost", 1010, -1)
+ );
+
+ final String json = mapper.writeValueAsString(status);
+ Assert.assertEquals(status, mapper.readValue(json, TaskStatus.class));
+
+ final String jsonNoLocation = "{\n"
+ + "\"id\": \"testId\",\n"
+ + "\"status\": \"SUCCESS\",\n"
+ + "\"duration\": 3000,\n"
+ + "\"errorMsg\": \"hello\"\n"
+ + "}";
+
+ final TaskStatus statusNoLocation = new TaskStatus(
+ "testId",
+ TaskState.SUCCESS,
+ 3000L,
+ "hello",
+ null
+ );
+ Assert.assertEquals(statusNoLocation, mapper.readValue(jsonNoLocation, TaskStatus.class));
+ }
+}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java
index aa9a582..6206b9d 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java
@@ -475,6 +475,22 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
}
@Override
+ public TaskLocation getTaskLocation(String taskId)
+ {
+ if (pendingTasks.containsKey(taskId)) {
+ return pendingTasks.get(taskId).getLocation();
+ }
+ if (runningTasks.containsKey(taskId)) {
+ return runningTasks.get(taskId).getLocation();
+ }
+ if (completeTasks.containsKey(taskId)) {
+ return completeTasks.get(taskId).getLocation();
+ }
+
+ return TaskLocation.unknown();
+ }
+
+ @Override
public Optional<ScalingStats> getScalingStats()
{
return Optional.fromNullable(provisioningService.getStats());
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunner.java
index e814cc3..fab5a4f 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunner.java
@@ -302,6 +302,12 @@ public class SingleTaskBackgroundRunner implements TaskRunner, QuerySegmentWalke
}
@Override
+ public TaskLocation getTaskLocation(String taskId)
+ {
+ return location;
+ }
+
+ @Override
public Optional<ScalingStats> getScalingStats()
{
return Optional.absent();
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
index 332356b..a9153b8 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
@@ -30,6 +30,7 @@ import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.Counters;
import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
@@ -421,6 +422,8 @@ public class TaskQueue
{
giant.lock();
+ TaskLocation taskLocation = TaskLocation.unknown();
+
try {
Preconditions.checkNotNull(task, "task");
Preconditions.checkNotNull(taskStatus, "status");
@@ -433,6 +436,7 @@ public class TaskQueue
);
// Inform taskRunner that this task can be shut down
try {
+ taskLocation = taskRunner.getTaskLocation(task.getId());
taskRunner.shutdown(task.getId(), reasonFormat, args);
}
catch (Exception e) {
@@ -461,7 +465,7 @@ public class TaskQueue
if (!previousStatus.isPresent() || !previousStatus.get().isRunnable()) {
log.makeAlert("Ignoring notification for already-complete task").addData("task", task.getId()).emit();
} else {
- taskStorage.setStatus(taskStatus);
+ taskStorage.setStatus(taskStatus.withLocation(taskLocation));
log.info("Task done: %s", task);
managementMayBeNecessary.signalAll();
}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskRunner.java
index 8159659..e9dab7f 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskRunner.java
@@ -23,6 +23,7 @@ import com.google.common.base.Optional;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.guice.annotations.PublicApi;
import org.apache.druid.indexer.RunnerTaskState;
+import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.autoscaling.ScalingStats;
@@ -109,6 +110,11 @@ public interface TaskRunner
return null;
}
+ default TaskLocation getTaskLocation(String taskId)
+ {
+ return TaskLocation.unknown();
+ }
+
/**
* Some runners are able to scale up and down their capacity in a dynamic manner. This returns stats on those activities
*
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
index e5fa375..e96c606 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
@@ -1140,6 +1140,17 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
}
}
+ @Override
+ public TaskLocation getTaskLocation(String taskId)
+ {
+ final HttpRemoteTaskRunnerWorkItem workItem = tasks.get(taskId);
+ if (workItem == null) {
+ return TaskLocation.unknown();
+ } else {
+ return workItem.getLocation();
+ }
+ }
+
public List<String> getBlacklistedWorkers()
{
return blackListedWorkers.values().stream().map(
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java
index b153958..2dc6839 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java
@@ -293,7 +293,7 @@ public class OverlordResource
taskInfo.getStatus().getStatusCode(),
RunnerTaskState.WAITING,
taskInfo.getStatus().getDuration(),
- TaskLocation.unknown(),
+ taskInfo.getStatus().getLocation() == null ? TaskLocation.unknown() : taskInfo.getStatus().getLocation(),
taskInfo.getDataSource(),
taskInfo.getStatus().getErrorMsg()
)
@@ -598,7 +598,7 @@ public class OverlordResource
taskInfo.getStatus().getStatusCode(),
RunnerTaskState.NONE,
taskInfo.getStatus().getDuration(),
- TaskLocation.unknown(),
+ taskInfo.getStatus().getLocation() == null ? TaskLocation.unknown() : taskInfo.getStatus().getLocation(),
taskInfo.getDataSource(),
taskInfo.getStatus().getErrorMsg()
);
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
index 475b8ef..1ae89f7 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
@@ -42,6 +42,7 @@ import org.apache.druid.data.input.impl.InputRowParser;
import org.apache.druid.discovery.DataNodeService;
import org.apache.druid.discovery.DruidNodeAnnouncer;
import org.apache.druid.discovery.LookupNodeService;
+import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.SegmentLoaderFactory;
@@ -233,7 +234,12 @@ public class TaskLifecycleTest
private TaskConfig taskConfig;
private DataSegmentPusher dataSegmentPusher;
private AppenderatorsManager appenderatorsManager;
-
+ private DruidNode druidNode = new DruidNode("dummy", "dummy", false, 10000, null, true, false);
+ private TaskLocation taskLocation = TaskLocation.create(
+ druidNode.getHost(),
+ druidNode.getPlaintextPort(),
+ druidNode.getTlsPort()
+ );
private int pushedSegments;
private int announcedSinks;
private SegmentHandoffNotifierFactory handoffNotifierFactory;
@@ -644,7 +650,7 @@ public class TaskLifecycleTest
tb,
taskConfig,
emitter,
- new DruidNode("dummy", "dummy", false, 10000, null, true, false),
+ druidNode,
new ServerConfig()
);
}
@@ -731,6 +737,7 @@ public class TaskLifecycleTest
final List<DataSegment> loggedSegments = byIntervalOrdering.sortedCopy(tsqa.getInsertedSegments(indexTask.getId()));
Assert.assertEquals("statusCode", TaskState.SUCCESS, status.getStatusCode());
+ Assert.assertEquals(taskLocation, status.getLocation());
Assert.assertEquals("merged statusCode", TaskState.SUCCESS, mergedStatus.getStatusCode());
Assert.assertEquals("segments logged vs published", loggedSegments, publishedSegments);
Assert.assertEquals("num segments published", 2, mdc.getPublished().size());
@@ -808,6 +815,7 @@ public class TaskLifecycleTest
final TaskStatus status = runTask(indexTask);
Assert.assertEquals("statusCode", TaskState.FAILED, status.getStatusCode());
+ Assert.assertEquals(taskLocation, status.getLocation());
Assert.assertEquals("num segments published", 0, mdc.getPublished().size());
Assert.assertEquals("num segments nuked", 0, mdc.getNuked().size());
}
@@ -875,6 +883,7 @@ public class TaskLifecycleTest
final Task killTask = new KillTask(null, "test_kill_task", Intervals.of("2011-04-01/P4D"), null);
final TaskStatus status = runTask(killTask);
+ Assert.assertEquals(taskLocation, status.getLocation());
Assert.assertEquals("merged statusCode", TaskState.SUCCESS, status.getStatusCode());
Assert.assertEquals("num segments published", 0, mdc.getPublished().size());
Assert.assertEquals("num segments nuked", 3, mdc.getNuked().size());
@@ -897,6 +906,7 @@ public class TaskLifecycleTest
final TaskStatus status = runTask(rtishTask);
Assert.assertEquals("statusCode", TaskState.SUCCESS, status.getStatusCode());
+ Assert.assertEquals(taskLocation, status.getLocation());
Assert.assertEquals("num segments published", 2, mdc.getPublished().size());
Assert.assertEquals("num segments nuked", 0, mdc.getNuked().size());
}
@@ -911,6 +921,7 @@ public class TaskLifecycleTest
final TaskStatus status = runTask(noopTask);
Assert.assertEquals("statusCode", TaskState.SUCCESS, status.getStatusCode());
+ Assert.assertEquals(taskLocation, status.getLocation());
Assert.assertEquals("num segments published", 0, mdc.getPublished().size());
Assert.assertEquals("num segments nuked", 0, mdc.getNuked().size());
}
@@ -925,6 +936,7 @@ public class TaskLifecycleTest
final TaskStatus status = runTask(neverReadyTask);
Assert.assertEquals("statusCode", TaskState.FAILED, status.getStatusCode());
+ Assert.assertEquals(taskLocation, status.getLocation());
Assert.assertEquals("num segments published", 0, mdc.getPublished().size());
Assert.assertEquals("num segments nuked", 0, mdc.getNuked().size());
}
@@ -978,7 +990,7 @@ public class TaskLifecycleTest
};
final TaskStatus status = runTask(task);
-
+ Assert.assertEquals(taskLocation, status.getLocation());
Assert.assertEquals("statusCode", TaskState.SUCCESS, status.getStatusCode());
Assert.assertEquals("segments published", 1, mdc.getPublished().size());
Assert.assertEquals("segments nuked", 0, mdc.getNuked().size());
@@ -1019,6 +1031,7 @@ public class TaskLifecycleTest
final TaskStatus status = runTask(task);
Assert.assertEquals("statusCode", TaskState.FAILED, status.getStatusCode());
+ Assert.assertEquals(taskLocation, status.getLocation());
Assert.assertEquals("segments published", 0, mdc.getPublished().size());
Assert.assertEquals("segments nuked", 0, mdc.getNuked().size());
}
@@ -1058,6 +1071,7 @@ public class TaskLifecycleTest
final TaskStatus status = runTask(task);
Assert.assertEquals("statusCode", TaskState.FAILED, status.getStatusCode());
+ Assert.assertEquals(taskLocation, status.getLocation());
Assert.assertEquals("segments published", 0, mdc.getPublished().size());
Assert.assertEquals("segments nuked", 0, mdc.getNuked().size());
}
@@ -1091,7 +1105,9 @@ public class TaskLifecycleTest
Thread.sleep(10);
}
- Assert.assertTrue("Task should be in Success state", tsqa.getStatus(taskId).get().isSuccess());
+ TaskStatus status = tsqa.getStatus(taskId).get();
+ Assert.assertTrue("Task should be in Success state", status.isSuccess());
+ Assert.assertEquals(taskLocation, status.getLocation());
Assert.assertEquals(1, announcedSinks);
Assert.assertEquals(1, pushedSegments);
@@ -1161,7 +1177,9 @@ public class TaskLifecycleTest
Thread.sleep(10);
}
- Assert.assertTrue("Task should be in Failure state", tsqa.getStatus(taskId).get().isFailure());
+ TaskStatus status = tsqa.getStatus(taskId).get();
+ Assert.assertTrue("Task should be in Failure state", status.isFailure());
+ Assert.assertEquals(taskLocation, status.getLocation());
EasyMock.verify(monitorScheduler, queryRunnerFactoryConglomerate);
}
@@ -1235,6 +1253,7 @@ public class TaskLifecycleTest
final List<DataSegment> loggedSegments = byIntervalOrdering.sortedCopy(tsqa.getInsertedSegments(indexTask.getId()));
Assert.assertEquals("statusCode", TaskState.SUCCESS, status.getStatusCode());
+ Assert.assertEquals(taskLocation, status.getLocation());
Assert.assertEquals("segments logged vs published", loggedSegments, publishedSegments);
Assert.assertEquals("num segments published", 2, mdc.getPublished().size());
Assert.assertEquals("num segments nuked", 0, mdc.getNuked().size());
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org