You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by fr...@apache.org on 2021/07/05 02:07:56 UTC
[druid] branch master updated: perf: improve concurrency and
improve perf for task query in HeapMemoryTaskStorage (#11272)
This is an automated email from the ASF dual-hosted git repository.
frankchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new d322069 perf: improve concurrency and improve perf for task query in HeapMemoryTaskStorage (#11272)
d322069 is described below
commit d3220693f3728dd28244d810cb18edc18de9df39
Author: Jason Koch <ja...@bluedevel.com>
AuthorDate: Sun Jul 4 19:07:26 2021 -0700
perf: improve concurrency and improve perf for task query in HeapMemoryTaskStorage (#11272)
* perf: improve concurrency and reduce algorithmic cost for task querying in HeapMemoryTaskStorage
* fix: address intellij linter concern regarding use of ConcurrentMap interface
* nit: document thread safety of HeapMemoryTaskStorage
* empty to trigger ci
---
.../indexing/overlord/HeapMemoryTaskStorage.java | 399 +++++++--------------
1 file changed, 139 insertions(+), 260 deletions(-)
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/HeapMemoryTaskStorage.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/HeapMemoryTaskStorage.java
index be43cc1..d949faa 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/HeapMemoryTaskStorage.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/HeapMemoryTaskStorage.java
@@ -26,6 +26,7 @@ import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Multimap;
import com.google.common.collect.Ordering;
+import com.google.errorprone.annotations.concurrent.GuardedBy;
import com.google.inject.Inject;
import org.apache.druid.indexer.TaskInfo;
import org.apache.druid.indexer.TaskStatus;
@@ -40,23 +41,25 @@ import org.joda.time.DateTime;
import org.joda.time.Duration;
import javax.annotation.Nullable;
-import java.util.HashMap;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
/**
- * Implements an in-heap TaskStorage facility, with no persistence across restarts. This class is not
- * thread safe.
+ * Implements an in-heap TaskStorage facility, with no persistence across restarts. This class
+ * is thread safe.
*/
public class HeapMemoryTaskStorage implements TaskStorage
{
private final TaskStorageConfig config;
- private final ReentrantLock giant = new ReentrantLock();
- private final Map<String, TaskStuff> tasks = new HashMap<>();
+ private final ConcurrentHashMap<String, TaskStuff> tasks = new ConcurrentHashMap<>();
+
+ @GuardedBy("itself")
private final Multimap<String, TaskLock> taskLocks = HashMultimap.create();
+ @GuardedBy("itself")
private final Multimap<String, TaskAction> taskActions = ArrayListMultimap.create();
private static final Logger log = new Logger(HeapMemoryTaskStorage.class);
@@ -70,82 +73,59 @@ public class HeapMemoryTaskStorage implements TaskStorage
@Override
public void insert(Task task, TaskStatus status) throws EntryExistsException
{
- giant.lock();
-
- try {
- Preconditions.checkNotNull(task, "task");
- Preconditions.checkNotNull(status, "status");
- Preconditions.checkArgument(
- task.getId().equals(status.getId()),
- "Task/Status ID mismatch[%s/%s]",
- task.getId(),
- status.getId()
- );
-
- if (tasks.containsKey(task.getId())) {
- throw new EntryExistsException(task.getId());
- }
-
- log.info("Inserting task %s with status: %s", task.getId(), status);
- tasks.put(task.getId(), new TaskStuff(task, status, DateTimes.nowUtc(), task.getDataSource()));
- }
- finally {
- giant.unlock();
- }
+ Preconditions.checkNotNull(task, "task");
+ Preconditions.checkNotNull(status, "status");
+ Preconditions.checkArgument(
+ task.getId().equals(status.getId()),
+ "Task/Status ID mismatch[%s/%s]",
+ task.getId(),
+ status.getId()
+ );
+
+ TaskStuff newTaskStuff = new TaskStuff(task, status, DateTimes.nowUtc(), task.getDataSource());
+ TaskStuff alreadyExisted = tasks.putIfAbsent(task.getId(), newTaskStuff);
+ if (alreadyExisted != null) {
+ throw new EntryExistsException(task.getId());
+ }
+
+ log.info("Inserted task %s with status: %s", task.getId(), status);
}
@Override
public Optional<Task> getTask(String taskid)
{
- giant.lock();
-
- try {
- Preconditions.checkNotNull(taskid, "taskid");
- if (tasks.containsKey(taskid)) {
- return Optional.of(tasks.get(taskid).getTask());
- } else {
- return Optional.absent();
- }
- }
- finally {
- giant.unlock();
+ Preconditions.checkNotNull(taskid, "taskid");
+ TaskStuff taskStuff = tasks.get(taskid);
+ if (taskStuff != null) {
+ return Optional.of(taskStuff.getTask());
+ } else {
+ return Optional.absent();
}
}
@Override
public void setStatus(TaskStatus status)
{
- giant.lock();
-
- try {
- Preconditions.checkNotNull(status, "status");
-
- final String taskid = status.getId();
- Preconditions.checkState(tasks.containsKey(taskid), "Task ID must already be present: %s", taskid);
- Preconditions.checkState(tasks.get(taskid).getStatus().isRunnable(), "Task status must be runnable: %s", taskid);
- log.info("Updating task %s to status: %s", taskid, status);
- tasks.put(taskid, tasks.get(taskid).withStatus(status));
- }
- finally {
- giant.unlock();
- }
+ Preconditions.checkNotNull(status, "status");
+ final String taskid = status.getId();
+
+ log.info("Updating task %s to status: %s", taskid, status);
+ TaskStuff updated = tasks.computeIfPresent(taskid, (tid, taskStuff) -> {
+ Preconditions.checkState(taskStuff.getStatus().isRunnable(), "Task must be runnable: %s", taskid);
+ return taskStuff.withStatus(status);
+ });
+ Preconditions.checkNotNull(updated, "Task ID must already be present: %s", taskid);
}
@Override
public Optional<TaskStatus> getStatus(String taskid)
{
- giant.lock();
-
- try {
- Preconditions.checkNotNull(taskid, "taskid");
- if (tasks.containsKey(taskid)) {
- return Optional.of(tasks.get(taskid).getStatus());
- } else {
- return Optional.absent();
- }
- }
- finally {
- giant.unlock();
+ Preconditions.checkNotNull(taskid, "taskid");
+ TaskStuff existing = tasks.get(taskid);
+ if (existing != null) {
+ return Optional.of(existing.getStatus());
+ } else {
+ return Optional.absent();
}
}
@@ -153,90 +133,50 @@ public class HeapMemoryTaskStorage implements TaskStorage
@Override
public TaskInfo<Task, TaskStatus> getTaskInfo(String taskId)
{
- giant.lock();
-
- try {
- Preconditions.checkNotNull(taskId, "taskId");
- final TaskStuff taskStuff = tasks.get(taskId);
- if (taskStuff != null) {
- return new TaskInfo<>(
- taskStuff.getTask().getId(),
- taskStuff.getCreatedDate(),
- taskStuff.getStatus(),
- taskStuff.getDataSource(),
- taskStuff.getTask()
- );
- } else {
- return null;
- }
- }
- finally {
- giant.unlock();
+ Preconditions.checkNotNull(taskId, "taskId");
+ final TaskStuff taskStuff = tasks.get(taskId);
+ if (taskStuff != null) {
+ return TaskStuff.toTaskInfo(taskStuff);
+ } else {
+ return null;
}
}
@Override
public List<Task> getActiveTasks()
{
- giant.lock();
-
- try {
- final ImmutableList.Builder<Task> listBuilder = ImmutableList.builder();
- for (final TaskStuff taskStuff : tasks.values()) {
- if (taskStuff.getStatus().isRunnable()) {
- listBuilder.add(taskStuff.getTask());
- }
+ final ImmutableList.Builder<Task> listBuilder = ImmutableList.builder();
+ for (final TaskStuff taskStuff : tasks.values()) {
+ if (taskStuff.getStatus().isRunnable()) {
+ listBuilder.add(taskStuff.getTask());
}
- return listBuilder.build();
- }
- finally {
- giant.unlock();
}
+ return listBuilder.build();
}
@Override
public List<Task> getActiveTasksByDatasource(String datasource)
{
- giant.lock();
-
- try {
- final ImmutableList.Builder<Task> listBuilder = ImmutableList.builder();
- for (Map.Entry<String, TaskStuff> entry : tasks.entrySet()) {
- if (entry.getValue().getStatus().isRunnable() && entry.getValue().getDataSource().equals(datasource)) {
- listBuilder.add(entry.getValue().getTask());
- }
+ final ImmutableList.Builder<Task> listBuilder = ImmutableList.builder();
+ for (Map.Entry<String, TaskStuff> entry : tasks.entrySet()) {
+ if (entry.getValue().getStatus().isRunnable() && entry.getValue().getDataSource().equals(datasource)) {
+ listBuilder.add(entry.getValue().getTask());
}
- return listBuilder.build();
- }
- finally {
- giant.unlock();
}
+ return listBuilder.build();
}
@Override
public List<TaskInfo<Task, TaskStatus>> getActiveTaskInfo(@Nullable String dataSource)
{
- giant.lock();
-
- try {
- final ImmutableList.Builder<TaskInfo<Task, TaskStatus>> listBuilder = ImmutableList.builder();
- for (final TaskStuff taskStuff : tasks.values()) {
- if (taskStuff.getStatus().isRunnable()) {
- TaskInfo t = new TaskInfo<>(
- taskStuff.getTask().getId(),
- taskStuff.getCreatedDate(),
- taskStuff.getStatus(),
- taskStuff.getDataSource(),
- taskStuff.getTask()
- );
- listBuilder.add(t);
- }
+ final ImmutableList.Builder<TaskInfo<Task, TaskStatus>> listBuilder = ImmutableList.builder();
+ for (final TaskStuff taskStuff : tasks.values()) {
+ if (taskStuff.getStatus().isRunnable()) {
+ TaskInfo t = TaskStuff.toTaskInfo(taskStuff);
+ listBuilder.add(t);
}
- return listBuilder.build();
- }
- finally {
- giant.unlock();
}
+ return listBuilder.build();
}
@Override
@@ -246,29 +186,22 @@ public class HeapMemoryTaskStorage implements TaskStorage
@Nullable String datasource
)
{
- giant.lock();
-
- try {
- final Ordering<TaskStuff> createdDateDesc = new Ordering<TaskStuff>()
+ final Ordering<TaskStuff> createdDateDesc = new Ordering<TaskStuff>()
+ {
+ @Override
+ public int compare(TaskStuff a, TaskStuff b)
{
- @Override
- public int compare(TaskStuff a, TaskStuff b)
- {
- return a.getCreatedDate().compareTo(b.getCreatedDate());
- }
- }.reverse();
-
- return maxTaskStatuses == null ?
- getRecentlyCreatedAlreadyFinishedTaskInfoSince(
- DateTimes.nowUtc()
- .minus(durationBeforeNow == null ? config.getRecentlyFinishedThreshold() : durationBeforeNow),
- createdDateDesc
- ) :
- getNRecentlyCreatedAlreadyFinishedTaskInfo(maxTaskStatuses, createdDateDesc);
- }
- finally {
- giant.unlock();
- }
+ return a.getCreatedDate().compareTo(b.getCreatedDate());
+ }
+ }.reverse();
+
+ return maxTaskStatuses == null ?
+ getRecentlyCreatedAlreadyFinishedTaskInfoSince(
+ DateTimes.nowUtc()
+ .minus(durationBeforeNow == null ? config.getRecentlyFinishedThreshold() : durationBeforeNow),
+ createdDateDesc
+ ) :
+ getNRecentlyCreatedAlreadyFinishedTaskInfo(maxTaskStatuses, createdDateDesc);
}
private List<TaskInfo<Task, TaskStatus>> getRecentlyCreatedAlreadyFinishedTaskInfoSince(
@@ -276,31 +209,13 @@ public class HeapMemoryTaskStorage implements TaskStorage
Ordering<TaskStuff> createdDateDesc
)
{
- giant.lock();
-
- try {
- List<TaskStuff> list = createdDateDesc
- .sortedCopy(tasks.values())
- .stream()
- .filter(taskStuff -> taskStuff.getStatus().isComplete() && taskStuff.createdDate.isAfter(start))
- .collect(Collectors.toList());
- final ImmutableList.Builder<TaskInfo<Task, TaskStatus>> listBuilder = ImmutableList.builder();
- for (final TaskStuff taskStuff : list) {
- String id = taskStuff.getTask().getId();
- TaskInfo t = new TaskInfo<>(
- id,
- taskStuff.getCreatedDate(),
- taskStuff.getStatus(),
- taskStuff.getDataSource(),
- taskStuff.getTask()
- );
- listBuilder.add(t);
- }
- return listBuilder.build();
- }
- finally {
- giant.unlock();
- }
+ List<TaskInfo<Task, TaskStatus>> list = tasks.values()
+ .stream()
+ .filter(taskStuff -> taskStuff.getStatus().isComplete() && taskStuff.getCreatedDate().isAfter(start))
+ .sorted(createdDateDesc)
+ .map(TaskStuff::toTaskInfo)
+ .collect(Collectors.toList());
+ return Collections.unmodifiableList(list);
}
private List<TaskInfo<Task, TaskStatus>> getNRecentlyCreatedAlreadyFinishedTaskInfo(
@@ -308,114 +223,77 @@ public class HeapMemoryTaskStorage implements TaskStorage
Ordering<TaskStuff> createdDateDesc
)
{
- giant.lock();
-
- try {
- List<TaskStuff> list = createdDateDesc
- .sortedCopy(tasks.values())
- .stream()
- .filter(taskStuff -> taskStuff.getStatus().isComplete())
- .limit(n)
- .collect(Collectors.toList());
- final ImmutableList.Builder<TaskInfo<Task, TaskStatus>> listBuilder = ImmutableList.builder();
- for (final TaskStuff taskStuff : list) {
- String id = taskStuff.getTask().getId();
- TaskInfo t = new TaskInfo<>(
- id,
- taskStuff.getCreatedDate(),
- taskStuff.getStatus(),
- taskStuff.getDataSource(),
- taskStuff.getTask()
- );
- listBuilder.add(t);
- }
- return listBuilder.build();
- }
- finally {
- giant.unlock();
- }
+ List<TaskInfo<Task, TaskStatus>> list = tasks.values()
+ .stream()
+ .filter(taskStuff -> taskStuff.getStatus().isComplete())
+ .sorted(createdDateDesc)
+ .limit(n)
+ .map(TaskStuff::toTaskInfo)
+ .collect(Collectors.toList());
+ return Collections.unmodifiableList(list);
}
@Override
public void addLock(final String taskid, final TaskLock taskLock)
{
- giant.lock();
-
- try {
- Preconditions.checkNotNull(taskid, "taskid");
- Preconditions.checkNotNull(taskLock, "taskLock");
+ Preconditions.checkNotNull(taskid, "taskid");
+ Preconditions.checkNotNull(taskLock, "taskLock");
+ synchronized (taskLocks) {
taskLocks.put(taskid, taskLock);
}
- finally {
- giant.unlock();
- }
}
@Override
public void replaceLock(String taskid, TaskLock oldLock, TaskLock newLock)
{
- giant.lock();
-
- try {
- Preconditions.checkNotNull(taskid, "taskid");
- Preconditions.checkNotNull(oldLock, "oldLock");
- Preconditions.checkNotNull(newLock, "newLock");
+ Preconditions.checkNotNull(taskid, "taskid");
+ Preconditions.checkNotNull(oldLock, "oldLock");
+ Preconditions.checkNotNull(newLock, "newLock");
+ synchronized (taskLocks) {
if (!taskLocks.remove(taskid, oldLock)) {
log.warn("taskLock[%s] for replacement is not found for task[%s]", oldLock, taskid);
}
taskLocks.put(taskid, newLock);
}
- finally {
- giant.unlock();
- }
}
@Override
public void removeLock(final String taskid, final TaskLock taskLock)
{
- giant.lock();
-
- try {
- Preconditions.checkNotNull(taskLock, "taskLock");
+ Preconditions.checkNotNull(taskLock, "taskLock");
+ synchronized (taskLocks) {
taskLocks.remove(taskid, taskLock);
}
- finally {
- giant.unlock();
- }
}
@Override
- public void removeTasksOlderThan(final long timestamp)
+ public List<TaskLock> getLocks(final String taskid)
{
- giant.lock();
-
- try {
- List<String> taskIds = tasks.entrySet().stream()
- .filter(entry -> entry.getValue().getStatus().isComplete()
- && entry.getValue().getCreatedDate().isBefore(timestamp))
- .map(entry -> entry.getKey())
- .collect(Collectors.toList());
-
- taskIds.forEach(taskActions::removeAll);
- taskIds.forEach(tasks::remove);
- }
- finally {
- giant.unlock();
+ synchronized (taskLocks) {
+ return ImmutableList.copyOf(taskLocks.get(taskid));
}
}
@Override
- public List<TaskLock> getLocks(final String taskid)
+ public void removeTasksOlderThan(final long timestamp)
{
- giant.lock();
-
- try {
- return ImmutableList.copyOf(taskLocks.get(taskid));
- }
- finally {
- giant.unlock();
+ // This is the only method that removes entries from both tasks and taskActions; entries may
+ // still be added to either collection elsewhere.
+
+ // removeTasksOlderThan() may be called concurrently, in which case the same task can be
+ // queued for removal more than once. While not ideal, this is harmless: removing a task
+ // that has already been removed is a no-op.
+ List<String> taskIds = tasks.entrySet().stream()
+ .filter(entry -> entry.getValue().getStatus().isComplete()
+ && entry.getValue().getCreatedDate().isBefore(timestamp))
+ .map(entry -> entry.getKey())
+ .collect(Collectors.toList());
+
+ taskIds.forEach(tasks::remove);
+ synchronized (taskActions) {
+ taskIds.forEach(taskActions::removeAll);
}
}
@@ -423,28 +301,18 @@ public class HeapMemoryTaskStorage implements TaskStorage
@Override
public <T> void addAuditLog(Task task, TaskAction<T> taskAction)
{
- giant.lock();
-
- try {
+ synchronized (taskActions) {
taskActions.put(task.getId(), taskAction);
}
- finally {
- giant.unlock();
- }
}
@Deprecated
@Override
public List<TaskAction> getAuditLogs(String taskid)
{
- giant.lock();
-
- try {
+ synchronized (taskActions) {
return ImmutableList.copyOf(taskActions.get(taskid));
}
- finally {
- giant.unlock();
- }
}
private static class TaskStuff
@@ -488,5 +356,16 @@ public class HeapMemoryTaskStorage implements TaskStorage
{
return new TaskStuff(task, _status, createdDate, dataSource);
}
+
+ static TaskInfo<Task, TaskStatus> toTaskInfo(TaskStuff taskStuff)
+ {
+ return new TaskInfo<>(
+ taskStuff.getTask().getId(),
+ taskStuff.getCreatedDate(),
+ taskStuff.getStatus(),
+ taskStuff.getDataSource(),
+ taskStuff.getTask()
+ );
+ }
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org