You are viewing a plain text version of this content. The canonical link to the original message was a hyperlink that is not preserved in this plain text copy.
Posted to commits@druid.apache.org by jo...@apache.org on 2019/08/15 21:57:13 UTC

[incubator-druid] branch master updated: Keep track of task location for completed tasks (#8286)

This is an automated email from the ASF dual-hosted git repository.

jonwei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new ef7b960  Keep track of task location for completed tasks (#8286)
ef7b960 is described below

commit ef7b9606f2137a7a724e65c52c28375ce0dff427
Author: Jonathan Wei <jo...@users.noreply.github.com>
AuthorDate: Thu Aug 15 16:57:02 2019 -0500

    Keep track of task location for completed tasks (#8286)
    
    * Keep track of task location for completed tasks
    
    * Add TaskLifecycleTest location checks
---
 .../java/org/apache/druid/indexer/TaskStatus.java  | 41 +++++++++++---
 .../org/apache/druid/indexer/TaskStatusTest.java   | 62 ++++++++++++++++++++++
 .../druid/indexing/overlord/RemoteTaskRunner.java  | 16 ++++++
 .../overlord/SingleTaskBackgroundRunner.java       |  6 +++
 .../apache/druid/indexing/overlord/TaskQueue.java  |  6 ++-
 .../apache/druid/indexing/overlord/TaskRunner.java |  6 +++
 .../overlord/hrtr/HttpRemoteTaskRunner.java        | 11 ++++
 .../indexing/overlord/http/OverlordResource.java   |  4 +-
 .../druid/indexing/overlord/TaskLifecycleTest.java | 29 ++++++++--
 9 files changed, 165 insertions(+), 16 deletions(-)

diff --git a/core/src/main/java/org/apache/druid/indexer/TaskStatus.java b/core/src/main/java/org/apache/druid/indexer/TaskStatus.java
index 6b90fef..a0ffe85 100644
--- a/core/src/main/java/org/apache/druid/indexer/TaskStatus.java
+++ b/core/src/main/java/org/apache/druid/indexer/TaskStatus.java
@@ -25,6 +25,8 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Objects;
 import com.google.common.base.Preconditions;
 
+import javax.annotation.Nullable;
+
 /**
  * Represents the status of a task from the perspective of the coordinator. The task may be ongoing
  * ({@link #isComplete()} false) or it may be complete ({@link #isComplete()} true).
@@ -37,32 +39,32 @@ public class TaskStatus
 
   public static TaskStatus running(String taskId)
   {
-    return new TaskStatus(taskId, TaskState.RUNNING, -1, null);
+    return new TaskStatus(taskId, TaskState.RUNNING, -1, null, null);
   }
 
   public static TaskStatus success(String taskId)
   {
-    return new TaskStatus(taskId, TaskState.SUCCESS, -1, null);
+    return new TaskStatus(taskId, TaskState.SUCCESS, -1, null, null);
   }
 
   public static TaskStatus success(String taskId, String errorMsg)
   {
-    return new TaskStatus(taskId, TaskState.SUCCESS, -1, errorMsg);
+    return new TaskStatus(taskId, TaskState.SUCCESS, -1, errorMsg, null);
   }
 
   public static TaskStatus failure(String taskId)
   {
-    return new TaskStatus(taskId, TaskState.FAILED, -1, null);
+    return new TaskStatus(taskId, TaskState.FAILED, -1, null, null);
   }
 
   public static TaskStatus failure(String taskId, String errorMsg)
   {
-    return new TaskStatus(taskId, TaskState.FAILED, -1, errorMsg);
+    return new TaskStatus(taskId, TaskState.FAILED, -1, errorMsg, null);
   }
 
   public static TaskStatus fromCode(String taskId, TaskState code)
   {
-    return new TaskStatus(taskId, code, -1, null);
+    return new TaskStatus(taskId, code, -1, null, null);
   }
 
   // The error message can be large, so truncate it to avoid storing large objects in zookeeper/metadata storage.
@@ -80,19 +82,22 @@ public class TaskStatus
   private final TaskState status;
   private final long duration;
   private final String errorMsg;
+  private final TaskLocation location;
 
   @JsonCreator
   protected TaskStatus(
       @JsonProperty("id") String id,
       @JsonProperty("status") TaskState status,
       @JsonProperty("duration") long duration,
-      @JsonProperty("errorMsg") String errorMsg
+      @JsonProperty("errorMsg") String errorMsg,
+      @Nullable @JsonProperty("location") TaskLocation location
   )
   {
     this.id = id;
     this.status = status;
     this.duration = duration;
     this.errorMsg = truncateErrorMsg(errorMsg);
+    this.location = location == null ? TaskLocation.unknown() : location;
 
     // Check class invariants.
     Preconditions.checkNotNull(id, "id");
@@ -123,6 +128,12 @@ public class TaskStatus
     return errorMsg;
   }
 
+  @JsonProperty("location")
+  public TaskLocation getLocation()
+  {
+    return location;
+  }
+
   /**
    * Signals that a task is not yet complete, and is still runnable on a worker. Exactly one of isRunnable,
    * isSuccess, or isFailure will be true at any one time.
@@ -172,7 +183,21 @@ public class TaskStatus
 
   public TaskStatus withDuration(long _duration)
   {
-    return new TaskStatus(id, status, _duration, errorMsg);
+    return new TaskStatus(id, status, _duration, errorMsg, location);
+  }
+
+  public TaskStatus withLocation(TaskLocation location)
+  {
+    if (location == null) {
+      location = TaskLocation.unknown();
+    }
+    return new TaskStatus(
+        id,
+        status,
+        duration,
+        errorMsg,
+        location
+    );
   }
 
   @Override
diff --git a/core/src/test/java/org/apache/druid/indexer/TaskStatusTest.java b/core/src/test/java/org/apache/druid/indexer/TaskStatusTest.java
new file mode 100644
index 0000000..92651bc
--- /dev/null
+++ b/core/src/test/java/org/apache/druid/indexer/TaskStatusTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexer;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+
+public class TaskStatusTest
+{
+  @Test
+  public void testSerde() throws IOException
+  {
+    final ObjectMapper mapper = new ObjectMapper();
+
+    final TaskStatus status = new TaskStatus(
+        "testId",
+        TaskState.RUNNING,
+        1000L,
+        "an error message",
+        TaskLocation.create("testHost", 1010, -1)
+    );
+
+    final String json = mapper.writeValueAsString(status);
+    Assert.assertEquals(status, mapper.readValue(json, TaskStatus.class));
+
+    final String jsonNoLocation = "{\n"
+                                  + "\"id\": \"testId\",\n"
+                                  + "\"status\": \"SUCCESS\",\n"
+                                  + "\"duration\": 3000,\n"
+                                  + "\"errorMsg\": \"hello\"\n"
+                                  + "}";
+
+    final TaskStatus statusNoLocation = new TaskStatus(
+        "testId",
+        TaskState.SUCCESS,
+        3000L,
+        "hello",
+        null
+    );
+    Assert.assertEquals(statusNoLocation, mapper.readValue(jsonNoLocation, TaskStatus.class));
+  }
+}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java
index aa9a582..6206b9d 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java
@@ -475,6 +475,22 @@ public class RemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
   }
 
   @Override
+  public TaskLocation getTaskLocation(String taskId)
+  {
+    if (pendingTasks.containsKey(taskId)) {
+      return pendingTasks.get(taskId).getLocation();
+    }
+    if (runningTasks.containsKey(taskId)) {
+      return runningTasks.get(taskId).getLocation();
+    }
+    if (completeTasks.containsKey(taskId)) {
+      return completeTasks.get(taskId).getLocation();
+    }
+
+    return TaskLocation.unknown();
+  }
+
+  @Override
   public Optional<ScalingStats> getScalingStats()
   {
     return Optional.fromNullable(provisioningService.getStats());
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunner.java
index e814cc3..fab5a4f 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/SingleTaskBackgroundRunner.java
@@ -302,6 +302,12 @@ public class SingleTaskBackgroundRunner implements TaskRunner, QuerySegmentWalke
   }
 
   @Override
+  public TaskLocation getTaskLocation(String taskId)
+  {
+    return location;
+  }
+
+  @Override
   public Optional<ScalingStats> getScalingStats()
   {
     return Optional.absent();
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
index 332356b..a9153b8 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
@@ -30,6 +30,7 @@ import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.druid.indexer.TaskLocation;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexing.common.Counters;
 import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
@@ -421,6 +422,8 @@ public class TaskQueue
   {
     giant.lock();
 
+    TaskLocation taskLocation = TaskLocation.unknown();
+
     try {
       Preconditions.checkNotNull(task, "task");
       Preconditions.checkNotNull(taskStatus, "status");
@@ -433,6 +436,7 @@ public class TaskQueue
       );
       // Inform taskRunner that this task can be shut down
       try {
+        taskLocation = taskRunner.getTaskLocation(task.getId());
         taskRunner.shutdown(task.getId(), reasonFormat, args);
       }
       catch (Exception e) {
@@ -461,7 +465,7 @@ public class TaskQueue
           if (!previousStatus.isPresent() || !previousStatus.get().isRunnable()) {
             log.makeAlert("Ignoring notification for already-complete task").addData("task", task.getId()).emit();
           } else {
-            taskStorage.setStatus(taskStatus);
+            taskStorage.setStatus(taskStatus.withLocation(taskLocation));
             log.info("Task done: %s", task);
             managementMayBeNecessary.signalAll();
           }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskRunner.java
index 8159659..e9dab7f 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskRunner.java
@@ -23,6 +23,7 @@ import com.google.common.base.Optional;
 import com.google.common.util.concurrent.ListenableFuture;
 import org.apache.druid.guice.annotations.PublicApi;
 import org.apache.druid.indexer.RunnerTaskState;
+import org.apache.druid.indexer.TaskLocation;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexing.common.task.Task;
 import org.apache.druid.indexing.overlord.autoscaling.ScalingStats;
@@ -109,6 +110,11 @@ public interface TaskRunner
     return null;
   }
 
+  default TaskLocation getTaskLocation(String taskId)
+  {
+    return TaskLocation.unknown();
+  }
+
   /**
    * Some runners are able to scale up and down their capacity in a dynamic manner. This returns stats on those activities
    *
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
index e5fa375..e96c606 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
@@ -1140,6 +1140,17 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer
     }
   }
 
+  @Override
+  public TaskLocation getTaskLocation(String taskId)
+  {
+    final HttpRemoteTaskRunnerWorkItem workItem = tasks.get(taskId);
+    if (workItem == null) {
+      return TaskLocation.unknown();
+    } else {
+      return workItem.getLocation();
+    }
+  }
+
   public List<String> getBlacklistedWorkers()
   {
     return blackListedWorkers.values().stream().map(
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java
index b153958..2dc6839 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java
@@ -293,7 +293,7 @@ public class OverlordResource
                 taskInfo.getStatus().getStatusCode(),
                 RunnerTaskState.WAITING,
                 taskInfo.getStatus().getDuration(),
-                TaskLocation.unknown(),
+                taskInfo.getStatus().getLocation() == null ? TaskLocation.unknown() : taskInfo.getStatus().getLocation(),
                 taskInfo.getDataSource(),
                 taskInfo.getStatus().getErrorMsg()
             )
@@ -598,7 +598,7 @@ public class OverlordResource
         taskInfo.getStatus().getStatusCode(),
         RunnerTaskState.NONE,
         taskInfo.getStatus().getDuration(),
-        TaskLocation.unknown(),
+        taskInfo.getStatus().getLocation() == null ? TaskLocation.unknown() : taskInfo.getStatus().getLocation(),
         taskInfo.getDataSource(),
         taskInfo.getStatus().getErrorMsg()
     );
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
index 475b8ef..1ae89f7 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
@@ -42,6 +42,7 @@ import org.apache.druid.data.input.impl.InputRowParser;
 import org.apache.druid.discovery.DataNodeService;
 import org.apache.druid.discovery.DruidNodeAnnouncer;
 import org.apache.druid.discovery.LookupNodeService;
+import org.apache.druid.indexer.TaskLocation;
 import org.apache.druid.indexer.TaskState;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexing.common.SegmentLoaderFactory;
@@ -233,7 +234,12 @@ public class TaskLifecycleTest
   private TaskConfig taskConfig;
   private DataSegmentPusher dataSegmentPusher;
   private AppenderatorsManager appenderatorsManager;
-
+  private DruidNode druidNode = new DruidNode("dummy", "dummy", false, 10000, null, true, false);
+  private TaskLocation taskLocation = TaskLocation.create(
+      druidNode.getHost(),
+      druidNode.getPlaintextPort(),
+      druidNode.getTlsPort()
+  );
   private int pushedSegments;
   private int announcedSinks;
   private SegmentHandoffNotifierFactory handoffNotifierFactory;
@@ -644,7 +650,7 @@ public class TaskLifecycleTest
         tb,
         taskConfig,
         emitter,
-        new DruidNode("dummy", "dummy", false, 10000, null, true, false),
+        druidNode,
         new ServerConfig()
     );
   }
@@ -731,6 +737,7 @@ public class TaskLifecycleTest
     final List<DataSegment> loggedSegments = byIntervalOrdering.sortedCopy(tsqa.getInsertedSegments(indexTask.getId()));
 
     Assert.assertEquals("statusCode", TaskState.SUCCESS, status.getStatusCode());
+    Assert.assertEquals(taskLocation, status.getLocation());
     Assert.assertEquals("merged statusCode", TaskState.SUCCESS, mergedStatus.getStatusCode());
     Assert.assertEquals("segments logged vs published", loggedSegments, publishedSegments);
     Assert.assertEquals("num segments published", 2, mdc.getPublished().size());
@@ -808,6 +815,7 @@ public class TaskLifecycleTest
     final TaskStatus status = runTask(indexTask);
 
     Assert.assertEquals("statusCode", TaskState.FAILED, status.getStatusCode());
+    Assert.assertEquals(taskLocation, status.getLocation());
     Assert.assertEquals("num segments published", 0, mdc.getPublished().size());
     Assert.assertEquals("num segments nuked", 0, mdc.getNuked().size());
   }
@@ -875,6 +883,7 @@ public class TaskLifecycleTest
     final Task killTask = new KillTask(null, "test_kill_task", Intervals.of("2011-04-01/P4D"), null);
 
     final TaskStatus status = runTask(killTask);
+    Assert.assertEquals(taskLocation, status.getLocation());
     Assert.assertEquals("merged statusCode", TaskState.SUCCESS, status.getStatusCode());
     Assert.assertEquals("num segments published", 0, mdc.getPublished().size());
     Assert.assertEquals("num segments nuked", 3, mdc.getNuked().size());
@@ -897,6 +906,7 @@ public class TaskLifecycleTest
     final TaskStatus status = runTask(rtishTask);
 
     Assert.assertEquals("statusCode", TaskState.SUCCESS, status.getStatusCode());
+    Assert.assertEquals(taskLocation, status.getLocation());
     Assert.assertEquals("num segments published", 2, mdc.getPublished().size());
     Assert.assertEquals("num segments nuked", 0, mdc.getNuked().size());
   }
@@ -911,6 +921,7 @@ public class TaskLifecycleTest
     final TaskStatus status = runTask(noopTask);
 
     Assert.assertEquals("statusCode", TaskState.SUCCESS, status.getStatusCode());
+    Assert.assertEquals(taskLocation, status.getLocation());
     Assert.assertEquals("num segments published", 0, mdc.getPublished().size());
     Assert.assertEquals("num segments nuked", 0, mdc.getNuked().size());
   }
@@ -925,6 +936,7 @@ public class TaskLifecycleTest
     final TaskStatus status = runTask(neverReadyTask);
 
     Assert.assertEquals("statusCode", TaskState.FAILED, status.getStatusCode());
+    Assert.assertEquals(taskLocation, status.getLocation());
     Assert.assertEquals("num segments published", 0, mdc.getPublished().size());
     Assert.assertEquals("num segments nuked", 0, mdc.getNuked().size());
   }
@@ -978,7 +990,7 @@ public class TaskLifecycleTest
     };
 
     final TaskStatus status = runTask(task);
-
+    Assert.assertEquals(taskLocation, status.getLocation());
     Assert.assertEquals("statusCode", TaskState.SUCCESS, status.getStatusCode());
     Assert.assertEquals("segments published", 1, mdc.getPublished().size());
     Assert.assertEquals("segments nuked", 0, mdc.getNuked().size());
@@ -1019,6 +1031,7 @@ public class TaskLifecycleTest
     final TaskStatus status = runTask(task);
 
     Assert.assertEquals("statusCode", TaskState.FAILED, status.getStatusCode());
+    Assert.assertEquals(taskLocation, status.getLocation());
     Assert.assertEquals("segments published", 0, mdc.getPublished().size());
     Assert.assertEquals("segments nuked", 0, mdc.getNuked().size());
   }
@@ -1058,6 +1071,7 @@ public class TaskLifecycleTest
     final TaskStatus status = runTask(task);
 
     Assert.assertEquals("statusCode", TaskState.FAILED, status.getStatusCode());
+    Assert.assertEquals(taskLocation, status.getLocation());
     Assert.assertEquals("segments published", 0, mdc.getPublished().size());
     Assert.assertEquals("segments nuked", 0, mdc.getNuked().size());
   }
@@ -1091,7 +1105,9 @@ public class TaskLifecycleTest
       Thread.sleep(10);
     }
 
-    Assert.assertTrue("Task should be in Success state", tsqa.getStatus(taskId).get().isSuccess());
+    TaskStatus status = tsqa.getStatus(taskId).get();
+    Assert.assertTrue("Task should be in Success state", status.isSuccess());
+    Assert.assertEquals(taskLocation, status.getLocation());
 
     Assert.assertEquals(1, announcedSinks);
     Assert.assertEquals(1, pushedSegments);
@@ -1161,7 +1177,9 @@ public class TaskLifecycleTest
       Thread.sleep(10);
     }
 
-    Assert.assertTrue("Task should be in Failure state", tsqa.getStatus(taskId).get().isFailure());
+    TaskStatus status = tsqa.getStatus(taskId).get();
+    Assert.assertTrue("Task should be in Failure state", status.isFailure());
+    Assert.assertEquals(taskLocation, status.getLocation());
 
     EasyMock.verify(monitorScheduler, queryRunnerFactoryConglomerate);
   }
@@ -1235,6 +1253,7 @@ public class TaskLifecycleTest
     final List<DataSegment> loggedSegments = byIntervalOrdering.sortedCopy(tsqa.getInsertedSegments(indexTask.getId()));
 
     Assert.assertEquals("statusCode", TaskState.SUCCESS, status.getStatusCode());
+    Assert.assertEquals(taskLocation, status.getLocation());
     Assert.assertEquals("segments logged vs published", loggedSegments, publishedSegments);
     Assert.assertEquals("num segments published", 2, mdc.getPublished().size());
     Assert.assertEquals("num segments nuked", 0, mdc.getNuked().size());


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org