You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by fr...@apache.org on 2021/07/05 02:07:56 UTC

[druid] branch master updated: perf: improve concurrency and improve perf for task query in HeapMemoryTaskStorage (#11272)

This is an automated email from the ASF dual-hosted git repository.

frankchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new d322069  perf: improve concurrency and improve perf for task query in HeapMemoryTaskStorage (#11272)
d322069 is described below

commit d3220693f3728dd28244d810cb18edc18de9df39
Author: Jason Koch <ja...@bluedevel.com>
AuthorDate: Sun Jul 4 19:07:26 2021 -0700

    perf: improve concurrency and improve perf for task query in HeapMemoryTaskStorage (#11272)
    
    * perf: improve concurrency and reduce algorithmic cost for task querying in HeapMemoryTaskStorage
    
    * fix: address intellij linter concern regarding use of ConcurrentMap interface
    
    * nit: document thread safety of HeapMemoryTaskStorage
    
    * empty to trigger ci
---
 .../indexing/overlord/HeapMemoryTaskStorage.java   | 399 +++++++--------------
 1 file changed, 139 insertions(+), 260 deletions(-)

diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/HeapMemoryTaskStorage.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/HeapMemoryTaskStorage.java
index be43cc1..d949faa 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/HeapMemoryTaskStorage.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/HeapMemoryTaskStorage.java
@@ -26,6 +26,7 @@ import com.google.common.collect.HashMultimap;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Ordering;
+import com.google.errorprone.annotations.concurrent.GuardedBy;
 import com.google.inject.Inject;
 import org.apache.druid.indexer.TaskInfo;
 import org.apache.druid.indexer.TaskStatus;
@@ -40,23 +41,25 @@ import org.joda.time.DateTime;
 import org.joda.time.Duration;
 
 import javax.annotation.Nullable;
-import java.util.HashMap;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 
 /**
- * Implements an in-heap TaskStorage facility, with no persistence across restarts. This class is not
- * thread safe.
+ * Implements an in-heap TaskStorage facility, with no persistence across restarts. This class
+ * is thread safe.
  */
 public class HeapMemoryTaskStorage implements TaskStorage
 {
   private final TaskStorageConfig config;
 
-  private final ReentrantLock giant = new ReentrantLock();
-  private final Map<String, TaskStuff> tasks = new HashMap<>();
+  private final ConcurrentHashMap<String, TaskStuff> tasks = new ConcurrentHashMap<>();
+
+  @GuardedBy("itself")
   private final Multimap<String, TaskLock> taskLocks = HashMultimap.create();
+  @GuardedBy("itself")
   private final Multimap<String, TaskAction> taskActions = ArrayListMultimap.create();
 
   private static final Logger log = new Logger(HeapMemoryTaskStorage.class);
@@ -70,82 +73,59 @@ public class HeapMemoryTaskStorage implements TaskStorage
   @Override
   public void insert(Task task, TaskStatus status) throws EntryExistsException
   {
-    giant.lock();
-
-    try {
-      Preconditions.checkNotNull(task, "task");
-      Preconditions.checkNotNull(status, "status");
-      Preconditions.checkArgument(
-          task.getId().equals(status.getId()),
-          "Task/Status ID mismatch[%s/%s]",
-          task.getId(),
-          status.getId()
-      );
-
-      if (tasks.containsKey(task.getId())) {
-        throw new EntryExistsException(task.getId());
-      }
-
-      log.info("Inserting task %s with status: %s", task.getId(), status);
-      tasks.put(task.getId(), new TaskStuff(task, status, DateTimes.nowUtc(), task.getDataSource()));
-    }
-    finally {
-      giant.unlock();
-    }
+    Preconditions.checkNotNull(task, "task");
+    Preconditions.checkNotNull(status, "status");
+    Preconditions.checkArgument(
+        task.getId().equals(status.getId()),
+        "Task/Status ID mismatch[%s/%s]",
+        task.getId(),
+        status.getId()
+    );
+
+    TaskStuff newTaskStuff = new TaskStuff(task, status, DateTimes.nowUtc(), task.getDataSource());
+    TaskStuff alreadyExisted = tasks.putIfAbsent(task.getId(), newTaskStuff);
+    if (alreadyExisted != null) {
+      throw new EntryExistsException(task.getId());
+    }
+
+    log.info("Inserted task %s with status: %s", task.getId(), status);
   }
 
   @Override
   public Optional<Task> getTask(String taskid)
   {
-    giant.lock();
-
-    try {
-      Preconditions.checkNotNull(taskid, "taskid");
-      if (tasks.containsKey(taskid)) {
-        return Optional.of(tasks.get(taskid).getTask());
-      } else {
-        return Optional.absent();
-      }
-    }
-    finally {
-      giant.unlock();
+    Preconditions.checkNotNull(taskid, "taskid");
+    TaskStuff taskStuff = tasks.get(taskid);
+    if (taskStuff != null) {
+      return Optional.of(taskStuff.getTask());
+    } else {
+      return Optional.absent();
     }
   }
 
   @Override
   public void setStatus(TaskStatus status)
   {
-    giant.lock();
-
-    try {
-      Preconditions.checkNotNull(status, "status");
-
-      final String taskid = status.getId();
-      Preconditions.checkState(tasks.containsKey(taskid), "Task ID must already be present: %s", taskid);
-      Preconditions.checkState(tasks.get(taskid).getStatus().isRunnable(), "Task status must be runnable: %s", taskid);
-      log.info("Updating task %s to status: %s", taskid, status);
-      tasks.put(taskid, tasks.get(taskid).withStatus(status));
-    }
-    finally {
-      giant.unlock();
-    }
+    Preconditions.checkNotNull(status, "status");
+    final String taskid = status.getId();
+
+    log.info("Updating task %s to status: %s", taskid, status);
+    TaskStuff updated = tasks.computeIfPresent(taskid, (tid, taskStuff) -> {
+      Preconditions.checkState(taskStuff.getStatus().isRunnable(), "Task must be runnable: %s", taskid);
+      return taskStuff.withStatus(status);
+    });
+    Preconditions.checkNotNull(updated, "Task ID must already be present: %s", taskid);
   }
 
   @Override
   public Optional<TaskStatus> getStatus(String taskid)
   {
-    giant.lock();
-
-    try {
-      Preconditions.checkNotNull(taskid, "taskid");
-      if (tasks.containsKey(taskid)) {
-        return Optional.of(tasks.get(taskid).getStatus());
-      } else {
-        return Optional.absent();
-      }
-    }
-    finally {
-      giant.unlock();
+    Preconditions.checkNotNull(taskid, "taskid");
+    TaskStuff existing = tasks.get(taskid);
+    if (existing != null) {
+      return Optional.of(existing.getStatus());
+    } else {
+      return Optional.absent();
     }
   }
 
@@ -153,90 +133,50 @@ public class HeapMemoryTaskStorage implements TaskStorage
   @Override
   public TaskInfo<Task, TaskStatus> getTaskInfo(String taskId)
   {
-    giant.lock();
-
-    try {
-      Preconditions.checkNotNull(taskId, "taskId");
-      final TaskStuff taskStuff = tasks.get(taskId);
-      if (taskStuff != null) {
-        return new TaskInfo<>(
-            taskStuff.getTask().getId(),
-            taskStuff.getCreatedDate(),
-            taskStuff.getStatus(),
-            taskStuff.getDataSource(),
-            taskStuff.getTask()
-        );
-      } else {
-        return null;
-      }
-    }
-    finally {
-      giant.unlock();
+    Preconditions.checkNotNull(taskId, "taskId");
+    final TaskStuff taskStuff = tasks.get(taskId);
+    if (taskStuff != null) {
+      return TaskStuff.toTaskInfo(taskStuff);
+    } else {
+      return null;
     }
   }
 
   @Override
   public List<Task> getActiveTasks()
   {
-    giant.lock();
-
-    try {
-      final ImmutableList.Builder<Task> listBuilder = ImmutableList.builder();
-      for (final TaskStuff taskStuff : tasks.values()) {
-        if (taskStuff.getStatus().isRunnable()) {
-          listBuilder.add(taskStuff.getTask());
-        }
+    final ImmutableList.Builder<Task> listBuilder = ImmutableList.builder();
+    for (final TaskStuff taskStuff : tasks.values()) {
+      if (taskStuff.getStatus().isRunnable()) {
+        listBuilder.add(taskStuff.getTask());
       }
-      return listBuilder.build();
-    }
-    finally {
-      giant.unlock();
     }
+    return listBuilder.build();
   }
 
   @Override
   public List<Task> getActiveTasksByDatasource(String datasource)
   {
-    giant.lock();
-
-    try {
-      final ImmutableList.Builder<Task> listBuilder = ImmutableList.builder();
-      for (Map.Entry<String, TaskStuff> entry : tasks.entrySet()) {
-        if (entry.getValue().getStatus().isRunnable() && entry.getValue().getDataSource().equals(datasource)) {
-          listBuilder.add(entry.getValue().getTask());
-        }
+    final ImmutableList.Builder<Task> listBuilder = ImmutableList.builder();
+    for (Map.Entry<String, TaskStuff> entry : tasks.entrySet()) {
+      if (entry.getValue().getStatus().isRunnable() && entry.getValue().getDataSource().equals(datasource)) {
+        listBuilder.add(entry.getValue().getTask());
       }
-      return listBuilder.build();
-    }
-    finally {
-      giant.unlock();
     }
+    return listBuilder.build();
   }
 
   @Override
   public List<TaskInfo<Task, TaskStatus>> getActiveTaskInfo(@Nullable String dataSource)
   {
-    giant.lock();
-
-    try {
-      final ImmutableList.Builder<TaskInfo<Task, TaskStatus>> listBuilder = ImmutableList.builder();
-      for (final TaskStuff taskStuff : tasks.values()) {
-        if (taskStuff.getStatus().isRunnable()) {
-          TaskInfo t = new TaskInfo<>(
-              taskStuff.getTask().getId(),
-              taskStuff.getCreatedDate(),
-              taskStuff.getStatus(),
-              taskStuff.getDataSource(),
-              taskStuff.getTask()
-          );
-          listBuilder.add(t);
-        }
+    final ImmutableList.Builder<TaskInfo<Task, TaskStatus>> listBuilder = ImmutableList.builder();
+    for (final TaskStuff taskStuff : tasks.values()) {
+      if (taskStuff.getStatus().isRunnable()) {
+        TaskInfo t = TaskStuff.toTaskInfo(taskStuff);
+        listBuilder.add(t);
       }
-      return listBuilder.build();
-    }
-    finally {
-      giant.unlock();
     }
+    return listBuilder.build();
   }
 
   @Override
@@ -246,29 +186,22 @@ public class HeapMemoryTaskStorage implements TaskStorage
       @Nullable String datasource
   )
   {
-    giant.lock();
-
-    try {
-      final Ordering<TaskStuff> createdDateDesc = new Ordering<TaskStuff>()
+    final Ordering<TaskStuff> createdDateDesc = new Ordering<TaskStuff>()
+    {
+      @Override
+      public int compare(TaskStuff a, TaskStuff b)
       {
-        @Override
-        public int compare(TaskStuff a, TaskStuff b)
-        {
-          return a.getCreatedDate().compareTo(b.getCreatedDate());
-        }
-      }.reverse();
-
-      return maxTaskStatuses == null ?
-             getRecentlyCreatedAlreadyFinishedTaskInfoSince(
-                 DateTimes.nowUtc()
-                          .minus(durationBeforeNow == null ? config.getRecentlyFinishedThreshold() : durationBeforeNow),
-                 createdDateDesc
-             ) :
-             getNRecentlyCreatedAlreadyFinishedTaskInfo(maxTaskStatuses, createdDateDesc);
-    }
-    finally {
-      giant.unlock();
-    }
+        return a.getCreatedDate().compareTo(b.getCreatedDate());
+      }
+    }.reverse();
+
+    return maxTaskStatuses == null ?
+            getRecentlyCreatedAlreadyFinishedTaskInfoSince(
+                DateTimes.nowUtc()
+                        .minus(durationBeforeNow == null ? config.getRecentlyFinishedThreshold() : durationBeforeNow),
+                createdDateDesc
+            ) :
+            getNRecentlyCreatedAlreadyFinishedTaskInfo(maxTaskStatuses, createdDateDesc);
   }
 
   private List<TaskInfo<Task, TaskStatus>> getRecentlyCreatedAlreadyFinishedTaskInfoSince(
@@ -276,31 +209,13 @@ public class HeapMemoryTaskStorage implements TaskStorage
       Ordering<TaskStuff> createdDateDesc
   )
   {
-    giant.lock();
-
-    try {
-      List<TaskStuff> list = createdDateDesc
-          .sortedCopy(tasks.values())
-          .stream()
-          .filter(taskStuff -> taskStuff.getStatus().isComplete() && taskStuff.createdDate.isAfter(start))
-          .collect(Collectors.toList());
-      final ImmutableList.Builder<TaskInfo<Task, TaskStatus>> listBuilder = ImmutableList.builder();
-      for (final TaskStuff taskStuff : list) {
-        String id = taskStuff.getTask().getId();
-        TaskInfo t = new TaskInfo<>(
-            id,
-            taskStuff.getCreatedDate(),
-            taskStuff.getStatus(),
-            taskStuff.getDataSource(),
-            taskStuff.getTask()
-        );
-        listBuilder.add(t);
-      }
-      return listBuilder.build();
-    }
-    finally {
-      giant.unlock();
-    }
+    List<TaskInfo<Task, TaskStatus>> list = tasks.values()
+        .stream()
+        .filter(taskStuff -> taskStuff.getStatus().isComplete() && taskStuff.getCreatedDate().isAfter(start))
+        .sorted(createdDateDesc)
+        .map(TaskStuff::toTaskInfo)
+        .collect(Collectors.toList());
+    return Collections.unmodifiableList(list);
   }
 
   private List<TaskInfo<Task, TaskStatus>> getNRecentlyCreatedAlreadyFinishedTaskInfo(
@@ -308,114 +223,77 @@ public class HeapMemoryTaskStorage implements TaskStorage
       Ordering<TaskStuff> createdDateDesc
   )
   {
-    giant.lock();
-
-    try {
-      List<TaskStuff> list = createdDateDesc
-          .sortedCopy(tasks.values())
-          .stream()
-          .filter(taskStuff -> taskStuff.getStatus().isComplete())
-          .limit(n)
-          .collect(Collectors.toList());
-      final ImmutableList.Builder<TaskInfo<Task, TaskStatus>> listBuilder = ImmutableList.builder();
-      for (final TaskStuff taskStuff : list) {
-        String id = taskStuff.getTask().getId();
-        TaskInfo t = new TaskInfo<>(
-            id,
-            taskStuff.getCreatedDate(),
-            taskStuff.getStatus(),
-            taskStuff.getDataSource(),
-            taskStuff.getTask()
-        );
-        listBuilder.add(t);
-      }
-      return listBuilder.build();
-    }
-    finally {
-      giant.unlock();
-    }
+    List<TaskInfo<Task, TaskStatus>> list = tasks.values()
+        .stream()
+        .filter(taskStuff -> taskStuff.getStatus().isComplete())
+        .sorted(createdDateDesc)
+        .limit(n)
+        .map(TaskStuff::toTaskInfo)
+        .collect(Collectors.toList());
+    return Collections.unmodifiableList(list);
   }
 
   @Override
   public void addLock(final String taskid, final TaskLock taskLock)
   {
-    giant.lock();
-
-    try {
-      Preconditions.checkNotNull(taskid, "taskid");
-      Preconditions.checkNotNull(taskLock, "taskLock");
+    Preconditions.checkNotNull(taskid, "taskid");
+    Preconditions.checkNotNull(taskLock, "taskLock");
+    synchronized (taskLocks) {
       taskLocks.put(taskid, taskLock);
     }
-    finally {
-      giant.unlock();
-    }
   }
 
   @Override
   public void replaceLock(String taskid, TaskLock oldLock, TaskLock newLock)
   {
-    giant.lock();
-
-    try {
-      Preconditions.checkNotNull(taskid, "taskid");
-      Preconditions.checkNotNull(oldLock, "oldLock");
-      Preconditions.checkNotNull(newLock, "newLock");
+    Preconditions.checkNotNull(taskid, "taskid");
+    Preconditions.checkNotNull(oldLock, "oldLock");
+    Preconditions.checkNotNull(newLock, "newLock");
 
+    synchronized (taskLocks) {
       if (!taskLocks.remove(taskid, oldLock)) {
         log.warn("taskLock[%s] for replacement is not found for task[%s]", oldLock, taskid);
       }
 
       taskLocks.put(taskid, newLock);
     }
-    finally {
-      giant.unlock();
-    }
   }
 
   @Override
   public void removeLock(final String taskid, final TaskLock taskLock)
   {
-    giant.lock();
-
-    try {
-      Preconditions.checkNotNull(taskLock, "taskLock");
+    Preconditions.checkNotNull(taskLock, "taskLock");
+    synchronized (taskLocks) {
       taskLocks.remove(taskid, taskLock);
     }
-    finally {
-      giant.unlock();
-    }
   }
 
   @Override
-  public void removeTasksOlderThan(final long timestamp)
+  public List<TaskLock> getLocks(final String taskid)
   {
-    giant.lock();
-
-    try {
-      List<String> taskIds = tasks.entrySet().stream()
-                                  .filter(entry -> entry.getValue().getStatus().isComplete()
-                                                   && entry.getValue().getCreatedDate().isBefore(timestamp))
-                                  .map(entry -> entry.getKey())
-                                  .collect(Collectors.toList());
-
-      taskIds.forEach(taskActions::removeAll);
-      taskIds.forEach(tasks::remove);
-    }
-    finally {
-      giant.unlock();
+    synchronized (taskLocks) {
+      return ImmutableList.copyOf(taskLocks.get(taskid));
     }
   }
 
   @Override
-  public List<TaskLock> getLocks(final String taskid)
+  public void removeTasksOlderThan(final long timestamp)
   {
-    giant.lock();
-
-    try {
-      return ImmutableList.copyOf(taskLocks.get(taskid));
-    }
-    finally {
-      giant.unlock();
+    // This is the only method in which entries are removed from both tasks and taskActions;
+    // entries may be added to either of them elsewhere.
+
+    // Multiple calls to removeTasksOlderThan() may run concurrently, in which case the same
+    // task may be queued for removal twice. Whilst not ideal, this will not cause any
+    // problems.
+    List<String> taskIds = tasks.entrySet().stream()
+        .filter(entry -> entry.getValue().getStatus().isComplete()
+                          && entry.getValue().getCreatedDate().isBefore(timestamp))
+        .map(entry -> entry.getKey())
+        .collect(Collectors.toList());
+
+    taskIds.forEach(tasks::remove);
+    synchronized (taskActions) {
+      taskIds.forEach(taskActions::removeAll);
     }
   }
 
@@ -423,28 +301,18 @@ public class HeapMemoryTaskStorage implements TaskStorage
   @Override
   public <T> void addAuditLog(Task task, TaskAction<T> taskAction)
   {
-    giant.lock();
-
-    try {
+    synchronized (taskActions) {
       taskActions.put(task.getId(), taskAction);
     }
-    finally {
-      giant.unlock();
-    }
   }
 
   @Deprecated
   @Override
   public List<TaskAction> getAuditLogs(String taskid)
   {
-    giant.lock();
-
-    try {
+    synchronized (taskActions) {
       return ImmutableList.copyOf(taskActions.get(taskid));
     }
-    finally {
-      giant.unlock();
-    }
   }
 
   private static class TaskStuff
@@ -488,5 +356,16 @@ public class HeapMemoryTaskStorage implements TaskStorage
     {
       return new TaskStuff(task, _status, createdDate, dataSource);
     }
+
+    static TaskInfo<Task, TaskStatus> toTaskInfo(TaskStuff taskStuff)
+    {
+      return new TaskInfo<>(
+        taskStuff.getTask().getId(),
+        taskStuff.getCreatedDate(),
+        taskStuff.getStatus(),
+        taskStuff.getDataSource(),
+        taskStuff.getTask()
+      );
+    }
   }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org